Source code for ocd.config

# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''Configuration functionalities.

The :class:`ConfigUpdater` class handles the following event:

+------------------------------+----------------------------------------------+
| Topic                        | Event                                        |
+==============================+==============================================+
| ocd.config.                  | section (string): section to modify          |
+ modification_request         +----------------------------------------------+
|                              | option (string): option to modify            |
+                              +----------------------------------------------+
|                              | value (string): new value of the option      |
+------------------------------+----------------------------------------------+

The :meth:`ConfigUpdater.handle_event` method emits the following event after
updating the configuration:

+------------------------------+----------------------------------------------+
| Topic                        | Event                                        |
+==============================+==============================================+
| ocd.config.                  | success (bool): whether the update is        |
| modification_replay          | successful or not                            |
+                              +----------------------------------------------+
|                              | update_msg (string): when updating the       |
|                              | configuration, the objects can return a      |
|                              | message. The messages are stored here        |
+                              +----------------------------------------------+
|                              | error_msg (string): error message during     |
|                              | validation                                   |
+                              +----------------------------------------------+
|                              | warn_msg (string): warning messages during   |
|                              | validation                                   |
+------------------------------+----------------------------------------------+

If the objects passed to :class:`ConfigUpdater` want to update their
configuration object, they must provide an :meth:`update_config` method. The
return value of these methods, if any, is collated in the ``update_msg`` event
value.

The :attr:`CONFIG_MOD_REPLY` events are emitted using the ``ocd_main_loop`` ZMQ
server. See :func:`ocd.utils.init_zmq_servers` for more information.
'''
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import argparse as ap
from configparser import Error as ConfError
import os
import traceback
import warnings

import pkg_resources
import pyhetdex.tools.configuration as configparser
import pyhetdex.tools.io_helpers as ioh
from six.moves import StringIO
from tcs_lib.string_helpers import bytes_to_string

from . import errors
from . import utils


# Paths of the configuration files shipped with the package; both are built by
# joining ``utils.STATIC_DIR`` with the file name.
MASTER_CONFIG_FILE = os.path.join(utils.STATIC_DIR, 'ocd.cfg')
'''Name of the master configuration file w.r.t. the ocd package'''
LOGGER_CONF = os.path.join(utils.STATIC_DIR, 'loggers.cfg')
'''Name of the loggers configuration file w.r.t. the ocd package'''

# Topics of the request/reply event pair handled by ``ConfigUpdater`` and used
# by the ``config send`` subcommand.
# NOTE(review): the reply topic is literally spelled "modification_replay".
# The string is used verbatim both when sending the reply and when listening
# for it, so presumably other processes rely on this exact spelling -- confirm
# before "fixing" it.
CONFIG_MOD_REQUEST = 'ocd.config.modification_request'
'''Topic of the events emitted to request a configuration request'''
CONFIG_MOD_REPLY = 'ocd.config.modification_replay'
'''After the modification has been done send a replay with all the necessary
information'''


def config_file_argument(parser):
    '''Register the ``--config`` option on ``parser``.

    The option takes an optional value: when ``--config`` is given without a
    value, ``ocd.cfg`` is used; when it is not given at all the ``config``
    attribute of the parsed namespace is ``None``.

    Parameters
    ----------
    parser : :class:`argparse.ArgumentParser`
        parser to which the argument is added

    Returns
    -------
    parser : :class:`argparse.ArgumentParser`
        the same parser, modified in place
    '''
    help_text = ('Override the builtin configurations with the given'
                 ' configuration file. If the option is used without a value,'
                 ' "%(const)s" is used. Non existing files are skipped and a'
                 ' warning is printed to the screen. ')
    parser.add_argument('--config', nargs='?', const='ocd.cfg',
                        help=help_text)
    return parser
def load_config(config_file=None, args=None, raise_serious=False,
                raise_warning=False):
    """Build the configuration object used by OCD.

    The steps, in order, are:

    1. read the master configuration file shipped with the package;
    2. if ``config_file`` is given, read it on top of the defaults; a warning
       is issued if nothing could be read from it;
    3. if ``args`` is given, apply
       :func:`~pyhetdex.tools.configuration.override_conf` to it;
    4. move the deprecated ``wait_last_write`` option of ``[run_shot]`` to
       ``wait_last_readout`` (see :func:`move_config`);
    5. run :func:`check_config` on the result.

    Parameters
    ----------
    config_file : string, optional
        name of the configuration file used to override the defaults
    args : :class:`argparse.Namespace`, optional
        structure, typically the Namespace created by :mod:`argparse`
    raise_serious, raise_warning : bool, optional
        options passed to :func:`check_config`

    Returns
    -------
    config : :class:`pyhetdex.tools.configuration.ConfigParser`
    """
    # options without a value are allowed, so that they can be parsed as
    # ``None``
    parser = configparser.ConfigParser(defaults=os.environ,
                                       allow_no_value=True)

    # the packaged defaults always come first
    raw_defaults = pkg_resources.resource_string(__name__, MASTER_CONFIG_FILE)
    parser.read_file(StringIO(bytes_to_string(raw_defaults)))

    # ``read`` returns the list of files actually loaded: an empty list means
    # that the user file was not used
    if config_file and not parser.read(config_file):
        warnings.warn("File '{}' does not exist. The default configuration"
                      " will not be overwritten".format(config_file))

    if args:
        parser = configparser.override_conf(parser, args)

    # move the wait_last_write to the wait_last_readout
    parser = move_config(parser, ['wait_last_write'], 'run_shot', 'run_shot',
                         '2018/06/05', to_options=['wait_last_readout'])

    return check_config(parser, raise_serious=raise_serious,
                        raise_warning=raise_warning)
def move_config(conf, from_options, from_section, to_section, end_move_date,
                to_options=None):
    '''Relocate options from ``from_section`` to ``to_section``.

    Every option that is actually moved triggers a warning telling the user to
    update the configuration file. Options missing from ``from_section`` are
    skipped silently; if ``from_section`` itself does not exist the
    configuration is returned untouched.

    Parameters
    ----------
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration to modify
    from_options : list of strings
        configuration entries to move
    from_section, to_section : strings
        configuration section from and to which the options must be moved
    end_move_date : string
        earliest date at which the automatic move is going to be removed; it
        is only used in the warning message
    to_options : list of strings, optional
        new names of the options; if ``None`` the names in ``from_options``
        are reused, otherwise it must have the same length as
        ``from_options``

    Returns
    -------
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        modified configuration

    Raises
    ------
    ValueError
        if ``to_options`` is given and its length differs from the one of
        ``from_options``
    '''
    if to_options is None:
        to_options = from_options[:]
    elif len(from_options) != len(to_options):
        raise ValueError('"to_options" must be either "None" or have the'
                         ' same number of elements as "from_options". It'
                         ' has {} elements instead of'
                         ' {}'.format(len(to_options), len(from_options)))

    try:
        source = conf[from_section]
    except KeyError:
        # nothing to move from
        return conf
    target = conf[to_section]

    notice = ('Option "{fo}" in section [{fs}] has been'
              ' automatically moved to option "{to}"'
              ' in section [{ts}]. This functionality will be'
              ' removed after the {end}. Please update your'
              ' copy of the configuration to reflect the official'
              ' OCD one.')

    for old_name, new_name in zip(from_options, to_options):
        try:
            moved_value = source.pop(old_name)
        except KeyError:
            # the option is not there: nothing to do
            continue
        target[new_name] = moved_value
        warnings.warn(notice.format(fo=old_name, to=new_name,
                                    fs=from_section, ts=to_section,
                                    end=end_move_date))

    return conf
def check_config(conf, raise_serious=False, raise_warning=False):
    '''Sanity-check the configuration and notify the user of possible issues.

    There are two types of warnings:

    * :exc:`ocd.errors.OCDSeriousWarning`: used to notify serious problems,
      namely that both ``event_urls`` and ``ocd_db_replay`` of the ``[urls]``
      section are set;
    * :exc:`ocd.errors.OCDWarning`: used to notify that the configuration
      object has been modified, namely that ``mysql_update_obsnum`` of the
      ``[database]`` section has been forced to ``false`` because
      ``skip_shot_submission`` of the ``[autoschedule]`` section is true.

    Parameters
    ----------
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration to check; it can be modified in place
    raise_serious : bool, optional
        whether to raise :exc:`~ocd.errors.OCDSeriousWarning` instead of
        issuing it as a warning
    raise_warning : bool, optional
        whether to raise :exc:`~ocd.errors.OCDWarning` instead of issuing it
        as a warning

    Returns
    -------
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        the checked configuration

    Raises
    ------
    OCDSeriousWarning
        if ``raise_serious`` is ``True`` and there are serious warnings
    OCDWarning
        if ``raise_warning`` is ``True`` and there are warnings
    '''
    serious, standard = [], []

    # serious checks: ``event_urls`` and ``ocd_db_replay`` from [urls] must
    # not be given at the same time
    urls = conf['urls']
    if urls['event_urls'] and urls['ocd_db_replay']:
        serious.append('* Both the "event_urls" and "ocd_db_replay" urls in the'
                       ' "[urls]" section are given. This might give rise to'
                       ' inconsistencies as ``ocd run`` can listen to two sources'
                       ' of events')

    # standard checks: do not touch the observation number table when the
    # shots are not submitted
    skips_shots = conf['autoschedule'].getboolean('skip_shot_submission',
                                                  fallback=False)
    updates_mysql = conf['database'].getboolean('mysql_update_obsnum',
                                                fallback=True)
    if skips_shots and updates_mysql:
        standard.append('* Both ``skip_shot_submission`` option of the'
                        ' ``[autoschedule]``'
                        ' section and ``mysql_update_obsnum`` of the ``[database]``'
                        ' section is set to ``true/yes/on``. Since we do not want to'
                        ' modify the observation number table if OCD does not run'
                        ' shots, the latter option has be set to ``false``.')
        conf['database']['mysql_update_obsnum'] = 'false'

    # the standard warnings are handled before the serious ones
    reports = ((standard, errors.OCDWarning, raise_warning),
               (serious, errors.OCDSeriousWarning, raise_serious))
    for messages, category, must_raise in reports:
        if not messages:
            continue
        warning = category('\n'.join(messages))
        if must_raise:
            raise warning
        warnings.warn(warning)

    return conf
def get_value(conf, section, option, cast_to=None):
    '''Read ``option`` from ``section``, optionally converting its value.

    The value is retrieved with the ``get``, ``getint``, ``getfloat`` or
    ``getboolean`` method of ``conf`` according to ``cast_to``. If the
    retrieval fails with a :exc:`TypeError`, :exc:`AttributeError` or
    :exc:`ValueError`, as happens e.g. when casting an empty option (one
    without a ``=``), ``None`` is returned.

    Parameters
    ----------
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration object
    section, option : strings
        section and option values
    cast_to : class, optional
        class to which cast the value. Allowed values: ``None``, ``int``,
        ``float``, ``bool``

    Returns
    -------
    value of the option, cast as requested, or ``None``

    Raises
    ------
    ocd.errors.OCDValueError
        if ``cast_to`` is not one of the allowed cast classes
    '''
    # identity comparison on purpose: only these exact objects are accepted
    getter_names = ((None, 'get'), (int, 'getint'), (float, 'getfloat'),
                    (bool, 'getboolean'))
    for kind, method_name in getter_names:
        if cast_to is kind:
            getter = getattr(conf, method_name)
            break
    else:
        raise errors.OCDValueError('The allowed values of ``cast_to`` are:'
                                   ' None, int, float, bool')

    try:
        value = getter(section, option)
    except (TypeError, AttributeError, ValueError):
        value = None
    return value
def save_config(conf):
    '''Dump the configuration to a file and return the file name.

    The name of the output file is provided by :func:`ocd.utils.get_out_file`,
    which is asked for a file in a temporary directory.

    Parameters
    ----------
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration to save

    Returns
    -------
    out_file : string
        name of the file
    '''
    cfg_path = utils.get_out_file(':tmpdir:', 'ocd_{}.cfg', '', 5)
    with open(cfg_path, 'w') as stream:
        conf.write(stream)
    return cfg_path
class ConfigUpdater(object):
    '''Update the configuration in all the ``objects``.

    The original configuration object and the list of objects to update are
    passed when creating one instance. When an event with topic
    :attr:`CONFIG_MOD_REQUEST` is received, the configuration object is updated
    and the ``update_config(config)`` method of all the ``objects`` is called.
    If an object doesn't have such a method it is skipped.

    Parameters
    ----------
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration to modify
    objects :
        objects that (might) want to update the configuration

    Attributes
    ----------
    conf, objects :
        same as input
    topics : list
        list of topics to which the :class:`ConfigUpdater` react
    '''
    def __init__(self, conf, *objects):
        self.conf = conf
        self.objects = objects
        self.topics = [CONFIG_MOD_REQUEST, ]

    def handle_event(self, tcs_topic, tcs_event):
        '''Handle a configuration modification request.

        If the topic is one of :attr:`topics`, update the configuration with
        the content of the event, propagate the new configuration to all the
        registered objects and send a :attr:`CONFIG_MOD_REPLY` event through
        the ``ocd_main_loop`` ZMQ server. The reply is sent whether or not the
        update succeeded.

        Parameters
        ----------
        tcs_topic : string
            topic of the event
        tcs_event : dict
            event to handle; it must provide the ``section``, ``option`` and
            ``value`` keys

        Returns
        -------
        bool
            ``False`` if the topic is not handled, ``True`` otherwise
        '''
        if tcs_topic not in self.topics:
            return False

        reply = dict(success=False, update_msg='', error_msg='', warn_msg='')
        try:
            self._update_config(tcs_event)
        except KeyError as e:
            reply['error_msg'] = ('The incoming event is missing the'
                                  ' mandatory key {}'.format(e.args[0]))
        except ConfError as e:
            reply['error_msg'] = ('Setting the configuration failed'
                                  ' because {}'.format(e.message))
        except TypeError:
            reply['error_msg'] = ('The value of the option must be a'
                                  ' string not:'
                                  '{}'.format(tcs_event['value']))
        else:
            messages, error_msg = self._propagate_config()
            # the update is successful only if every object accepted the new
            # configuration
            reply['success'] = not error_msg
            if error_msg:
                reply['error_msg'] = error_msg
            # messages collected before a failure are reported anyway
            reply['update_msg'] = '\n'.join(messages)

        # send back the reply
        server = utils.get_zmq_server('ocd_main_loop')
        server.send_tcs_event(CONFIG_MOD_REPLY, reply)

        return True

    def _propagate_config(self):
        '''Call ``update_config(self.conf)`` on every registered object.

        Objects raising :exc:`AttributeError` (typically because they do not
        provide ``update_config``) are reported as not updated. ``None``
        return values are skipped, as returning a message is optional; any
        other return value is formatted as a string, so that the messages can
        always be joined. The first other exception stops the propagation.

        Returns
        -------
        messages : list of strings
            messages collected from the objects
        error_msg : string
            empty if no error happened, otherwise a message containing the
            traceback of the exception
        '''
        messages = []
        error_msg = ''
        try:
            for obj in self.objects:
                try:
                    msg = obj.update_config(self.conf)
                except AttributeError:
                    msg = '{} was not updated'.format(obj)
                if msg is not None:
                    messages.append('{}'.format(msg))
        except Exception:
            error_msg = ('While updating the configuration the'
                         ' following exception happened:'
                         '\n{}'.format(traceback.format_exc()))
        return messages, error_msg

    def _update_config(self, tcs_event):
        '''Update the configuration object using the tcs_event.

        Get the section, option and value to update from the event and then
        set them in :attr:`conf`.

        Parameters
        ----------
        tcs_event : dict
            event to handle

        Raises
        ------
        KeyError
            if one of the ``section``, ``option`` or ``value`` keys is missing
        '''
        sec = tcs_event['section']
        opt = tcs_event['option']
        val = tcs_event['value']
        self.conf.set(sec, opt, val)
# command line subcommands
def config_subcommand(subparsers):
    '''Add a subcommand "config" to handle the configuration file.

    The "config" command has two mandatory subcommands:

    * ``copy``: copy the packaged configuration files to a directory;
    * ``send``: send a new value for a configuration option to the OCD main
      loop.

    Parameters
    ----------
    subparsers : argparse subparsers object
        subparser to use to generate new parsers

    Returns
    -------
    subparsers : argparse subparsers object
        the input object, to which the "config" parser has been added
    '''
    parser = subparsers.add_parser(
        'config', help='Manage configuration files',
        description='OCD subcommand to deal with configuration files')

    subsubparser = parser.add_subparsers(
        title='Configuration subcommands',
        description=("Type '%(prog)s cmd -h' for detailed information about"
                     " the subcommands"),
        dest='conf_subcommand')
    # one of the subcommands must be given
    subsubparser.required = True

    # "config copy"
    copy_p = subsubparser.add_parser(
        'copy', help='Copy the configuration files',
        description='Copy the configuration files in the desired directory',
        parents=[utils.common_parser_arguments(), ],
        formatter_class=ap.ArgumentDefaultsHelpFormatter)
    copy_p.set_defaults(func=_copy_conf)

    copy_p.add_argument('-t', '--to-dir', default='.',
                        help='Copy the configuration files to the given'
                        ' directory. By default use the current directory')
    copy_p.add_argument('-f', '--force', action='store_true',
                        help="Force overwriting existing configuration"
                        " files. If this option is used, all local"
                        " modifications will be lost")
    copy_p.add_argument('-b', '--backup', action='store_true',
                        help='Backup existing files before copying the new'
                        ' ones. If this option is used, the `-f/--force` is'
                        ' ignored')

    # "config send"
    send = subsubparser.add_parser(
        'send', help='Send a configuration option',
        description='Send a configuration option to the OCD main loop',
        parents=[utils.common_parser_arguments(), ],
        formatter_class=ap.ArgumentDefaultsHelpFormatter)
    send.set_defaults(func=_send_conf)
    send = utils.override_epilog_msg(send)

    send.add_argument('section', help='Configuration section')
    send.add_argument('option', help='Configuration option to override')
    send.add_argument('value', help='New configuration value')

    send = config_file_argument(send)

    # the help used to describe the shot enabling/disabling command, from
    # which it had been copied
    send.add_argument('-n', '--max-attempts', type=int,
                      help='Maximum number of times "%(prog)s" attempts to'
                      ' send the new configuration option', default=5)
    send.add_argument('-I', '--interval', type=float,
                      help='Wait "%(dest)s" seconds before a new attempt.',
                      default=1)

    # the ``setting__<section>__<option>`` destinations are the ones used to
    # override the configuration entries from the command line
    overrides_urls = send.add_argument_group(
        title='Override options in the [urls] section')
    overrides_urls.add_argument('-o', '--ocd-config',
                                dest='setting__urls__ocd_config',
                                metavar='OCD_CONFIG',
                                help='Urls/paths used to send the signal to'
                                ' update the configuration entries.',
                                nargs='+')
    overrides_urls.add_argument('-i', '--ocd-main-loop',
                                dest='setting__urls__ocd_main_loop',
                                metavar='OCD_MAIN_LOOP',
                                help='Url/path from where the confirmation'
                                ' of the update arrives.',
                                nargs='+')

    overrides_config = send.add_argument_group(
        title='Override options in the [config] section')
    overrides_config.add_argument('-N', '--n-ocd-config-hetdex',
                                  dest='setting__config__n_ocd_config',
                                  metavar='N_OCD_CONFIG',
                                  help='Which of the urls for the'
                                  ' ``ocd_config`` option of the ``[urls]``'
                                  ' section use to setup a ZMQ server')

    return subparsers
def _copy_conf(args):
    '''Copy the master and the loggers configuration files to ``args.to_dir``.

    The copy is delegated to :class:`pyhetdex.tools.io_helpers.CopyResource`,
    configured with the ``backup`` and ``force`` command line options; once
    done its report is produced.

    Parameters
    ----------
    args : :class:`~argparse.Namespace`
        parsed command line arguments
    '''
    copier = ioh.CopyResource(__name__, backup=args.backup, force=args.force,
                              verbose=False)
    resources = [MASTER_CONFIG_FILE, LOGGER_CONF]
    copier(resources, args.to_dir, reldir=utils.STATIC_DIR)
    copier.report()
def _send_conf(args):
    '''Send a new configuration value to OCD and print the outcome.

    A :attr:`CONFIG_MOD_REQUEST` event with the section, option and value
    given on the command line is sent and a :attr:`CONFIG_MOD_REPLY` event is
    waited for. If the reply does not arrive a warning is printed, otherwise
    the success status and the messages in the reply are printed.

    Parameters
    ----------
    args : :class:`~argparse.Namespace`
        parsed command line arguments
    '''
    # load the configuration file
    config = load_config(config_file=args.config, args=args)

    # Setup the servers
    utils.init_zmq_servers(args.subcommand, config)
    print('Servers initialized')

    messenger = utils.SendAndListen.from_names('ocd_config', 'ocd_main_loop',
                                               topics=CONFIG_MOD_REPLY,
                                               n_attempts=args.max_attempts,
                                               interval=args.interval)

    print('Starting to communicate the new configuration to OCD')
    request = {'section': args.section, 'option': args.option,
               'value': args.value}
    _topic, answer, timeout_hit = messenger.communicate(CONFIG_MOD_REQUEST,
                                                        request)

    if timeout_hit:
        print('WARNING: OCD has not replied back confirming that it acted'
              ' on my request. Are you sure that OCD is up and running?')
        return

    if answer['success']:
        print('SUCCESS: OCD confirms that the new configuration option'
              ' has been propagated.')
    else:
        print('ERROR: Setting the new configuration option failed')

    # each group of messages is introduced by a separator
    separator = '************************'
    sections = (('update_msg', 'No info messages'),
                ('warn_msg', 'No warning messages'),
                ('error_msg', 'No error messages'))
    for key, placeholder in sections:
        print(separator)
        print(answer[key] or placeholder)