# Source code for ocd.heart_beat
# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Heartbeat functionality
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import copy
import uuid
from tcs_lib.server import ZMQServer
from . import utils
# Topic names used by the heartbeat exchange in this module:
# ``HeartBeatQuery.communicate`` sends an event with ENQUIRY_TOPIC and
# ``HeartBeatHandler.handle_event`` answers with REPLY_TOPIC.
# The bare strings following each constant are attribute docstrings
# (presumably picked up by the documentation build); they are kept as is.
ENQUIRY_TOPIC = 'ocd.heartbeat.enquiry'
'''Topic sent when enquiring about heartbeat status'''
REPLY_TOPIC = 'ocd.heartbeat.reply'
'''Topic expected as a reply when enquiring about heartbeat status'''
class HeartBeatQuery(utils.SendAndListen):
    '''Heartbeat probe built on top of :class:`~ocd.utils.SendAndListen`.

    It sends an enquiry event and expects a matching reply, which makes it
    possible to monitor whether the OCD heart beats. It can also be used to
    check that an OCD subcommand is in contact with the OCD main loop
    (``ocd run``).

    Parameters
    ----------
    all : same as :class:`SendAndListen`

    Attributes
    ----------
    id_ : str
        unique token associated with the current :class:`HeartBeatQuery`
        instance. It is sent as part of the enquiry event
        (``{'id': id_}``) and it is expected back in the answer.
    '''

    def __init__(self, *args, **kwargs):
        super(HeartBeatQuery, self).__init__(*args, **kwargs)
        # Hex representation of a random UUID, created once per instance.
        self.id_ = uuid.uuid4().hex

    @classmethod
    def from_addresses(cls, server_address, listener_addresses, **kwargs):
        '''Alternate constructor working on addresses.

        Delegates to :meth:`ocd.utils.SendAndListen.from_addresses`, with
        the topic fixed to :data:`REPLY_TOPIC`.
        '''
        return super(HeartBeatQuery, cls).from_addresses(
            server_address, listener_addresses,
            topics=[REPLY_TOPIC], **kwargs)

    @classmethod
    def from_names(cls, server_name, listener_name, **kwargs):
        '''Alternate constructor working on names.

        Delegates to :meth:`ocd.utils.SendAndListen.from_names`, with the
        topic fixed to :data:`REPLY_TOPIC`.
        '''
        return super(HeartBeatQuery, cls).from_names(
            server_name, listener_name,
            topics=[REPLY_TOPIC], **kwargs)

    def communicate(self):
        '''Send the heartbeat enquiry and wait for its reply.

        An event carrying :attr:`id_` is sent with topic
        :data:`ENQUIRY_TOPIC`; the expected answer has topic
        :data:`REPLY_TOPIC` and the same id.

        Returns
        -------
        same as :meth:`ocd.utils.SendAndListen.communicate`
        '''
        query = {'id': self.id_}
        # The expected event is handed over as a separate shallow copy of
        # the event that is sent.
        return super(HeartBeatQuery, self).communicate(
            ENQUIRY_TOPIC, query,
            expected_topic=REPLY_TOPIC, expected_event=dict(query))
class HeartBeatHandler(object):
    '''Answer the heartbeat enquiries sent by :class:`HeartBeatQuery`.

    The work is done by :meth:`handle_event`, which sends out a reply for
    every enquiry event it receives.

    Parameters
    ----------
    server : :class:`tcs_lib.server.ZMQServer`
        server used to send out events

    Attributes
    ----------
    topics : list
        topics handled by this handler (:data:`ENQUIRY_TOPIC`)
    '''

    def __init__(self, server):
        self.topics = [ENQUIRY_TOPIC]
        self._server = server

    @classmethod
    def from_address(cls, server_address):
        '''Build the handler from the address of the server.

        Parameters
        ----------
        server_address : string
            url/path/... used to initialize the server
        '''
        return cls(ZMQServer(server_address))

    @classmethod
    def from_name(cls, server_name):
        '''Build the handler from the name of the server. The server is
        retrieved with :func:`ocd.utils.get_zmq_server`.

        Parameters
        ----------
        server_name : string
            name of the server as saved by :func:`ocd.utils.init_zmq_servers`
        '''
        return cls(utils.get_zmq_server(server_name))

    def handle_event(self, tcs_topic, tcs_event):
        '''Reply to a TCS event whose topic is one of :attr:`topics`.

        The incoming event is re-emitted unchanged, with topic
        :data:`REPLY_TOPIC`, on the server given to the constructor. Events
        with any other topic are ignored.

        Parameters
        ----------
        tcs_topic : string
            topic of the event
        tcs_event : dict
            event to handle

        Returns
        -------
        bool
            ``True`` if the event has been handled (a reply has been sent),
            ``False`` otherwise
        '''
        # Not one of our topics: nothing to send.
        if tcs_topic not in self.topics:
            return False

        self._server.send_tcs_event(REPLY_TOPIC, tcs_event)
        return True