# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''Configuration functionalities.
The :class:`ConfigUpdater` class handles the following event:
+------------------------------+----------------------------------------------+
| Topic | Event |
+==============================+==============================================+
| ocd.config. | section (string): section to modify |
+ modification_request +----------------------------------------------+
| | option (string): option to modify |
+ +----------------------------------------------+
| | value (string): new value of the option |
+------------------------------+----------------------------------------------+
The :meth:`ConfigUpdater.handle_event` method emits the following event after
updating the configuration:
+------------------------------+----------------------------------------------+
| Topic | Event |
+==============================+==============================================+
| ocd.config. | success (bool): whether the update is |
| modification_replay | successful or not |
+ +----------------------------------------------+
| | update_msg (string): when updating the |
| | configuration, the objects can return a |
| | message. The messages are stored here |
+ +----------------------------------------------+
| | error_msg (string): error message during |
| | validation |
+ +----------------------------------------------+
| | warn_msg (string): warning messages during |
| | validation |
+------------------------------+----------------------------------------------+
If the objects passed to :class:`ConfigUpdater` what to update the
their configuration object they must provide a :meth:`update_config`. The
return value of these methods, if any, is collated in the ``update_msg`` event
value.
The :attr:`CONFIG_MOD_REPLY` events are emitted using the ``ocd_main_loop`` ZMQ
server. See :func:`ocd.utils.init_zmq_servers` for more information.
'''
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import argparse as ap
from configparser import Error as ConfError
import os
import traceback
import warnings
import pkg_resources
import pyhetdex.tools.configuration as configparser
import pyhetdex.tools.io_helpers as ioh
from six.moves import StringIO
from tcs_lib.string_helpers import bytes_to_string
from . import errors
from . import utils
MASTER_CONFIG_FILE = os.path.join(utils.STATIC_DIR, 'ocd.cfg')
'''Name of the master configuration file w.r.t. the ocd package'''
LOGGER_CONF = os.path.join(utils.STATIC_DIR, 'loggers.cfg')
'''Name of the loggers configuration file w.r.t. the ocd package'''
CONFIG_MOD_REQUEST = 'ocd.config.modification_request'
'''Topic of the events emitted to request a configuration request'''
CONFIG_MOD_REPLY = 'ocd.config.modification_replay'
'''After the modification has been done send a replay with all the necessary
information'''
[docs]def config_file_argument(parser):
'''Add the parser argument to provide and parse the configuration files.
The configuration object assigned to the ``config`` attribute of the parsed
namespace
Parameters
----------
parser : :class:`argparse.ArgumentParser`
parser to which add the argument
Returns
-------
parser : :class:`argparse.ArgumentParser`
modified parser
'''
parser.add_argument('--config', nargs='?', const='ocd.cfg',
help='''Override the builtin configurations with the
given configuration file. If the option is used without
a value, "%(const)s" is used. Non existing files are
skipped and a warning is printed to the screen.
''')
return parser
[docs]def load_config(config_file=None, args=None, raise_serious=False,
raise_warning=False):
"""Load the configuration file. The default file is loaded and then, if
provided, overwritten by the ``config_file``.
**After** the configuration files are loaded and if ``args`` is given, use
the latter and :func:`~pyhetdex.tools.configuration.overrides_conf` to
override the configuration entries.
Parameters
----------
config_file : string, optional
name of the configuration file to use to override the configurations
args : :class:`argparse.Namespace`, optional
structure, typically the Namespace create by :mod:`argparse`.
raise_serious, raise_warning : bool, optional
options passed to :func:`check_config`
Returns
-------
config : :class:`pyhetdex.tools.configuration.ConfigParser`
"""
# create the configuration file allowing options without value to be able
# to have ``None`` from the parsed file
config = configparser.ConfigParser(defaults=os.environ,
allow_no_value=True)
# load the default configuration file
config_stream = pkg_resources.resource_string(__name__, MASTER_CONFIG_FILE)
config_stream = StringIO(bytes_to_string(config_stream))
config.read_file(config_stream)
if config_file:
# load the given config file
loaded_names = config.read(config_file)
if not loaded_names:
msg = ("File '{}' does not exist. The default configuration will"
" not be overwritten".format(config_file))
warnings.warn(msg)
if args:
config = configparser.override_conf(config, args)
# move the wait_last_write to the wait_last_readout
config = move_config(config, ['wait_last_write', ], 'run_shot', 'run_shot',
'2018/06/05', to_options=['wait_last_readout', ])
config = check_config(config, raise_serious=raise_serious,
raise_warning=raise_warning)
return config
[docs]def move_config(conf, from_options, from_section, to_section, end_move_date,
to_options=None):
'''Move the configuration options from section ``from_section`` to
``to_section`` warning about every option moved. If an option is not found
in ``from_section``, it is skipped. If the ``from_section`` is not found,
skip the move.
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
configuration to modify
from_options : list of strings
configuration entries to move
from_section, to_section : strings
configuration section from and to which the options must be moved.
end_move_date: string
earliest date at which the automatic moving of the configuration is
removed
to_options : list of strings, optional
if ``None`` the option names are the same as ``from_options``;
otherwise it must be a list with the same length of ``from_options``
Returns
-------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
modified configuration
'''
if to_options is None:
to_options = from_options[:]
else:
if len(from_options) != len(to_options):
raise ValueError('"to_options" must be either "None" or have the'
' same number of elements as "from_options". It'
' has {} elements instead of'
' {}'.format(len(to_options), len(from_options)))
try:
from_sec = conf[from_section]
except KeyError:
return conf
to_sec = conf[to_section]
for from_opt, to_opt in zip(from_options, to_options):
try:
to_sec[to_opt] = from_sec.pop(from_opt)
except KeyError:
pass # ignore non existing options
else:
warnings.warn('Option "{fo}" in section [{fs}] has been'
' automatically moved to option "{to}"'
' in section [{ts}]. This functionality will be'
' removed after the {end}. Please update your'
' copy of the configuration to reflect the official'
' OCD one.'.format(fo=from_opt, to=to_opt,
fs=from_section, ts=to_section,
end=end_move_date))
return conf
[docs]def check_config(conf, raise_serious=False, raise_warning=False):
'''Runs check on the configuration and notify the user of possible
issues. There are two times of warnings:
* :exc:`ocd.errors.OCDSeriousWarning`: used to notify serious warnings;
those warnings are, by default, turned into exceptions;
* :exc:`ocd.errors.OCDWarning`: used to notify modifications to the
configuration object;
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
configuration to modify
raise_serious : bool, optional
whether to raise :exc:`~ocd.errors.OCDSeriousWarning`
raise_warning : bool, optional
whether to raise :exc:`~ocd.errors.OCDWarning`
Returns
-------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
configuration to modify
Raises
------
OCDSeriousWarning
if ``raise_serious`` is ``True`` and there are serious warnings
OCDWarning
if ``raise_warning`` is ``True`` and there are warnings
'''
serious_warnings = []
standard_warnings = []
# check the serious stuff
# check that event_urls and ocd_db_replay from [urls] are not given at the
# same time
if conf['urls']['event_urls'] and conf['urls']['ocd_db_replay']:
msg = ('* Both the "event_urls" and "ocd_db_replay" urls in the'
' "[urls]" section are given. This might give rise to'
' inconsistencies as ``ocd run`` can listen to two sources'
' of events')
serious_warnings.append(msg)
# check the other stuff
# if [autoschedule]/skip_shot_submission is true and
# [database]/mysql_update_obsnum is true, set the latter to false and
# notify the user
skip_shot = conf['autoschedule'].getboolean('skip_shot_submission',
fallback=False)
update_mysql = conf['database'].getboolean('mysql_update_obsnum',
fallback=True)
if skip_shot and update_mysql:
msg = ('* Both ``skip_shot_submission`` option of the'
' ``[autoschedule]``'
' section and ``mysql_update_obsnum`` of the ``[database]``'
' section is set to ``true/yes/on``. Since we do not want to'
' modify the observation number table if OCD does not run'
' shots, the latter option has be set to ``false``.')
standard_warnings.append(msg)
conf['database']['mysql_update_obsnum'] = 'false'
all_warnings = [[standard_warnings, errors.OCDWarning, raise_warning],
[serious_warnings, errors.OCDSeriousWarning,
raise_serious]]
for warn_list, Category, should_raise in all_warnings:
if warn_list:
warn = Category('\n'.join(warn_list))
if should_raise:
raise warn
else:
warnings.warn(warn)
return conf
[docs]def get_value(conf, section, option, cast_to=None):
'''Get the value of the configuration option casting it to ``cast_to``.
Fore empty options (without a ``=``) returns a ``None``
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
configuration object
section, option : strings
section and option values
cast_to : class, optional
class to which cast the value. Allowed values: ``None``, ``int``,
``float``, ``bool``
Raises
------
ocd.errors.OCDValueError
if ``cast_to`` is not one of the allowed cast classes
'''
if cast_to is None:
getter = conf.get
elif cast_to is int:
getter = conf.getint
elif cast_to is float:
getter = conf.getfloat
elif cast_to is bool:
getter = conf.getboolean
else:
raise errors.OCDValueError('The allowed values of ``cast_to`` are:'
' None, int, float, bool')
try:
return getter(section, option)
except (TypeError, AttributeError, ValueError):
return None
[docs]def save_config(conf):
'''Save the configuration file in a temporary directory.
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
configuration to save
Returns
-------
out_file : string
name of the file
'''
out_file = utils.get_out_file(':tmpdir:', 'ocd_{}.cfg', '', 5)
with open(out_file, 'w') as f:
conf.write(f)
return out_file
[docs]class ConfigUpdater(object):
'''Update the configuration in all the ``objects``.
The original configuration object and the list of objects to update are
passed when creating one instance.
When an event with topic :attr:`CONFIG_MOD_REQUEST` is received, the
configuration object is updated and the ``update_config(config)`` method of
all the ``objects`` is called. If an object doesn't have such a method it
is skipped.
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
configuration to modify
objects :
objects that (might) want to update the configuration
Attributes
----------
conf, objects :
same as input
topics : list
list of topics to which the :class:`ConfigUpdater` react
'''
def __init__(self, conf, *objects):
self.conf = conf
self.objects = objects
self.topics = [CONFIG_MOD_REQUEST, ]
[docs] def handle_event(self, tcs_topic, tcs_event):
'''When an event with the given topic arrives, decide whether it's time
to run a shot.
Parameters
----------
tcs_topic : string
topic of the event
tcs_event : dict
event to handle
Returns
-------
bool
whether the event triggers some computation
'''
if tcs_topic not in self.topics:
return False
else:
reply = dict(success=False, update_msg='', error_msg='',
warn_msg='')
try:
self._update_config(tcs_event)
reply['success'] = True
except KeyError as e:
reply['error_msg'] = ('The incoming event is missing the'
' mandatory key {}'.format(e.args[0]))
except ConfError as e:
reply['error_msg'] = ('Setting the configuration failed'
' because {}'.format(e.message))
except TypeError as e:
reply['error_msg'] = ('The value of the option must be a'
' string not:'
'{}'.format(tcs_event['value']))
else:
messages = []
try:
for obj in self.objects:
try:
msg = obj.update_config(self.conf)
except AttributeError:
msg = '{} was not updated'.format(obj)
messages.append(msg)
except Exception as e:
error_msg = ('While updating the configuration the'
' following exception happened:'
'\n{}'.format(traceback.format_exc()))
reply['error_msg'] = error_msg
reply['success'] = False
reply['update_msg'] = '\n'.join(messages)
# send back the reply
server = utils.get_zmq_server('ocd_main_loop')
server.send_tcs_event(CONFIG_MOD_REPLY, reply)
return True
[docs] def _update_config(self, tcs_event):
'''Update the configuration file using the tcs_event.
Get the section, option and value to update from the event and then
update the configuration.
Parameters
----------
tcs_event : dict
event to handle
'''
sec = tcs_event['section']
opt = tcs_event['option']
val = tcs_event['value']
self.conf.set(sec, opt, val)
# command line subcommands
[docs]def config_subcommand(subparsers):
'''Add a subcommand "config" to handle the configuration file.
Parameters
----------
subparsers : argparse subparsers object
subparser to use to generate new parsers
Returns
-------
parser : :class:`argparse.ArgumentParser`
modified parser
'''
parser = subparsers.add_parser('config', help='Manage configuration files',
description='''OCD subcommand to deal with
configuration files''')
subsubparser = parser.add_subparsers(title='''Configuration
subcommands''', description="""Type
'%(prog)s cmd -h' for detailed
information about the subcommands""",
dest='conf_subcommand')
subsubparser.required = True
copy_p = subsubparser.add_parser('copy', help='''Copy the configuration
files''', description='''Copy the
configuration files in the desired
directory''',
parents=[utils.common_parser_arguments(), ],
formatter_class=ap.ArgumentDefaultsHelpFormatter)
copy_p.set_defaults(func=_copy_conf)
copy_p.add_argument('-t', '--to-dir', default='.', help='''Copy the
configuration files to the given directory. By default
use the current directory''')
copy_p.add_argument('-f', '--force', action='store_true',
help="""Force overwriting existing configuration files.
If this option is used, all local modifications will be
lost""")
copy_p.add_argument('-b', '--backup', action='store_true',
help='''Backup existing files before copying the new
ones. If this option is used, the `-f/--force` is
ignored''')
send = subsubparser.add_parser('send', help='''Send a configuration
option''', description='''Send a
configuration option to the OCD main
loop''',
parents=[utils.common_parser_arguments(), ],
formatter_class=ap.ArgumentDefaultsHelpFormatter)
send.set_defaults(func=_send_conf)
send = utils.override_epilog_msg(send)
send.add_argument('section', help='Configuration section')
send.add_argument('option', help='Configuration option to override')
send.add_argument('value', help='New configuration value')
send = config_file_argument(send)
send.add_argument('-n', '--max-attempts', type=int, help='''Maximum
number of times "%(prog)s" attempts to enable/disable
HETDEX shot execution''', default=5)
send.add_argument('-I', '--interval', type=float, help='''Wait "%(dest)s"
seconds before a new attempt.''', default=1)
title = 'Override options in the [urls] section'
overrides_urls = send.add_argument_group(title=title)
overrides_urls.add_argument('-o', '--ocd-config',
dest='setting__urls__ocd_config',
metavar='OCD_CONFIG',
help='''Urls/paths used to send the signal to
update the configuration entries.''',
nargs='+')
overrides_urls.add_argument('-i', '--ocd-main-loop',
dest='setting__urls__ocd_main_loop',
metavar='OCD_MAIN_LOOP',
help='''Url/path from where the confirmation of
the update arrives.''', nargs='+')
title = 'Override options in the [config] section'
overrides_config = send.add_argument_group(title=title)
overrides_config.add_argument('-N', '--n-ocd-config-hetdex',
dest='setting__config__n_ocd_config',
metavar='N_OCD_CONFIG', help='''Which of the
urls for the ``ocd_config`` option of the
``[urls]`` section use to setup a ZMQ
server''')
return subparsers
[docs]def _copy_conf(args):
'''Copy the configuration files into the target directory
Parameters
----------
args : :class:`~argparse.Namespace`
parsed command line arguments
'''
ocd_cr = ioh.CopyResource(__name__, backup=args.backup, force=args.force,
verbose=False)
ocd_cr([MASTER_CONFIG_FILE, LOGGER_CONF], args.to_dir,
reldir=utils.STATIC_DIR)
ocd_cr.report()
[docs]def _send_conf(args):
'''Copy the configuration files into the target directory
Parameters
----------
args : :class:`~argparse.Namespace`
parsed command line arguments
'''
# load the configuration file
config = load_config(config_file=args.config, args=args)
# Setup the servers
utils.init_zmq_servers(args.subcommand, config)
print('Servers initialized')
send_listen = utils.SendAndListen.from_names('ocd_config',
'ocd_main_loop',
topics=CONFIG_MOD_REPLY,
n_attempts=args.max_attempts,
interval=args.interval)
print('Starting to communicate the new configuration to OCD')
event = {'section': args.section, 'option': args.option,
'value': args.value}
reply = send_listen.communicate(CONFIG_MOD_REQUEST, event)
returned_topic, returned_event, timeout_hit = reply
if timeout_hit:
print('WARNING: OCD has not replied back confirming that it acted'
' on my request. Are you sure that OCD is up and running?')
else:
success = returned_event['success']
if success:
msg = ('SUCCESS: OCD confirms that the new configuration option'
' has been propagated.')
else:
msg = ('ERROR: Setting the new configuration option failed')
separator = '************************'
print(msg)
print(separator)
print(returned_event['update_msg'] or 'No info messages')
print(separator)
print(returned_event['warn_msg'] or 'No warning messages')
print(separator)
print(returned_event['error_msg'] or 'No error messages')