Source code for ocd.orchestrator

# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''This module orchestrates OCD.

The following are initialised:

    * storage classes
    * state machines
    * the decision maker

Then it starts listening for TCS events and sends them to the machines and
finally to the decision maker.
'''
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from pprint import pformat
import traceback

from . import config
from . import auto_schedule
from . import heart_beat
from . import storage
from . import states
from . import tcs_proxy


class Orchestrator(object):
    '''Central event loop of OCD.

    On construction it builds the heartbeat handler, the vaults, the state
    machines, the shot runner and the configuration updater, gathers the
    topics each of them exposes and, if ``auto_subscribe`` in the ``[topics]``
    section is true, subscribes the event listener to them. :meth:`run` then
    dispatches every incoming event to those components.

    Parameters
    ----------
    event_listener : :class:`tcs_lib.tcs_event.TCSEvent` instance
        object that is iterated over to obtain the events, one at a time, as
        two-tuples ``(header, event)``; its ``subscribe`` method is called
        with the collected topics when auto subscription is enabled
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration object
    '''

    # Template handed to the TCS error logger together with
    # ``(header, event, traceback)``; only fields 0 and 2 are referenced.
    _HANDLING_ERROR_MSG = (
        'While handling event "{0}" the following error occurred.\n'
        'If you think that it is a bug in OCD or an error that OCD should'
        ' handle implicitly, please report this with **the full traceback**'
        ' to the developers.\n{2}'
    )

    def __init__(self, event_listener, conf):
        self._event_listener = event_listener
        self._conf = conf
        self._do_log_events = self._log_events_enabled(conf)

        # Every component exposes the topics it cares about; collect them in
        # a set so that each topic is subscribed only once.
        self._heart_beat = heart_beat.HeartBeatHandler.from_name(
            'ocd_main_loop')
        topics = set(self._heart_beat.topics)

        # The vaults must exist before the states, and both before the shot
        # runner and the configuration updater, which receive them.
        topics.update(self._init_vaults())
        topics.update(self._init_states())

        self._shot_runner = auto_schedule.ShotRunner(
            conf, self._meta_state, 'satisfied',
            self._metrology_vault, self._azimuth_vault)
        topics.update(self._shot_runner.topics)

        self._config_updater = config.ConfigUpdater(
            conf, self, self._metrology_vault, self._azimuth_vault,
            self._metrology_state, self._shot_runner)
        topics.update(self._config_updater.topics)

        if conf['topics'].getboolean('auto_subscribe'):
            self._event_listener.subscribe(topics)

    @staticmethod
    def _log_events_enabled(conf):
        '''Read the ``log_events`` option of the ``[general]`` section.

        Parameters
        ----------
        conf : :class:`pyhetdex.tools.configuration.ConfigParser`
            configuration object

        Returns
        -------
        bool
            value of the option, ``False`` if it is not set
        '''
        return conf['general'].getboolean('log_events', fallback=False)

    def _init_vaults(self):
        '''Create the metrology and azimuth vaults.

        The instances are stored in :attr:`_metrology_vault` and
        :attr:`_azimuth_vault`.

        Returns
        -------
        list
            topics of the metrology vault followed by those of the azimuth
            vault
        '''
        metrology = storage.MetrologyVault(self._conf)
        self._metrology_vault = metrology
        azimuth = storage.AzimuthVault(self._conf)
        self._azimuth_vault = azimuth
        return metrology.topics + azimuth.topics

    def _init_states(self):
        '''Create the state machines.

        The instances are stored in :attr:`_metrology_state`,
        :attr:`_run_shot_state`, :attr:`_hetdex_allowed_state` and
        :attr:`_meta_state`. The vaults must already have been initialised,
        as the metrology state receives :attr:`_metrology_vault`.

        Returns
        -------
        list
            topics of the four state machines, concatenated in the order
            listed above
        '''
        self._metrology_state = states.MetrologyState(self._conf,
                                                      self._metrology_vault)
        self._run_shot_state = states.RunShotState()
        self._hetdex_allowed_state = states.HetdexAllowedState()

        # The meta state is given each machine together with a state name.
        watched = [
            (self._metrology_state, 'good'),
            (self._run_shot_state, 'idle'),
            (self._hetdex_allowed_state, 'allowed'),
        ]
        self._meta_state = states.MetaState(watched)

        machines = (self._metrology_state, self._run_shot_state,
                    self._hetdex_allowed_state, self._meta_state)
        # Concatenate without touching any of the machines' own lists.
        topics = machines[0].topics
        for machine in machines[1:]:
            topics = topics + machine.topics
        return topics

    def run(self):
        '''Run OCD.

        Iterate over the event listener, log each event (if enabled) and
        handle it. Any :class:`Exception` raised while handling an event is
        reported through the TCS error log, then the loop moves on to the
        next event.
        '''
        for header, event in self._event_listener:
            self._log_event(header, event)
            try:
                self._handle_event(header, event)
            except Exception:
                # One failing event must not stop the main loop.
                tcs_proxy.tcs_log.log_error(self._HANDLING_ERROR_MSG, header,
                                            event, traceback.format_exc())

    def _handle_event(self, header, event):
        '''Dispatch one event to the components.

        The heartbeat handler and the configuration updater get the event
        first; if either reports it as handled, nothing else is done.
        Otherwise the event is stored in the vaults, passed to the three
        state machines, then to the meta state and finally to the shot
        runner.

        Parameters
        ----------
        header : string
            tcs event header
        event : dict
            tcs event payload
        '''
        # Heartbeat and configuration events need no further processing.
        if self._heart_beat.handle_event(header, event):
            return
        if self._config_updater.handle_event(header, event):
            return

        # Store the information first, so it is in the vaults before the
        # state machines are updated.
        for vault in (self._metrology_vault, self._azimuth_vault):
            vault.store_event(header, event)

        # The individual machines are updated before the meta state.
        for machine in (self._metrology_state, self._run_shot_state,
                        self._hetdex_allowed_state):
            machine.handle_event(header, event)
        self._meta_state.handle_event(header, event)

        # Last step: let the shot runner see the event.
        self._shot_runner.handle_event(header, event)

    def _log_event(self, header, event):
        '''Send a DEBUG log message describing the event.

        Nothing is done unless ``log_events`` in the ``[general]`` section is
        ``true``.

        Parameters
        ----------
        header : string
            tcs event header
        event : dict
            tcs event payload
        '''
        if not self._do_log_events:
            return
        # Keep the pretty-printed payload on a single line.
        payload = pformat(event).replace('\n', '')
        tcs_proxy.tcs_log.log_debug('Handling event "{0}" with payload "{1}"',
                                    header, payload)

    def update_config(self, conf):
        '''Replace the local copy of the configuration and refresh
        :attr:`_do_log_events` from it.

        Parameters
        ----------
        conf : :class:`pyhetdex.tools.configuration.ConfigParser`
            new configuration object

        Returns
        -------
        string
            message to report back
        '''
        self._conf = conf
        self._do_log_events = self._log_events_enabled(conf)
        return ('Orchestrator: the configuration object and the local variable'
                ' storing "[general]log_events" option have been updated')