Source code for ocd.states

# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''Module containing the states necessary to run OCD

The classes here use `the transitions package
<https://github.com/pytransitions/transitions>`_ for the states definitions and
transitions.

TCS events
**********

.. _transition_event:

Transition event
----------------

When any of the states change a TCS-like event with topic
:data:`STATE_CHANGE_TOPIC` and the following payload, based on
:class:`tcs_lib.tcs_event.TCSDict`.

+------------------------------+----------------------------------------------+
| Topic                        | Event                                        |
+==============================+==============================================+
| ocd.states.change            | machine_name (string): name of the state     |
|                              | machine that just had the transition         |
+                              +----------------------------------------------+
|                              | transition (string): name of the transition  |
+                              +----------------------------------------------+
|                              | old_state (string): name of the old state    |
+                              +----------------------------------------------+
|                              | new_state (string): name of the new state    |
+                              +----------------------------------------------+
|                              | msg (string): extra message attached to the  |
|                              | transition                                   |
+                              +----------------------------------------------+
|                              | traceback (string): full error traceback     |
|                              | if some problem happened                     |
+                              +----------------------------------------------+
|                              | __data_time (string): time at which the      |
|                              | event payload is created                     |
+                              +----------------------------------------------+
|                              | __wire_time (string): time at which the      |
|                              | event is sent                                |
+                              +----------------------------------------------+
|                              | __data (bool): whether an other event with   |
|                              | data is coming (always false)                |
+                              +----------------------------------------------+
|                              | __system, __source, __key (string):          |
|                              | components of the topic                      |
+------------------------------+----------------------------------------------+

.. _state_event:

State event
-----------

When a :data:`TCS_HEARTBEAT_TOPIC` is received, emit a TCS-like event with
topic :data:`STATE_NAME_TOPIC` and the following payload, based on
:class:`tcs_lib.tcs_event.TCSDict`.

+------------------------------+----------------------------------------------+
| Topic                        | Event                                        |
+==============================+==============================================+
| ocd.states.state             | machine_name (string): name of the state     |
|                              | machine that just had the transition         |
+                              +----------------------------------------------+
|                              | state (string): name of the current state    |
+                              +----------------------------------------------+
|                              | __data_time (string): time at which the      |
|                              | event payload is created                     |
+                              +----------------------------------------------+
|                              | __wire_time (string): time at which the      |
|                              | event is sent                                |
+                              +----------------------------------------------+
|                              | __data (bool): whether an other event with   |
|                              | data is coming (always false)                |
+                              +----------------------------------------------+
|                              | __system, __source, __key (string):          |
|                              | components of the topic                      |
+------------------------------+----------------------------------------------+
'''
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import abc
from collections import namedtuple

import six
import transitions
from tcs_lib import tcs_event

from . import errors
from . import run_shot
from . import shots_db
from . import tcs_proxy
from . import utils

TCS_HEARTBEAT_TOPIC = 'tcs.receiver.heartbeat'
'''Topic of the TCS heartbeat events. The machines in this module emit their
state when such an event arrive'''

STATE_CHANGE_TOPIC = 'ocd.states.change'
'''Topic used when emitting TCS-like events on state change'''
STATE_NAME_TOPIC = 'ocd.states.state'
'''Topic used when emitting TCS-like events when a :data:`TCS_HEARTBEAT_TOPIC`
event comes in'''


[docs]class CallbackMixin(object): '''Mixin class that provides implementation for common callbacks that can be used by state machine'''
[docs] def log_state_change(self, state_event): '''Log state transitions. This should be used after a transition takes place, for example as a ``finalize_event`` callback. The function looks for the following entries in ``event.kwargs``: * ``extra_msg``: an extra message to appen to the state change message; if present comes after the ``traceback``; * ``traceback``: traceback to add to the log message; the ``abort`` transition will typically use this; * ``forced``: if the transition is forced; typically the ``to_{state}`` transitions will use it. Parameters ---------- state_event : :class:`transition.EventData` happened event ''' msg, fmt_args = self._msg_fmt(state_event) traceback = state_event.kwargs.get('traceback', None) if traceback is not None: msg += ' with error:\n{tb}' fmt_args['tb'] = traceback extra_msg = state_event.kwargs.get('extra_msg', None) if extra_msg is not None: msg += ('.\nExtra message provided with the state' ' transition:\n{e_msg}') fmt_args['e_msg'] = extra_msg tcs_proxy.tcs_log.log_debug(msg.format(**fmt_args))
[docs] def emit_state_change(self, state_event): '''Emit a TCS event with topic :attr:`STATE_CHANGE_TOPIC` to document state transitions. This should be used after a transition takes place, for example as a ``finalize_event`` callback. The function looks for the following entries in ``event.kwargs``: * ``extra_msg``: an extra message to appen to the state change message; if present comes after the ``traceback``; * ``traceback``: traceback to add to the log message; the ``abort`` transition will typically use this; * ``forced``: if the transition is forced; typically the ``to_{state}`` transitions will use it. Parameters ---------- state_event : :class:`transition.EventData` happened event ''' # create the event payload event = dict(machine_name=self.__class__.__name__, transition=state_event.event.name, old_state=state_event.transition.source, new_state=state_event.transition.dest, msg=state_event.kwargs.get('extra_msg', ''), traceback=state_event.kwargs.get('traceback', '')) # and use to instantiate the corresponding TCSDict event = tcs_event.TCSDict(event) event.topic = STATE_CHANGE_TOPIC # finally send the event zmq_server = utils.get_zmq_server('ocd_main_loop') zmq_server.send_tcs_event(STATE_CHANGE_TOPIC, event)
[docs] def _msg_fmt(self, state_event): '''Create a message from the state_event containing the transition name, the source and destination state name and whether the transition is forced. The function looks for the following entries in ``event.kwargs``: * ``forced``: if the transition is forced; typically the ``to_{state}`` transitions will use it. Parameters ---------- state_event : :class:`transition.EventData` happened event Returns ------- msg : string string to containing the message to send fmt_args : dict formatting keys for ``msg`` (only positional arguments) ''' msg = ('{name}: {force}transition "{trans}" between state "{source}"' ' and "{dest}"') fmt_args = dict(name=self.__class__.__name__, force='', trans=state_event.event.name, source=state_event.transition.source, dest=state_event.transition.dest) if state_event.kwargs.get('forced', False): fmt_args['force'] = 'forced ' return msg, fmt_args
[docs]class StateMixin(object): '''Mixin class that provides implementation for common functionalities that can be used by state machines'''
[docs] def emit_state_name(self): '''Emit a TCS event with topic :attr:`STATE_NAME_TOPIC` to document state the current state. This method assumes that the class has a :attr:`state` attribute ''' # instantiate the TCSDict with the necessary keys event = tcs_event.TCSDict(machine_name=self.__class__.__name__, state=self.state) event.topic = STATE_NAME_TOPIC # finally send the event zmq_server = utils.get_zmq_server('ocd_main_loop') zmq_server.send_tcs_event(STATE_NAME_TOPIC, event)
[docs]@six.add_metaclass(abc.ABCMeta) class BaseState(object): '''This class defines the mandatory API for state machines used by OCD. The properties and methods defined here must be reimplemented in derived classes Attributes ---------- topics trigger_topics machine ''' @property def topics(self): '''list of topics handled by the state machine. Defaults to :attr:`trigger_topics` plus :data:`TCS_HEARTBEAT_TOPIC`''' return self.trigger_topics + [TCS_HEARTBEAT_TOPIC, ] @utils.abstractproperty def trigger_topics(self): # pragma: no cover '''list of topics that can trigger a machine state change''' pass @utils.abstractproperty def machine(self): # pragma: no cover ''':class:`transitions.Machine` instance representing the states''' pass
[docs] @abc.abstractmethod def handle_event(self, tcs_topic, tcs_event): # pragma: no cover '''Handle a TCS event and decide whether the status need to be changed or not. Parameters ---------- tcs_topic : string topic of the event tcs_event : dict event to handle Returns ------- bool ``True`` if the event has been handled, ``False`` otherwise ''' return False
[docs]class RunShotState(BaseState, CallbackMixin, StateMixin): '''State machine that keeps track of shots run by OCD. The method :meth:`handle_event` accepts events from the TCS stream and, if relevant, use the event payloads to modify the internal state. See the documentation for a graph of the states and explicit transitions. .. warning:: all the transitions called ``to_{state}``, with ``{state}`` one of the state names, are created by the :class:`transitions.Machine` and allow to force the transition to the corresponding state. Attributes ---------- exp_f : string format string to build the exposure name machine : :class:`transitions.Machine` underlying class representing the states topic_handler_map : dict mapping between the topics and the functions that handle the corresponding events topics trigger_topics ''' machine = None def __init__(self): self.exp_f = 'exp{0:02d}' self.machine = self._make_machine() # create the mapping between topics and functions self.topic_handler_map = {run_shot.RUN_TOPIC: self._handle_run_event, run_shot.SETUP_TOPIC: self._handle_setup_event, run_shot.EXP_TOPIC: self._handle_exp_event} self._log_header = '{}:'.format(self.__class__.__name__) @property def trigger_topics(self): '''Topics that trigger a machine state transition. See :mod:`ocd.run_shot` for a description of topics.''' return list(self.topic_handler_map.keys())
[docs] def _make_machine(self): '''create the machine with all the relevant states and transitions Returns ------- machine : :class:`transitions.Machine` state machine ''' states = ['idle', 'started', 'setup', 'setup_done'] _transitions = [['start', 'idle', 'started'], ['do_setup', 'started', 'setup'], ['finish_setup', 'setup', 'setup_done'], ['finish', 'exp03_done', 'idle'] ] # add the exposures and their transitions n_exps = 3 for e in range(1, n_exps+1): exp = self.exp_f.format(e) states += [exp, exp + '_done'] # add the transition to start an exposure if e > 1: source = self.exp_f.format(e-1) + '_done' else: source = 'setup_done' _transitions.append(['do_' + exp, source, exp]) # add the transition to end an exposure dest = exp + '_done' _transitions.append(['finish_' + exp, exp, dest]) machine = transitions.Machine(model=self, states=states, transitions=_transitions, initial=states[0], finalize_event=['log_state_change', 'emit_state_change'], send_event=True) # title=self.__class__.__name__) # NOTE: use it only with GraphMachine # NOTE: in order to properly report the error, the following transition # should be called with the associated traceback: # self.abort(traceback='error') machine.add_transition('abort', '*', 'idle') return machine
[docs] def on_enter_idle(self, state_event): '''Callback that is triggered when entering the ``idle`` state. The function looks for the following entry in ``event.kwargs``: * ``tcs_event``: if present and dictionary like, try to extract the ``shotid`` key; if successful it updates the database calling :func:`ocd.shots_db.update_shot`. Failures are logged as warning. If ``tcs_event`` is not present nothing happens. Parameters ---------- state_event : :class:`transition.EventData` happened event ''' try: key = 'tcs_event' tcs_event = state_event.kwargs[key] except KeyError: msg = ('{0} To properly handle the transition to "idle" state, the' ' "{1}" keyword argument must be passed to the transition' ' "{2}"') # this is a programming error tcs_proxy.tcs_log.log_fatal(msg, self._log_header, key, state_event.event.name) return try: success = not tcs_event['error'] shots_db.update_shot(tcs_event, success=success) except (TypeError, KeyError, errors.ShotDoesNotExist, errors.ShotIntegrityError) as e: msg = ('{0} The shot database could not be updated because' ' the following error happened:\n{1}: {2}') tcs_proxy.tcs_log.log_warn(msg, self._log_header, type(e), e)
[docs] def handle_event(self, tcs_topic, tcs_event): '''Handle a TCS event and: * update the state, if ``tcs_topic`` is :data:`ocd.run_shot.ALL_TOPICS`; see :mod:`ocd.run_shot` for a description of topics and events; * emit the current the state with :meth:`StateMixin.emit_state_name` (if ``tcs_topic`` is :data:`TCS_HEARTBEAT_TOPIC`). Parameters ---------- tcs_topic : string topic of the event tcs_event : dict event to handle Returns ------- bool ``True`` if ``tcs_topic`` in :attr:`topics`, ``False`` otherwise ''' if tcs_topic in self.topic_handler_map: handler = self.topic_handler_map[tcs_topic] handler(tcs_event) return True elif tcs_topic in self.topics: self.emit_state_name() return True else: # if the topic is not in the list of interesting one, ignore it return False
[docs] def _handle_run_event(self, tcs_event): '''Handle events associated with :attr:`~ocd.run_shot.RUN_TOPIC` with the following logic: * if the event is marked as starting, try to execute the :meth:`start` transition; if it fails force :meth:`to_started`; * if the event is marked as finished, try to execute the :meth:`finish` transition; if it fails force :meth:`to_idle`; * if the event is marked as finished and an error happened, execute the :meth:`abort` with the traceback contained in the event Parameters ---------- tcs_event : dict event to handle ''' exec_status = tcs_event['exec_status'] if exec_status == run_shot.EXEC_STATUS.START: self._do_transition('start', 'to_started') elif exec_status == run_shot.EXEC_STATUS.FINISH: if tcs_event['error']: # if there is an error, abort self.abort(traceback=tcs_event['traceback'], tcs_event=tcs_event) else: self._do_transition('finish', 'to_idle', tcs_event=tcs_event) else: msg = 'Status {0} of event {1} is unknown' tcs_proxy.tcs_log.log_debug(msg, exec_status, run_shot.RUN_TOPIC)
[docs] def _handle_setup_event(self, tcs_event): '''Handle events associated with :attr:`~ocd.run_shot.SETUP_TOPIC` with the following logic: * if the event is marked as starting, try to execute the :meth:`do_setup` transition; if it fails force :meth:`to_setup`; * if the event is marked as finished, try to execute the :meth:`finish_setup` transition; if it fails force :meth:`to_setup_done`; Parameters ---------- tcs_event : dict event to handle ''' exec_status = tcs_event['exec_status'] if exec_status == run_shot.EXEC_STATUS.START: self._do_transition('do_setup', 'to_setup') elif exec_status == run_shot.EXEC_STATUS.FINISH: self._do_transition('finish_setup', 'to_setup_done') else: msg = 'Status {0} of event {1} is unknown' tcs_proxy.tcs_log.log_debug(msg, exec_status, run_shot.SETUP_TOPIC)
[docs] def _handle_exp_event(self, tcs_event): '''Handle events associated with :attr:`~ocd.run_shot.EXP_TOPIC` with the following logic: * if the event is marked as starting, try to execute the :meth:`do_exp{0:02d}` transition; if it fails force :meth:`to_exp{0:02d}`; * if the event is marked as finished, try to execute the :meth:`finish_exp{0:02d}` transition; if it fails force :meth:`to_exp{0:02d}_done`. The format field ``{0:02d}`` is a zero padded two digit integer, whose value is taken from the ``exposure`` key of the input ``tcs_event`` Parameters ---------- tcs_event : dict event to handle ''' exec_status = tcs_event['exec_status'] exp_str = self.exp_f.format(tcs_event['exposure']) try: if exec_status == run_shot.EXEC_STATUS.START: self._do_transition('do_' + exp_str, 'to_' + exp_str) elif exec_status == run_shot.EXEC_STATUS.FINISH: self._do_transition('finish_' + exp_str, 'to_' + exp_str + '_done') else: msg = 'Status {0} of event {1} for exposure {2} is unknown' tcs_proxy.tcs_log.log_debug(msg, exec_status, run_shot.SETUP_TOPIC, exp_str) except AttributeError: msg = '{0} does not have states representing exposure {1}' tcs_proxy.tcs_log.log_debug(msg, self.__class__.__name__, exp_str)
[docs] def _do_transition(self, trigger_name, force_name, *args, **kwargs): '''Try to execute the ``triggered`` transition: if it fail, run the forced one. Parameters ---------- triggered, forced : string name of the transitions to force args, kwargs : arguments positional and keyword arguments to pass to the triggered transitions ''' try: self.trigger(trigger_name, *args, **kwargs) except transitions.core.MachineError: kwargs['forced'] = True self.trigger(force_name, *args, **kwargs)
[docs]class MetrologyState(BaseState, CallbackMixin, StateMixin): '''State machine that keep track of the metrology. Any time one of the events that can update containers in the :class:`ocd.storage.MetrologyVault`, a state change is evaluated. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` configuration vault : :class:`ocd.storage.MetrologyVault` vault containing the values of interest Attributes ---------- machine : :class:`transitions.Machine` underlying class representing the states topics trigger_topics quantities : list names of the quantities to check ref_values : dict for each quantities, save the minimum and maximum value allowed ''' machine = None def __init__(self, conf, vault): self._conf = conf self._vault = vault self.machine = self._make_machine() # Get the reference values from the configuration file # quantities to check self.quantities = ['fwhm', 'skymag', 'transparency'] # get all the reference values self.ref_values = {} self._set_ref_values()
[docs] def _set_ref_values(self): '''set the reference value in :attr:`ref_values`''' section = 'containers' option = 'ref_{q}' for k in self.quantities: self.ref_values[k] = self._conf.get_list(section, option.format(q=k), cast_to=float)[:2]
@property def trigger_topics(self): '''Same as the topics used by the input ``vault``''' return self._vault.topics
[docs] def _make_machine(self): '''create the machine with all the relevant states and transitions Returns ------- machine : :class:`transitions.Machine` state machine ''' states = ['bad', 'good'] machine = transitions.Machine(model=self, states=states, initial=states[0], finalize_event=['log_state_change', 'emit_state_change'], send_event=True) machine.add_transition('improved', '*', 'good') machine.add_transition('deteriorated', '*', 'bad') return machine
[docs] def handle_event(self, tcs_topic, tcs_event): '''Handle a TCS event and: * update the state, if ``tcs_topic`` is the same of the topics handled by :class:`ocd.storage.MetrologyVault`. * emit the current the state with :meth:`StateMixin.emit_state_name` (if ``tcs_topic`` is :data:`TCS_HEARTBEAT_TOPIC`). Parameters ---------- tcs_topic : string topic of the event tcs_event : dict event to handle Returns ------- bool whether the topic triggers a state change ''' if tcs_topic in self.trigger_topics: probes_conditions, probe_msg = self.compare_probes() is_good = self.good_conditions(probes_conditions) if is_good: self.improved(extra_msg=probe_msg) else: self.deteriorated(extra_msg=probe_msg) return True elif tcs_topic in self.topics: self.emit_state_name() return True else: # if the topic is of no interest, skip it return False
[docs] def compare_probes(self): '''For each probe, check that the conditions are good enough to run HETDEX. Compare the :attr:`~ocd.storage.Container.median_masked` of the values stored in the :class:`~ocd.storage.MetrologyVault` the corresponding expected values from the ``[containers]`` section of the configuration file. Returns ------- is_good : list of bool True is the conditions are in spec for each guide probe probe_msg : string string with the guide probe values and their reference values ''' # check both probes and store if the conditions are good for each one # independently is_good = [] container_name = '{q}_g{p}' # log message msg = ('Masked median values for gp1 and gp2, compared with' ' the reference values.\n') for k in self.quantities: line = ('{k}: {{{k}_g1}}, {{{k}_g2}},' ' ref: [{{{k}_min}}, {{{k}_max}}]\n') msg += line.format(k=k) # dictionary with the keys for the log message msg_keys = {} for probe in [1, 2]: comparisons = [] for k in self.quantities: key = container_name.format(q=k, p=probe) container = self._vault[key] value = container.median_masked min_, max_ = self.ref_values[k] comparisons.append(min_ <= value <= max_) # save the values for the log message msg_keys[key] = value msg_keys['{k}_min'.format(k=k)] = min_ msg_keys['{k}_max'.format(k=k)] = max_ is_good.append(all(comparisons)) probe_msg = msg.format(**msg_keys) return is_good, probe_msg
[docs] def good_conditions(self, probes_conditions): '''From the list of boolean representing the probes conditions, extract a single value representing good/bad conditions. If the option ``both_gp_good`` of the ``[containers]`` section is ``yes``/``true``, then all the probes must be in spec, otherwise can be just one of them. Parameters ---------- probes_conditions : list of bool True is the conditions are in spec for each guide probe Returns ------- bool good conditions ''' if self._conf['containers'].getboolean('both_gp_good'): compare_func = all else: compare_func = any return compare_func(probes_conditions)
[docs] def update_config(self, conf): '''Update the local copy of the configuration file and :attr:`ref_values`. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` new configuration object Returns ------- string message to report back ''' self._conf = conf self._set_ref_values() return ('MetrologyState: the configuration object and the local' ' variable storing "[containers]ref_*" options have been' ' updated')
HETDEX_ALLOWED_TOPIC = 'ocd.states.hetdex_allowed' '''Topic associated with the events necessary to update the :class:`HetdexAllowedState` state machine''' HETDEX_CHANGED_TOPIC = 'ocd.states.hetdex_allowed_changed' '''Topic of the events emitted after a state is changed''' _HetdexToggle = namedtuple('ENUM', 'ENABLE, DISABLE') HETDEX_TOGGLE = _HetdexToggle(ENABLE=0, DISABLE=1) '''Enumerator to use to signal that HETDEX **is to be** enabled or disabled''' _HetdexToggleChanged = namedtuple('ENUM', 'ENABLED, DISABLED') HETDEX_CHANGED = _HetdexToggleChanged(ENABLED=10, DISABLED=11) '''Enumerator to use to signal that HETDEX **has been** enabled or disabled'''
[docs]class HetdexAllowedState(BaseState, CallbackMixin, StateMixin): '''This state tracks whether OCD is allowed to execute HETDEX shots even if all the other states would allow it. A state change is triggered by tcs events with topic :data:`HETDEX_ALLOWED_TOPIC`. The associated event needs to contain one entry: +------------------------------+------------------------------------------+ | Topic | Event | +==============================+==========================================+ | ocd.states.hetdex_allowed | action (int): | | | if it is :data:`HETDEX_TOGGLE.ENABLE` | | | triggers :meth:`allow`, if it is | | | :data:`HETDEX_TOGGLE.DISABLE` triggers | | | :meth:`forbid` | +------------------------------+------------------------------------------+ This state is triggered externally via the ``ocd allow_hetdex {start,stop}`` command. When a state change happens, a TCS-like event with topic :data:`HETDEX_CHANGED_TOPIC` is emitted. The associated event is: +---------------------------+-------------+-------------------------------+ | Topic | State | Event | +===========================+=============+===============================+ | ocd.states.hetdex_allowed | allowed | new_state (int): | | | | :data:`HETDEX_CHANGED.ENABLED`| +---------------------------+-------------+-------------------------------+ | ocd.states.hetdex_allowed | not_allowed |new_state (int): | | | |:data:`HETDEX_CHANGED.DISABLED`| +---------------------------+-------------+-------------------------------+ .. note:: Since the state is update in the main OCD loop, it make sense to use the ``ocd_main_loop`` ZeroMQ server here. In the case we need more flexibility it is possible to add a new server. Attributes ---------- machine : :class:`transitions.Machine` underlying class representing the states topics trigger_topics ''' machine = None def __init__(self): self.machine = self._make_machine() @property def trigger_topics(self): '''Topics that triggers a machine state change. I.e. :attr:`HETDEX_ALLOWED_TOPIC`''' return [HETDEX_ALLOWED_TOPIC, ]
[docs] def _make_machine(self): '''create the machine with all the relevant states and transitions Returns ------- machine : :class:`transitions.Machine` state machine ''' states = ['allowed', 'not_allowed'] machine = transitions.Machine(model=self, states=states, initial=states[1], finalize_event=['log_state_change', 'emit_state_change'], send_event=True) machine.add_transition('allow', '*', 'allowed', after='emit_tcs_event') machine.add_transition('forbid', '*', 'not_allowed', after='emit_tcs_event') return machine
[docs] def handle_event(self, tcs_topic, tcs_event): '''Handle a TCS event and: * update the state, if ``tcs_topic`` is :data:`HETDEX_ALLOWED_TOPIC`. * emit the current the state with :meth:`StateMixin.emit_state_name` (if ``tcs_topic`` is :data:`TCS_HEARTBEAT_TOPIC`). Parameters ---------- tcs_topic : string topic of the event tcs_event : dict event to handle Returns ------- bool whether the event has been handled ''' if tcs_topic in self.trigger_topics: try: action = tcs_event['action'] except KeyError: # if the action key does not exist, ignore it pass else: if action == HETDEX_TOGGLE.ENABLE: self.allow() elif action == HETDEX_TOGGLE.DISABLE: self.forbid() else: # no transition msg = 'Action {0} of event {1} is unknown' tcs_proxy.tcs_log.log_debug(msg, action, HETDEX_ALLOWED_TOPIC) return True elif tcs_topic in self.topics: self.emit_state_name() return True else: return False
[docs] def emit_tcs_event(self, state_event): '''Emit a TCS-like event using a ZeroMQ server. See :class:`HetdexAllowedState` for more info. Parameters ---------- state_event : :class:`transition.EventData` happened event ''' dest_state = state_event.transition.dest if dest_state == 'allowed': event = {'new_state': HETDEX_CHANGED.ENABLED} else: event = {'new_state': HETDEX_CHANGED.DISABLED} zmq_server = utils.get_zmq_server('ocd_main_loop') zmq_server.send_tcs_event(HETDEX_CHANGED_TOPIC, event)
[docs]class MetaState(BaseState, CallbackMixin, StateMixin): '''Collect other states and decide whether they all have the expected state. Parameters ---------- machines_states : list of 2-tuples (machine, string) each element is a state machine and the name of the reference state; if all the machines are in the corresponding state, :class:`MetaState` switches to the ``allowed`` state, otherwise it switches to ``not_allowed`` Attributes ---------- machine : :class:`transitions.Machine` underlying class representing the states topics trigger_topics Raises ------ ValueError if machines_states doesn't contain at least one element and if any element is not a 2-tuple ''' machine = None def __init__(self, machines_states): if not machines_states or any(len(sm) != 2 for sm in machines_states): raise ValueError('"machines_states" must have at least one element' ' and each element should be a list or tuple of' ' length two.') self.machine = self._make_machine() # get a list of unique topics self._trigger_topics = sum((m[0].trigger_topics for m in machines_states), []) self._trigger_topics = list(set(self._trigger_topics)) self._topics = list(set(self._trigger_topics + [TCS_HEARTBEAT_TOPIC, ])) self._machines_states = machines_states @property def trigger_topics(self): '''topics that trigger a machine state change; they are the union of the trigger_topics from the input machines.''' return self._trigger_topics
[docs] def _make_machine(self): '''create the machine with all the relevant states and transitions Returns ------- machine : :class:`transitions.Machine` state machine ''' states = ['satisfied', 'not_satisfied'] machine = transitions.Machine(model=self, states=states, initial=states[1], finalize_event=['log_state_change', 'emit_state_change'], send_event=True) machine.add_transition('satisfy', '*', 'satisfied') machine.add_transition('deviate', '*', 'not_satisfied') return machine
[docs] def handle_event(self, tcs_topic, tcs_event): '''If ``tcs_topic`` is in the list of :attr:`trigger_topics` that were handled by the input state machines, re-evaluate their states and if they all are in their reference state, :meth:`allow` the execution of a new shot, otherwise :meth:`forbid` it. If ``tcs_topic`` is not in :attr:`trigger_topics` but in :attr:`topics`, emit the current the state with :meth:`StateMixin.emit_state_name`. Parameters ---------- tcs_topic : string topic of the event tcs_event : dict event to handle, ignored Returns ------- bool whether the event has been handled ''' if tcs_topic in self.trigger_topics: is_reference = (machine.state == ref_state for (machine, ref_state) in self._machines_states) if all(is_reference): self.satisfy() else: self.deviate() return True elif tcs_topic in self.topics: self.emit_state_name() return True else: return False