"""
Various subroutines used in the main pipeline flow.
We keep them separately to make the pipeline logic easier to read at a glance.
"""
import copy
import datetime
import ConfigParser
import logging
import os
from pprint import pprint
from collections import defaultdict, namedtuple
from tkp.config import parse_to_dict
from tkp.db.dump import dump_db
import colorlog

logger = logging.getLogger(__name__)


def load_job_config(pipe_config):
    """
    Locates job_params.cfg in the 'job_directory' specified by pipe_config
    and loads it via ConfigParser.
    """
    job_directory = pipe_config['DEFAULT']['job_directory']
    job_config = ConfigParser.SafeConfigParser()
    job_config.read(os.path.join(job_directory, 'job_params.cfg'))
    return parse_to_dict(job_config)
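
# Usage sketch (hedged; the job_directory value below is illustrative,
# assuming pipe_config came from parsing a pipeline.cfg via tkp.config):
#
#     pipe_config = {'DEFAULT': {'job_directory': '/path/to/jobs/my_job'}}
#     job_config = load_job_config(pipe_config)
#     # job_config is a nested dict keyed by section, e.g.
#     # job_config['persistence']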


def dump_configs_to_logdir(log_dir, job_config, pipe_config):
    """
    Pretty-print the job and pipeline config dicts to files in log_dir,
    for later reference.
    """
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)
    with open(os.path.join(log_dir, 'job_params.cfg'), 'w') as f:
        pprint(job_config, stream=f)
    with open(os.path.join(log_dir, 'pipeline.cfg'), 'w') as f:
        pprint(pipe_config, stream=f)
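
# Usage sketch (hedged; log_dir is illustrative). Note the dumped files are
# pprint'ed dict representations for human inspection, not ConfigParser
# syntax, so they cannot be re-read with load_job_config:
#
#     dump_configs_to_logdir('/path/to/log_dir', job_config, pipe_config)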


def check_job_configs_match(job_config_1, job_config_2):
    """
    Check if job configs match, ignoring dataset_id, which we expect to
    change between runs.
    """
    # Deep-copy so deleting dataset_id does not mutate the callers' dicts
    # (a shallow .copy() would share the nested 'persistence' dict).
    jc_from_file = copy.deepcopy(job_config_1)
    jc_from_db = copy.deepcopy(job_config_2)
    del jc_from_file['persistence']['dataset_id']
    del jc_from_db['persistence']['dataset_id']
    return jc_from_file == jc_from_db
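
# Usage sketch (hedged; minimal configs just to illustrate the dataset_id
# exemption):
#
#     a = {'persistence': {'dataset_id': 1, 'description': 'run'}}
#     b = {'persistence': {'dataset_id': 2, 'description': 'run'}}
#     check_job_configs_match(a, b)  # True: they differ only in dataset_id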


def setup_logging(log_dir, debug, use_colorlog, basename='trap'):
    """
    Sets up logging to stdout, plus INFO- and DEBUG-level logfiles in log_dir.

    Args:
        log_dir (string): log directory
        debug (bool): do we want debug level logging on stdout?
        use_colorlog (bool): colourise the stdout log output?
        basename (string): basename of the log files
    """
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)

    info_log_file = os.path.join(log_dir, basename + '.log')
    debug_log_file = os.path.join(log_dir, basename + '.debug.log')

    formatter = logging.Formatter(
        '%(asctime)s %(levelname)s %(name)s: %(message)s',
        datefmt="%Y-%m-%d %H:%M:%S"
    )
    debug_formatter = logging.Formatter(
        '%(asctime)s %(levelname)s %(name)s %(funcName)s() process %(processName)s '
        '(%(process)d) thread %(threadName)s (%(thread)d) : %(message)s',
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    info_hdlr = logging.FileHandler(info_log_file)
    info_hdlr.setLevel(logging.INFO)
    info_hdlr.setFormatter(formatter)

    debug_hdlr = logging.FileHandler(debug_log_file)
    debug_hdlr.setLevel(logging.DEBUG)
    debug_hdlr.setFormatter(debug_formatter)

    stdout_handler = logging.StreamHandler()
    if debug:
        stdout_handler.setLevel(logging.DEBUG)
        formatter = debug_formatter
    else:
        stdout_handler.setLevel(logging.INFO)

    if use_colorlog:
        formatter = colorlog.ColoredFormatter(
            "%(log_color)s" + formatter._fmt,
            datefmt="%H:%M:%S",
            reset=True,
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'red',
            }
        )
    stdout_handler.setFormatter(formatter)

    root_logger = logging.getLogger()
    # We set the root logger to DEBUG and control output via the
    # handler levels.
    root_logger.setLevel(logging.DEBUG)
    # Trash any preset handlers and start fresh
    root_logger.handlers = []
    root_logger.addHandler(stdout_handler)
    root_logger.addHandler(info_hdlr)
    root_logger.addHandler(debug_hdlr)
    logger.info("logging to %s", log_dir)

    # Suppress noisy streams
    logging.getLogger('tkp.sourcefinder.image.sigmaclip').setLevel(logging.INFO)
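
# Usage sketch (hedged; arguments illustrative). This (re)configures the root
# logger, so it is expected to be called once, early in pipeline start-up:
#
#     setup_logging('/path/to/log_dir', debug=False, use_colorlog=True)
#     # -> INFO-level stdout (colourised), plus trap.log (INFO) and
#     #    trap.debug.log (DEBUG) under log_dir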


def dump_database_backup(db_config, job_dir):
    """
    Dumps the database to a timestamped file in job_dir, if the
    'dump_backup_copy' option is set in db_config.
    """
    if db_config.get('dump_backup_copy'):
        output_name = os.path.join(
            job_dir, "%s_%s_%s.dump" % (
                db_config['host'], db_config['database'],
                datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
            )
        )
        dump_db(
            db_config['engine'], db_config['host'], str(db_config['port']),
            db_config['database'], db_config['user'], db_config['password'],
            output_name
        )
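
# Usage sketch (hedged; the db_config values are illustrative, mirroring the
# keys read above):
#
#     db_config = {
#         'dump_backup_copy': True,
#         'engine': 'postgresql', 'host': 'localhost', 'port': 5432,
#         'database': 'tkp', 'user': 'tkp', 'password': 'secret',
#     }
#     dump_database_backup(db_config, '/path/to/job_dir')
#     # -> writes e.g. localhost_tkp_<timestamp>.dump in job_dir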


ImageMetadataForSort = namedtuple('ImageMetadataForSort', [
    'url',
    'timestamp',
    'frequency',
])


def group_per_timestep(metadatas):
    """
    Groups a list of TRAP images per time step.

    Within each time step the images are ordered by frequency. We could add
    other ordering logic (per Stokes) later on.

    Args:
        metadatas (list): list of ImageMetadataForSort

    Returns:
        list: List of tuples, sorted by timestamp. Each tuple has the
            timestamp as its first element and a list of image urls, sorted
            by frequency, as its second element.
    """
    grouped_dict = defaultdict(list)
    for metadata in metadatas:
        grouped_dict[metadata.timestamp].append(metadata)

    # sort the groups by timestamp
    grouped = sorted(grouped_dict.items())

    # and then sort the images within each group by frequency
    for _, metas in grouped:
        metas.sort(key=lambda x: x.frequency)

    # only return the urls
    return [(stamp, [m.url for m in metas]) for stamp, metas in grouped]
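
# Usage sketch (hedged; timestamps and frequencies are illustrative):
#
#     metadatas = [
#         ImageMetadataForSort(url='b.fits', timestamp=1, frequency=150e6),
#         ImageMetadataForSort(url='a.fits', timestamp=1, frequency=120e6),
#         ImageMetadataForSort(url='c.fits', timestamp=2, frequency=120e6),
#     ]
#     group_per_timestep(metadatas)
#     # -> [(1, ['a.fits', 'b.fits']), (2, ['c.fits'])]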