Source code for ocd.heart_beat

# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Heartbeat functionality
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
import copy
import uuid

from tcs_lib.server import ZMQServer

from . import utils

ENQUIRY_TOPIC = 'ocd.heartbeat.enquiry'
'''Topic sent when enquiring about heartbeat status'''
REPLY_TOPIC = 'ocd.heartbeat.reply'
'''Topic expected as a reply when enquiring about heartbeat status'''


[docs]class HeartBeatQuery(utils.SendAndListen): '''Specialized version of :class:`~ocd.utils.SendAndListen` to monitor if the OCD heart beats. Can also be used to check that a OCD subcommand is in contact with the OCD main loop (``ocd run``). Parameters ---------- all : same as :class:`SendAndListen` Attributes ---------- id_ : str unique token associated with the current :class:`HeartBeatQuery` instance. It is send as part of the event sent (``{'id': id_}``) and it is expected in the answer. ''' def __init__(self, *args, **kwargs): super(HeartBeatQuery, self).__init__(*args, **kwargs) self.id_ = uuid.uuid4().hex
[docs] @classmethod def from_addresses(cls, server_address, listener_addresses, **kwargs): '''Same as :meth:`ocd.utils.SendAndListen.from_addresses`, but with the topic set to :data:`REPLY_TOPIC`. ''' super_method = super(HeartBeatQuery, cls).from_addresses return super_method(server_address, listener_addresses, topics=[REPLY_TOPIC, ], **kwargs)
[docs] @classmethod def from_names(cls, server_name, listener_name, **kwargs): '''Same as :meth:`ocd.utils.SendAndListen.from_names`, but with the topic set to :data:`REPLY_TOPIC`. ''' super_method = super(HeartBeatQuery, cls).from_names return super_method(server_name, listener_name, topics=[REPLY_TOPIC, ], **kwargs)
[docs] def communicate(self): '''Send a :data:`ENQUIRY_TOPIC` with the :attr:`id_` in the event and expects a :data:`REPLY_TOPIC` with the same id. Returns ------- same as :meth:`ocd.utils.SendAndListen.communicate` ''' event = {'id': self.id_} super_method = super(HeartBeatQuery, self).communicate return super_method(ENQUIRY_TOPIC, event, expected_topic=REPLY_TOPIC, expected_event=copy.copy(event))
[docs]class HeartBeatHandler(object): '''The :meth:`handle_event` handles events coming from :data:`HeartBeatQuery` and send out a reply. Parameters ---------- server : :class:`tcs_lib.server.ZMQServer` server to use to send out events Attributes ---------- topics : list topics handled by the state machine (:data:`ENQUIRY_TOPIC`) ''' def __init__(self, server): self._server = server self.topics = [ENQUIRY_TOPIC, ]
[docs] @classmethod def from_address(cls, server_address): '''Initialize the class from the server address Parameters ---------- server_address : string url/path/... use to initialize the server ''' server = ZMQServer(server_address) return cls(server)
[docs] @classmethod def from_name(cls, server_name): '''Initialize the class from the server name. Uses :func:`ocd.utils.get_zmq_server`. Parameters ---------- server_name : string name of the server as saved by :func:`ocd.utils.init_zmq_servers` ''' server = utils.get_zmq_server(server_name) return cls(server)
[docs] def handle_event(self, tcs_topic, tcs_event): '''Handle a TCS event with :attr:`topics` and re-emit the input event with topic :data:`REPLY_TOPIC` on the server passed to the constructor. Parameters ---------- tcs_topic : string topic of the event tcs_event : dict event to handle Returns ------- bool ``True`` if the event has been correctly handled, ``False`` otherwise ''' if tcs_topic in self.topics: self._server.send_tcs_event(REPLY_TOPIC, tcs_event) return True else: return False