# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''Module containing the states necessary to run OCD
The classes here use `the transitions package
<https://github.com/pytransitions/transitions>`_ for the states definitions and
transitions.
TCS events
**********
.. _transition_event:
Transition event
----------------
When any of the states changes, a TCS-like event is emitted with topic
:data:`STATE_CHANGE_TOPIC` and the following payload, based on
:class:`tcs_lib.tcs_event.TCSDict`.
+------------------------------+----------------------------------------------+
| Topic | Event |
+==============================+==============================================+
| ocd.states.change | machine_name (string): name of the state |
| | machine that just had the transition |
+ +----------------------------------------------+
| | transition (string): name of the transition |
+ +----------------------------------------------+
| | old_state (string): name of the old state |
+ +----------------------------------------------+
| | new_state (string): name of the new state |
+ +----------------------------------------------+
| | msg (string): extra message attached to the |
| | transition |
+ +----------------------------------------------+
| | traceback (string): full error traceback |
| | if some problem happened |
+ +----------------------------------------------+
| | __data_time (string): time at which the |
| | event payload is created |
+ +----------------------------------------------+
| | __wire_time (string): time at which the |
| | event is sent |
+ +----------------------------------------------+
| | __data (bool): whether another event with |
| | data is coming (always false) |
+ +----------------------------------------------+
| | __system, __source, __key (string): |
| | components of the topic |
+------------------------------+----------------------------------------------+
.. _state_event:
State event
-----------
When a :data:`TCS_HEARTBEAT_TOPIC` is received, emit a TCS-like event with
topic :data:`STATE_NAME_TOPIC` and the following payload, based on
:class:`tcs_lib.tcs_event.TCSDict`.
+------------------------------+----------------------------------------------+
| Topic | Event |
+==============================+==============================================+
| ocd.states.state | machine_name (string): name of the state |
| | machine that just had the transition |
+ +----------------------------------------------+
| | state (string): name of the current state |
+ +----------------------------------------------+
| | __data_time (string): time at which the |
| | event payload is created |
+ +----------------------------------------------+
| | __wire_time (string): time at which the |
| | event is sent |
+ +----------------------------------------------+
| | __data (bool): whether another event with |
| | data is coming (always false) |
+ +----------------------------------------------+
| | __system, __source, __key (string): |
| | components of the topic |
+------------------------------+----------------------------------------------+
'''
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import abc
from collections import namedtuple
import six
import transitions
from tcs_lib import tcs_event
from . import errors
from . import run_shot
from . import shots_db
from . import tcs_proxy
from . import utils
# Topics used by the state machines of this module.
# The string following each assignment is its attribute docstring.

# Incoming topic: every machine answers it by emitting its current state.
TCS_HEARTBEAT_TOPIC = 'tcs.receiver.heartbeat'
'''Topic of the TCS heartbeat events. The machines in this module emit their
state when such an event arrive'''
# Outgoing topic: sent by ``CallbackMixin.emit_state_change``.
STATE_CHANGE_TOPIC = 'ocd.states.change'
'''Topic used when emitting TCS-like events on state change'''
# Outgoing topic: sent by ``StateMixin.emit_state_name``.
STATE_NAME_TOPIC = 'ocd.states.state'
'''Topic used when emitting TCS-like events when a :data:`TCS_HEARTBEAT_TOPIC`
event comes in'''
class CallbackMixin(object):
    '''Mixin with ready-made transition callbacks shared by the state
    machines of this module.

    Every callback receives the :class:`transitions.EventData` describing the
    transition; the machines in this module are created with
    ``send_event=True`` for this purpose.
    '''

    def log_state_change(self, state_event):
        '''Write a debug log entry describing a state transition.

        Meant to run after a transition took place, e.g. as a
        ``finalize_event`` callback. The base message comes from
        :meth:`_msg_fmt` and is extended using the following entries of
        ``state_event.kwargs``:

        * ``traceback``: if not ``None``, it is appended to the message; the
          ``abort`` transition typically provides it;
        * ``extra_msg``: if not ``None``, it is appended after the traceback;
        * ``forced``: handled by :meth:`_msg_fmt`.

        Parameters
        ----------
        state_event : :class:`transition.EventData`
            happened event
        '''
        template, values = self._msg_fmt(state_event)
        options = state_event.kwargs

        error_tb = options.get('traceback')
        if error_tb is not None:
            template += ' with error:\n{tb}'
            values['tb'] = error_tb

        note = options.get('extra_msg')
        if note is not None:
            template += ('.\nExtra message provided with the state'
                         ' transition:\n{e_msg}')
            values['e_msg'] = note

        tcs_proxy.tcs_log.log_debug(template.format(**values))

    def emit_state_change(self, state_event):
        '''Send a TCS event with topic :attr:`STATE_CHANGE_TOPIC` describing
        a state transition.

        Meant to run after a transition took place, e.g. as a
        ``finalize_event`` callback. The following entries of
        ``state_event.kwargs`` end up in the payload:

        * ``extra_msg``: stored under ``msg`` (empty string if missing);
        * ``traceback``: stored under ``traceback`` (empty string if
          missing).

        Parameters
        ----------
        state_event : :class:`transition.EventData`
            happened event
        '''
        options = state_event.kwargs
        transition = state_event.transition
        # payload first, then wrapped into the TCSDict carrying the topic
        payload = dict(machine_name=self.__class__.__name__,
                       transition=state_event.event.name,
                       old_state=transition.source,
                       new_state=transition.dest,
                       msg=options.get('extra_msg', ''),
                       traceback=options.get('traceback', ''))
        event = tcs_event.TCSDict(payload)
        event.topic = STATE_CHANGE_TOPIC
        # the event goes out through the server of the OCD main loop
        server = utils.get_zmq_server('ocd_main_loop')
        server.send_tcs_event(STATE_CHANGE_TOPIC, event)

    def _msg_fmt(self, state_event):
        '''Build the basic description of a transition: machine name,
        transition name, source and destination states and whether the
        transition was forced.

        The ``forced`` entry of ``state_event.kwargs`` marks the transition
        as forced; the ``to_{state}`` transitions typically set it.

        Parameters
        ----------
        state_event : :class:`transition.EventData`
            happened event

        Returns
        -------
        msg : string
            template of the message, still to be formatted
        fmt_args : dict
            keyword arguments to use when formatting ``msg``
        '''
        transition = state_event.transition
        fmt_args = dict(name=self.__class__.__name__, force='',
                        trans=state_event.event.name,
                        source=transition.source,
                        dest=transition.dest)
        if state_event.kwargs.get('forced', False):
            fmt_args['force'] = 'forced '
        msg = ('{name}: {force}transition "{trans}" between state "{source}"'
               ' and "{dest}"')
        return msg, fmt_args
class StateMixin(object):
    '''Mixin with helpers shared by the state machines of this module.'''

    def emit_state_name(self):
        '''Send a TCS event with topic :attr:`STATE_NAME_TOPIC` reporting the
        current state.

        The payload contains the class name (``machine_name``) and the value
        of the :attr:`state` attribute (``state``), which the class using
        this mixin is expected to provide.
        '''
        # build the TCSDict directly from the two payload entries
        payload = tcs_event.TCSDict(machine_name=self.__class__.__name__,
                                    state=self.state)
        payload.topic = STATE_NAME_TOPIC
        # the event goes out through the server of the OCD main loop
        server = utils.get_zmq_server('ocd_main_loop')
        server.send_tcs_event(STATE_NAME_TOPIC, payload)
@six.add_metaclass(abc.ABCMeta)
class BaseState(object):
    '''Abstract interface that every state machine used by OCD implements.

    Derived classes must provide :attr:`trigger_topics`, :attr:`machine` and
    :meth:`handle_event`.

    Attributes
    ----------
    topics
    trigger_topics
    machine
    '''

    @property
    def topics(self):
        '''list of all the topics handled by the state machine:
        :attr:`trigger_topics` followed by :data:`TCS_HEARTBEAT_TOPIC`'''
        return self.trigger_topics + [TCS_HEARTBEAT_TOPIC]

    @utils.abstractproperty
    def trigger_topics(self):  # pragma: no cover
        '''list of topics that can trigger a machine state change'''

    @utils.abstractproperty
    def machine(self):  # pragma: no cover
        ''':class:`transitions.Machine` instance representing the states'''

    @abc.abstractmethod
    def handle_event(self, tcs_topic, tcs_event):  # pragma: no cover
        '''Handle a TCS event and decide whether the state needs to change.

        Parameters
        ----------
        tcs_topic : string
            topic of the event
        tcs_event : dict
            event to handle

        Returns
        -------
        bool
            ``True`` if the event has been handled, ``False`` otherwise
        '''
        return False
class RunShotState(BaseState, CallbackMixin, StateMixin):
    '''State machine that keeps track of shots run by OCD.

    The method :meth:`handle_event` accepts events from the TCS stream and, if
    relevant, uses the event payloads to modify the internal state.

    The states are ``idle``, ``started``, ``setup``, ``setup_done`` and, for
    each exposure, ``expNN`` and ``expNN_done``. See the documentation for a
    graph of the states and explicit transitions.

    .. warning::

        all the transitions called ``to_{state}``, with ``{state}`` one of the
        state names, are created by the :class:`transitions.Machine` and allow
        to force the transition to the corresponding state.

    Attributes
    ----------
    exp_f : string
        format string to build the exposure name
    machine : :class:`transitions.Machine`
        underlying class representing the states
    topic_handler_map : dict
        mapping between the topics and the functions that handle the
        corresponding events
    topics
    trigger_topics
    '''
    machine = None

    def __init__(self):
        self.exp_f = 'exp{0:02d}'
        self.machine = self._make_machine()
        # create the mapping between topics and functions
        self.topic_handler_map = {
            run_shot.RUN_TOPIC: self._handle_run_event,
            run_shot.SETUP_TOPIC: self._handle_setup_event,
            run_shot.EXP_TOPIC: self._handle_exp_event,
        }
        # prefix used in the log messages emitted by this class
        self._log_header = '{}:'.format(self.__class__.__name__)

    @property
    def trigger_topics(self):
        '''Topics that trigger a machine state transition. See
        :mod:`ocd.run_shot` for a description of topics.'''
        return list(self.topic_handler_map.keys())

    def _make_machine(self):
        '''Create the machine with all the relevant states and transitions.

        Besides the explicit transitions, an ``abort`` transition from any
        state to ``idle`` is added.

        Returns
        -------
        machine : :class:`transitions.Machine`
            state machine
        '''
        n_exps = 3
        states = ['idle', 'started', 'setup', 'setup_done']
        # the shot finishes from the "done" state of the last exposure; the
        # name is derived from n_exps so the two cannot get out of sync
        last_done = self.exp_f.format(n_exps) + '_done'
        _transitions = [['start', 'idle', 'started'],
                        ['do_setup', 'started', 'setup'],
                        ['finish_setup', 'setup', 'setup_done'],
                        ['finish', last_done, 'idle'],
                        ]
        # add the exposures and their transitions
        source = 'setup_done'  # the first exposure starts after the setup
        for e in range(1, n_exps + 1):
            exp = self.exp_f.format(e)
            dest = exp + '_done'
            states += [exp, dest]
            # transition to start the exposure
            _transitions.append(['do_' + exp, source, exp])
            # transition to end the exposure
            _transitions.append(['finish_' + exp, exp, dest])
            # the next exposure starts when this one is done
            source = dest

        # NOTE: ``title=self.__class__.__name__`` can be passed only when
        # using a GraphMachine
        machine = transitions.Machine(model=self, states=states,
                                      transitions=_transitions,
                                      initial=states[0],
                                      finalize_event=['log_state_change',
                                                      'emit_state_change'],
                                      send_event=True)
        # NOTE: in order to properly report the error, the following transition
        # should be called with the associated traceback:
        #   self.abort(traceback='error')
        machine.add_transition('abort', '*', 'idle')
        return machine

    def on_enter_idle(self, state_event):
        '''Callback that is triggered when entering the ``idle`` state.

        The function looks for the following entry in ``event.kwargs``:

        * ``tcs_event``: the shot database is updated calling
          :func:`ocd.shots_db.update_shot` with this event and with
          ``success`` set to the negation of its ``error`` entry. Failures
          are logged as warning.

        If ``tcs_event`` is not present a fatal message is logged, as this is
        a programming error, and the database is not touched.

        Parameters
        ----------
        state_event : :class:`transition.EventData`
            happened event
        '''
        key = 'tcs_event'
        try:
            event = state_event.kwargs[key]
        except KeyError:
            msg = ('{0} To properly handle the transition to "idle" state, the'
                   ' "{1}" keyword argument must be passed to the transition'
                   ' "{2}"')
            # this is a programming error
            tcs_proxy.tcs_log.log_fatal(msg, self._log_header, key,
                                        state_event.event.name)
            return

        try:
            success = not event['error']
            shots_db.update_shot(event, success=success)
        except (TypeError, KeyError, errors.ShotDoesNotExist,
                errors.ShotIntegrityError) as e:
            msg = ('{0} The shot database could not be updated because'
                   ' the following error happened:\n{1}: {2}')
            tcs_proxy.tcs_log.log_warn(msg, self._log_header, type(e), e)

    def handle_event(self, tcs_topic, tcs_event):
        '''Handle a TCS event and:

        * update the state, if ``tcs_topic`` is one of the keys of
          :attr:`topic_handler_map`; see :mod:`ocd.run_shot` for a
          description of topics and events;
        * emit the current state with :meth:`StateMixin.emit_state_name`,
          if ``tcs_topic`` is any other entry of :attr:`topics`, i.e.
          :data:`TCS_HEARTBEAT_TOPIC`.

        Parameters
        ----------
        tcs_topic : string
            topic of the event
        tcs_event : dict
            event to handle

        Returns
        -------
        bool
            ``True`` if ``tcs_topic`` in :attr:`topics`, ``False`` otherwise
        '''
        if tcs_topic in self.topic_handler_map:
            handler = self.topic_handler_map[tcs_topic]
            handler(tcs_event)
            return True
        if tcs_topic in self.topics:
            self.emit_state_name()
            return True
        # if the topic is not in the list of interesting ones, ignore it
        return False

    def _handle_run_event(self, tcs_event):
        '''Handle events associated with :attr:`~ocd.run_shot.RUN_TOPIC` with
        the following logic:

        * if the event is marked as starting, try to execute the :meth:`start`
          transition; if it fails force :meth:`to_started`;
        * if the event is marked as finished, try to execute the :meth:`finish`
          transition; if it fails force :meth:`to_idle`;
        * if the event is marked as finished and an error happened, execute
          :meth:`abort` with the traceback contained in the event;
        * any other status is logged and ignored.

        Parameters
        ----------
        tcs_event : dict
            event to handle
        '''
        exec_status = tcs_event['exec_status']
        if exec_status == run_shot.EXEC_STATUS.START:
            self._do_transition('start', 'to_started')
        elif exec_status == run_shot.EXEC_STATUS.FINISH:
            if tcs_event['error']:  # if there is an error, abort
                self.abort(traceback=tcs_event['traceback'],
                           tcs_event=tcs_event)
            else:
                # the event is forwarded so that ``on_enter_idle`` can
                # update the shot database
                self._do_transition('finish', 'to_idle', tcs_event=tcs_event)
        else:
            msg = 'Status {0} of event {1} is unknown'
            tcs_proxy.tcs_log.log_debug(msg, exec_status, run_shot.RUN_TOPIC)

    def _handle_setup_event(self, tcs_event):
        '''Handle events associated with :attr:`~ocd.run_shot.SETUP_TOPIC` with
        the following logic:

        * if the event is marked as starting, try to execute the
          :meth:`do_setup` transition; if it fails force :meth:`to_setup`;
        * if the event is marked as finished, try to execute the
          :meth:`finish_setup` transition; if it fails force
          :meth:`to_setup_done`;
        * any other status is logged and ignored.

        Parameters
        ----------
        tcs_event : dict
            event to handle
        '''
        exec_status = tcs_event['exec_status']
        if exec_status == run_shot.EXEC_STATUS.START:
            self._do_transition('do_setup', 'to_setup')
        elif exec_status == run_shot.EXEC_STATUS.FINISH:
            self._do_transition('finish_setup', 'to_setup_done')
        else:
            msg = 'Status {0} of event {1} is unknown'
            tcs_proxy.tcs_log.log_debug(msg, exec_status, run_shot.SETUP_TOPIC)

    def _handle_exp_event(self, tcs_event):
        '''Handle events associated with :attr:`~ocd.run_shot.EXP_TOPIC` with
        the following logic:

        * if the event is marked as starting, try to execute the
          :meth:`do_exp{0:02d}` transition; if it fails force
          :meth:`to_exp{0:02d}`;
        * if the event is marked as finished, try to execute the
          :meth:`finish_exp{0:02d}` transition; if it fails force
          :meth:`to_exp{0:02d}_done`;
        * any other status is logged and ignored.

        The format field ``{0:02d}`` is a zero padded two digit integer, whose
        value is taken from the ``exposure`` key of the input ``tcs_event``.
        An :exc:`AttributeError` raised while triggering the transitions is
        logged as a missing exposure state.

        Parameters
        ----------
        tcs_event : dict
            event to handle
        '''
        exec_status = tcs_event['exec_status']
        exp_str = self.exp_f.format(tcs_event['exposure'])
        try:
            if exec_status == run_shot.EXEC_STATUS.START:
                self._do_transition('do_' + exp_str, 'to_' + exp_str)
            elif exec_status == run_shot.EXEC_STATUS.FINISH:
                self._do_transition('finish_' + exp_str,
                                    'to_' + exp_str + '_done')
            else:
                msg = 'Status {0} of event {1} for exposure {2} is unknown'
                # report the topic actually being handled (EXP_TOPIC)
                tcs_proxy.tcs_log.log_debug(msg, exec_status,
                                            run_shot.EXP_TOPIC, exp_str)
        except AttributeError:
            msg = '{0} does not have states representing exposure {1}'
            tcs_proxy.tcs_log.log_debug(msg, self.__class__.__name__, exp_str)

    def _do_transition(self, trigger_name, force_name, *args, **kwargs):
        '''Try to execute the ``trigger_name`` transition: if it is not
        allowed from the current state, run ``force_name`` instead, adding
        ``forced=True`` to the keyword arguments.

        Parameters
        ----------
        trigger_name : string
            name of the transition to try first
        force_name : string
            name of the transition to run if the first one raises
            :exc:`transitions.core.MachineError`
        args, kwargs : arguments
            positional and keyword arguments to pass to the transitions
        '''
        try:
            self.trigger(trigger_name, *args, **kwargs)
        except transitions.core.MachineError:
            # let the callbacks know that the regular path was not possible
            kwargs['forced'] = True
            self.trigger(force_name, *args, **kwargs)
class MetrologyState(BaseState, CallbackMixin, StateMixin):
    '''State machine that keeps track of the metrology.

    Every time an event with one of the topics handled by the
    :class:`ocd.storage.MetrologyVault` comes in, the state is re-evaluated.

    Parameters
    ----------
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration
    vault : :class:`ocd.storage.MetrologyVault`
        vault containing the values of interest

    Attributes
    ----------
    machine : :class:`transitions.Machine`
        underlying class representing the states
    topics
    trigger_topics
    quantities : list
        names of the quantities to check
    ref_values : dict
        for each quantity, the minimum and maximum value allowed
    '''
    machine = None

    def __init__(self, conf, vault):
        self._conf = conf
        self._vault = vault
        self.machine = self._make_machine()
        # quantities whose values are compared with the configuration
        self.quantities = ['fwhm', 'skymag', 'transparency']
        # reference values, filled from the configuration
        self.ref_values = {}
        self._set_ref_values()

    def _set_ref_values(self):
        '''Fill :attr:`ref_values` with the first two entries of the
        ``ref_{quantity}`` options of the ``[containers]`` section'''
        for quantity in self.quantities:
            limits = self._conf.get_list('containers',
                                         'ref_{q}'.format(q=quantity),
                                         cast_to=float)
            self.ref_values[quantity] = limits[:2]

    @property
    def trigger_topics(self):
        '''Same as the topics used by the input ``vault``'''
        return self._vault.topics

    def _make_machine(self):
        '''Create the machine with the ``bad`` (initial) and ``good`` states
        and the ``improved`` and ``deteriorated`` transitions, both allowed
        from any state.

        Returns
        -------
        machine : :class:`transitions.Machine`
            state machine
        '''
        states = ['bad', 'good']
        callbacks = ['log_state_change', 'emit_state_change']
        machine = transitions.Machine(model=self, states=states,
                                      initial=states[0],
                                      finalize_event=callbacks,
                                      send_event=True)
        for trigger, dest in (('improved', 'good'), ('deteriorated', 'bad')):
            machine.add_transition(trigger, '*', dest)
        return machine

    def handle_event(self, tcs_topic, tcs_event):
        '''Handle a TCS event and:

        * re-evaluate the state, if ``tcs_topic`` is one of
          :attr:`trigger_topics`, i.e. the topics handled by
          :class:`ocd.storage.MetrologyVault`;
        * emit the current state with :meth:`StateMixin.emit_state_name`,
          if ``tcs_topic`` is any other entry of :attr:`topics`, i.e.
          :data:`TCS_HEARTBEAT_TOPIC`.

        Parameters
        ----------
        tcs_topic : string
            topic of the event
        tcs_event : dict
            event to handle

        Returns
        -------
        bool
            ``True`` if ``tcs_topic`` in :attr:`topics`, ``False`` otherwise
        '''
        if tcs_topic in self.trigger_topics:
            probes_conditions, report = self.compare_probes()
            if self.good_conditions(probes_conditions):
                transition = self.improved
            else:
                transition = self.deteriorated
            transition(extra_msg=report)
            return True
        if tcs_topic in self.topics:
            self.emit_state_name()
            return True
        # if the topic is of no interest, skip it
        return False

    def compare_probes(self):
        '''For each probe, check that the conditions are good enough to run
        HETDEX.

        Compare the :attr:`~ocd.storage.Container.median_masked` of the
        values stored in the :class:`~ocd.storage.MetrologyVault` with the
        corresponding reference values saved in :attr:`ref_values`.

        Returns
        -------
        is_good : list of bool
            True if the conditions are in spec, one entry per guide probe
        probe_msg : string
            string with the guide probe values and their reference values
        '''
        # template of the report: one header line plus one line per quantity
        line = ('{k}: {{{k}_g1}}, {{{k}_g2}},'
                ' ref: [{{{k}_min}}, {{{k}_max}}]\n')
        template = ('Masked median values for gp1 and gp2, compared with'
                    ' the reference values.\n')
        template += ''.join(line.format(k=name) for name in self.quantities)

        # values to insert in the report
        report_values = {}
        # each probe is checked independently of the other
        is_good = []
        for probe in (1, 2):
            in_spec = []
            for name in self.quantities:
                container_key = '{q}_g{p}'.format(q=name, p=probe)
                value = self._vault[container_key].median_masked
                lower, upper = self.ref_values[name]
                in_spec.append(lower <= value <= upper)
                # save the values for the report
                report_values[container_key] = value
                report_values['{k}_min'.format(k=name)] = lower
                report_values['{k}_max'.format(k=name)] = upper
            is_good.append(all(in_spec))

        return is_good, template.format(**report_values)

    def good_conditions(self, probes_conditions):
        '''Reduce the list of booleans representing the probes conditions to
        a single value representing good/bad conditions.

        If the option ``both_gp_good`` of the ``[containers]`` section is
        true, all the probes must be in spec, otherwise one is enough.

        Parameters
        ----------
        probes_conditions : list of bool
            True if the conditions are in spec, one entry per guide probe

        Returns
        -------
        bool
            good conditions
        '''
        need_all = self._conf['containers'].getboolean('both_gp_good')
        reducer = all if need_all else any
        return reducer(probes_conditions)

    def update_config(self, conf):
        '''Replace the local configuration object and refresh
        :attr:`ref_values` from it.

        Parameters
        ----------
        conf : :class:`pyhetdex.tools.configuration.ConfigParser`
            new configuration object

        Returns
        -------
        string
            message to report back
        '''
        self._conf = conf
        self._set_ref_values()
        report = ('MetrologyState: the configuration object and the local'
                  ' variable storing "[containers]ref_*" options have been'
                  ' updated')
        return report
# Incoming topic: requests to enable or disable HETDEX observations
HETDEX_ALLOWED_TOPIC = 'ocd.states.hetdex_allowed'
'''Topic associated with the events necessary to update the
:class:`HetdexAllowedState` state machine'''
# Outgoing topic: notification sent once the state has changed
HETDEX_CHANGED_TOPIC = 'ocd.states.hetdex_allowed_changed'
'''Topic of the events emitted after a state is changed'''

# Requested action, carried by the HETDEX_ALLOWED_TOPIC events
_HetdexToggle = namedtuple('ENUM', ['ENABLE', 'DISABLE'])
HETDEX_TOGGLE = _HetdexToggle(ENABLE=0, DISABLE=1)
'''Enumerator to use to signal that HETDEX **is to be** enabled or disabled'''

# Resulting state, carried by the HETDEX_CHANGED_TOPIC events
_HetdexToggleChanged = namedtuple('ENUM', ['ENABLED', 'DISABLED'])
HETDEX_CHANGED = _HetdexToggleChanged(ENABLED=10, DISABLED=11)
'''Enumerator to use to signal that HETDEX **has been** enabled or disabled'''
class HetdexAllowedState(BaseState, CallbackMixin, StateMixin):
    '''This state tracks whether OCD is allowed to execute HETDEX shots even if
    all the other states would allow it.

    A state change is triggered by TCS events with topic
    :data:`HETDEX_ALLOWED_TOPIC` (``ocd.states.hetdex_allowed``). The
    associated event needs to contain one entry:

    * ``action`` (int): if it is :data:`HETDEX_TOGGLE.ENABLE` triggers
      :meth:`allow`, if it is :data:`HETDEX_TOGGLE.DISABLE` triggers
      :meth:`forbid`.

    This state is triggered externally via the ``ocd allow_hetdex
    {start,stop}`` command.

    After the :meth:`allow` or :meth:`forbid` transition, a TCS-like event
    with topic :data:`HETDEX_CHANGED_TOPIC`
    (``ocd.states.hetdex_allowed_changed``) is emitted. The associated event
    contains one entry:

    * ``new_state`` (int): :data:`HETDEX_CHANGED.ENABLED` if the new state is
      ``allowed``, :data:`HETDEX_CHANGED.DISABLED` if it is ``not_allowed``.

    .. note::

        Since the state is updated in the main OCD loop, it makes sense to use
        the ``ocd_main_loop`` ZeroMQ server here. In the case we need more
        flexibility it is possible to add a new server.

    Attributes
    ----------
    machine : :class:`transitions.Machine`
        underlying class representing the states
    topics
    trigger_topics
    '''
    machine = None

    def __init__(self):
        self.machine = self._make_machine()

    @property
    def trigger_topics(self):
        '''Topics that trigger a machine state change, i.e.
        :attr:`HETDEX_ALLOWED_TOPIC`'''
        return [HETDEX_ALLOWED_TOPIC, ]

    def _make_machine(self):
        '''Create the machine with the ``allowed`` and ``not_allowed``
        (initial) states and the ``allow`` and ``forbid`` transitions, both
        allowed from any state and followed by :meth:`emit_tcs_event`.

        Returns
        -------
        machine : :class:`transitions.Machine`
            state machine
        '''
        states = ['allowed', 'not_allowed']
        machine = transitions.Machine(model=self, states=states,
                                      initial=states[1],
                                      finalize_event=['log_state_change',
                                                      'emit_state_change'],
                                      send_event=True)
        machine.add_transition('allow', '*', 'allowed', after='emit_tcs_event')
        machine.add_transition('forbid', '*', 'not_allowed',
                               after='emit_tcs_event')
        return machine

    def handle_event(self, tcs_topic, tcs_event):
        '''Handle a TCS event and:

        * update the state, if ``tcs_topic`` is :data:`HETDEX_ALLOWED_TOPIC`;
          events without the ``action`` key or with an unknown action are
          logged and do not trigger any transition;
        * emit the current state with :meth:`StateMixin.emit_state_name`
          (if ``tcs_topic`` is :data:`TCS_HEARTBEAT_TOPIC`).

        Parameters
        ----------
        tcs_topic : string
            topic of the event
        tcs_event : dict
            event to handle

        Returns
        -------
        bool
            whether the event has been handled
        '''
        if tcs_topic in self.trigger_topics:
            try:
                action = tcs_event['action']
            except KeyError:
                # a malformed request must not stop the main loop, but leave
                # a trace of it instead of dropping it silently
                msg = ('Event {0} does not contain the "action" key: no'
                       ' transition triggered')
                tcs_proxy.tcs_log.log_debug(msg, HETDEX_ALLOWED_TOPIC)
                return True

            if action == HETDEX_TOGGLE.ENABLE:
                self.allow()
            elif action == HETDEX_TOGGLE.DISABLE:
                self.forbid()
            else:  # no transition
                msg = 'Action {0} of event {1} is unknown'
                tcs_proxy.tcs_log.log_debug(msg, action,
                                            HETDEX_ALLOWED_TOPIC)
            return True
        if tcs_topic in self.topics:
            self.emit_state_name()
            return True
        return False

    def emit_tcs_event(self, state_event):
        '''Emit a TCS-like event with topic :data:`HETDEX_CHANGED_TOPIC`
        using the ``ocd_main_loop`` ZeroMQ server.

        The ``new_state`` entry of the event is
        :data:`HETDEX_CHANGED.ENABLED` if the destination state is
        ``allowed``, :data:`HETDEX_CHANGED.DISABLED` otherwise.

        Parameters
        ----------
        state_event : :class:`transition.EventData`
            happened event
        '''
        if state_event.transition.dest == 'allowed':
            new_state = HETDEX_CHANGED.ENABLED
        else:
            new_state = HETDEX_CHANGED.DISABLED
        event = {'new_state': new_state}

        zmq_server = utils.get_zmq_server('ocd_main_loop')
        zmq_server.send_tcs_event(HETDEX_CHANGED_TOPIC, event)