Source code for ocd.utils

# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''OCD utilities'''
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import abc
import argparse as ap
import contextlib
import itertools as it
import math
import os
import signal
import sys
import threading
import warnings

from astropy.time import Time
from pyhetdex.tools.files import file_tools
from py.path import local
import six
from tcs_lib import server as tcs_server
from tcs_lib import tcs_event, string_helpers
from tcs_lib.errors import ConvertTypeError

from . import errors


# Private storage for the temporary directory. :func:`tmpdir` appends the
# directory path on its first call and afterwards only the first element is
# used.
_tmpdirs = []
# Private storage for the mock time object: filled by :func:`init_times` when
# mock times are requested and read by :func:`get_time_utc`.
_mock_time = []

# Private storage for the zmq servers and their urls/paths.
# key: name associated with the server
# ``_zmq_servers`` maps the name to the server object created by
# :func:`get_zmq_server`; ``_zmq_server_urls`` maps the name to the single url
# selected by :func:`init_zmq_servers`.
_zmq_servers = {}
_zmq_server_urls = {}
# All the urls that OCD knows about: name -> urls read from the ``[urls]``
# section of the configuration by :func:`init_zmq_servers`.
_zmq_all_urls = {}


STATIC_DIR = 'static'
'''Static ocd directory'''


###########################
# common command line parse arguments
def common_parser_arguments():
    '''Build a bare command line parser meant to be used as a parent parser.

    The parser is created without help and without description and only
    provides the ``-V``/``--version`` option.

    Returns
    -------
    parser : :class:`argparse.ArgumentParser`
    '''
    # NOTE(review): the version is imported at call time, presumably to avoid
    # a circular import with the package ``__init__`` -- confirm
    from . import __version__ as ocd_version

    parent = ap.ArgumentParser(add_help=False)
    parent.add_argument('-V', '--version', action='version',
                        version=ocd_version)
    return parent
def override_epilog_msg(parser):
    '''Attach to ``parser`` an epilogue explaining how the configuration
    options can be overridden.

    Parameters
    ----------
    parser : :class:`argparse.ArgumentParser`
        parser to modify in place

    Returns
    -------
    parser : :class:`argparse.ArgumentParser`
        the same object received as input
    '''
    epilog = ('The configuration overriding mechanism works only on existing'
              ' options: make sure that the corresponding sections and'
              ' options exist. The section names are given in the'
              ' description of the command and the option names are, unless'
              ' otherwise stated, the same as the command line options long'
              ' names.')
    parser.epilog = epilog
    return parser
########################### # misc functionalities
def compare_with_value(comp_operator, aggregator, value, *keys):
    '''Build a function that compares some ``keys`` of an ``event`` with
    ``value`` and reduces the outcomes to a single result.

    Parameters
    ----------
    comp_operator : callable
        called as ``comp_operator(event[key], value)`` for each key, e.g.
        :func:`operator.eq`
    aggregator : callable
        receives the list of the comparison outcomes and compresses it into
        a single value, e.g. :func:`all` or :func:`any`
    value : object
        value to compare to
    keys : strings
        keys whose values must be checked

    Returns
    -------
    comp_func : function
        function with signature ``comp_func(event) -> bool``, where
        ``event`` is the dictionary containing the values associated to the
        ``keys``. Keys raising a :class:`KeyError` (e.g. because they are
        missing from ``event``) are skipped, so they do not contribute to
        the aggregation.

    Examples
    --------
    >>> from operator import eq
    >>> all_equal_true = compare_with_value(eq, all, 'true', 'key1', 'key2')
    >>> all_equal_true({'key1': 'true', 'key2': 'true', 'other': 42})
    True
    >>> all_equal_true({'key1': 'true', 'key2': 'false', 'other': 42})
    False
    >>> all_equal_true({'key1': 'true', 'key3': 'false', 'other': 42})
    True
    '''
    def _compare_with_value(event):
        def _outcomes():
            # one comparison per key; keys triggering a KeyError are ignored
            for key in keys:
                try:
                    yield comp_operator(event[key], value)
                except KeyError:
                    continue

        # the aggregator always receives a fully built list
        return aggregator(list(_outcomes()))

    return _compare_with_value
try:
    # available in the standard library from python 3.5 on
    isclose = math.isclose
except AttributeError:
    def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
        '''Tell whether two floating point numbers are close in value.

        The numbers are considered close when their difference is smaller
        than at least one of the tolerances.

        Python 2 implementation of Python 3.5 math.isclose()
        https://hg.python.org/cpython/file/tip/Modules/mathmodule.c#l1993

        Copied from `here <https://stackoverflow.com/a/39347138/1860757>`_

        Parameters
        ----------
        a, b : float
            numbers to compare
        rel_tol
            maximum difference for being considered "close", relative to the
            magnitude of the input values
        abs_tol
            maximum difference for being considered "close", regardless of
            the magnitude of the input values

        Returns
        -------
        bool
            True if a is close in value to b, and False otherwise.

        Raises
        ------
        ValueError
            if any of the tolerances is negative
        '''
        if rel_tol < 0 or abs_tol < 0:
            raise ValueError("tolerances must be non-negative")

        # exact equality: also covers two infinities of the same sign
        if a == b:
            return True

        # any other combination involving an infinity can never be close;
        # without this guard the relative tolerance would be infinite
        if math.isinf(a) or math.isinf(b):
            return False

        # "weak" test from the Boost library: it is enough that one of the
        # tolerances is satisfied
        distance = math.fabs(b - a)
        if distance <= abs_tol:
            return True
        return (distance <= math.fabs(rel_tol * b) or
                distance <= math.fabs(rel_tol * a))
def cast_dictionary(in_dict, strings=(), floats=(), ints=(),
                    binary_strings=False):
    '''Create a new dictionary out of ``in_dict`` with the desired keys cast
    to string, float or int.

    Parameters
    ----------
    in_dict : dictionary-like
        the values are extracted from here
    strings, floats, ints : iterables, optional
        keys of ``in_dict`` whose values need to be converted to strings,
        floats or integers, respectively. The defaults are empty immutable
        tuples, so that no mutable object is shared among calls.
    binary_strings : bool
        if true, the output is passed through :func:`binary_dict` before
        being returned

    Returns
    -------
    out_dict : dictionary
        dictionary containing all the key:value pairs listed in ``strings``,
        ``floats`` and ``ints``

    Raises
    ------
    KeyError
        if one of the requested keys is not in ``in_dict``

    .. warning::
        The code does not check for key repetitions and casts values in the
        same order as in the function signature. So if you use the same key
        in, e.g., ``strings`` and ``ints`` the corresponding output value
        will be an integer.

        The code does not protect against missing keys.
    '''
    out_dict = {}
    # the order matters: later casts override earlier ones for repeated keys
    for cast_func, keys in ((str, strings), (float, floats), (int, ints)):
        for key in keys:
            out_dict[key] = cast_func(in_dict[key])

    if binary_strings:
        return binary_dict(out_dict)
    return out_dict
def binary_dict(in_dict):
    '''Build a copy of ``in_dict`` where every value is passed through
    :func:`_safe_string_to_bytes`; nested dictionaries are converted
    recursively. In python 2 unicode keys are converted as well.

    Parameters
    ----------
    in_dict : dictionary
        dictionary to convert

    Returns
    -------
    out_dict : dictionary
        copy of the input dictionary with all strings converted to binary
        strings
    '''
    converted = {}
    for key, value in in_dict.items():
        # only python 2 needs non-unicode keys
        if six.PY2 and isinstance(key, six.text_type):
            key = _safe_string_to_bytes(key)

        converted[key] = (binary_dict(value) if isinstance(value, dict)
                          else _safe_string_to_bytes(value))
    return converted
def _safe_string_to_bytes(string, encoding='utf-8'):
    '''Forgiving version of :func:`tcs_lib.string_helpers.string_to_bytes`:
    inputs that cannot be converted are handed back untouched.

    Parameters
    ----------
    string, encoding :
        same as :func:`~tcs_lib.string_helpers.string_to_bytes`

    Returns
    -------
    the binary version of the input or, if
    :func:`~tcs_lib.string_helpers.string_to_bytes` raises a
    ``ConvertTypeError``, the input as it is
    '''
    try:
        converted = string_helpers.string_to_bytes(string, encoding=encoding)
    except ConvertTypeError:
        converted = string
    return converted
def multiwrap(*decorators):
    """Combine the given decorators into a single one.

    The decorators are applied from the last to the first, i.e. in the same
    order as if they were stacked on top of a function in the given order.
    """
    def wrapper(func):
        decorated = func
        for decorate in reversed(decorators):
            decorated = decorate(decorated)
        return decorated
    return wrapper
abstractproperty = (abc.abstractproperty if six.PY2
                    else multiwrap(property, abc.abstractmethod))
"""* Python 3: decorator from combining :class:`property` and
  :func:`abc.abstractmethod`
* Python 2: alias of :func:`abc.abstractproperty`
"""

# Temporary directories stay locked for this many seconds (two days): only
# directories older than this can be automatically removed
_lock_timeout = 2 * 24 * 60 * 60
# prefix used to name the temporary directories
_prefix = 'ocd-'
def tmpdir():
    '''Return the temporary directory used by OCD.

    The first call creates the directory with
    :meth:`py.path.local.make_numbered_dir`, asking to keep the last 10
    directories and using ``_lock_timeout`` seconds (two days) as lock
    timeout, and stores its path. Subsequent calls return the stored path.

    Returns
    -------
    string
        temporary directory
    '''
    if not _tmpdirs:
        numbered_dir = local.make_numbered_dir(prefix=_prefix, keep=10,
                                               lock_timeout=_lock_timeout)
        _tmpdirs.append(numbered_dir.strpath)
    return _tmpdirs[0]
def get_out_file(out_dir, file_template, default_name, n_keep=-1):
    '''Return the absolute path of the file.

    Parameters
    ----------
    out_dir : string
        this can be either a directory on the system or ``:tmpdir:``; in the
        latter the temporary directory from :func:`tmpdir` is used. If the
        directory does not exist, try to create it
    file_template : string
        name of the file where to write the shots; it can contain at most
        one format placeholder, either ``{}`` or ``{0}``. If
        ``file_template.format(0)`` fails, e.g. because of too many or
        malformed placeholders, ``default_name`` is used. If the file
        contains a format placeholder each call creates a new file with a
        counter using
        :func:`~pyhetdex.tools.files.file_tools.FileNameRotator`
    default_name : string
        default to use if ``file_template`` is not in spec
    n_keep : integer, optional
        when rotating the files, keep at most ``n_keep`` files. Use ``-1``
        to keep all the files

    Returns
    -------
    out_file : string
        name of the file

    Raises
    ------
    OSError
        if ``out_dir`` does not exist and cannot be created
    '''
    if out_dir == ':tmpdir:':
        out_dir = tmpdir()
    else:
        # Create first and check afterwards: a "check then create" sequence
        # fails if another process creates the directory in between.
        try:
            os.makedirs(out_dir)
        except OSError:
            # an already existing path is fine, anything else is an error
            if not os.path.exists(out_dir):
                raise

    do_rotation = True
    try:
        formatted = file_template.format(0)
    except (IndexError, KeyError, ValueError):
        # Too many/named placeholders (IndexError/KeyError) or a malformed
        # template (ValueError): resort to the default file name
        file_template = default_name
    else:
        # the formatting succeeded: there is either one placeholder or none;
        # in the latter case the name does not change and nothing is rotated
        do_rotation = formatted != file_template

    if do_rotation:
        rotator = file_tools.FileNameRotator(out_dir, keep=n_keep,
                                             file_name=file_template)
        return rotator.file_name
    return os.path.join(out_dir, file_template)
class MockTime(object):
    '''Clock that starts ticking from an arbitrary time.

    At initialization both the time to use as starting point and the
    current UTC time are saved.

    Parameters
    ----------
    start_time : string
        time to use as starting point

    Attributes
    ----------
    start_time : :class:`astropy.time.Time`
        representation of the input ``start_time``
    init_time : :class:`astropy.time.Time`
        time of initialisation of this class
    '''
    def __init__(self, start_time):
        self.start_time = Time(start_time, scale='utc')
        self.init_time = Time.now()

    def now(self):
        '''Return the mocked current time.

        Returns
        -------
        :class:`astropy.time.Time` instance
            ``start_time + (Time.now() - init_time)``, where ``Time.now()``
            is evaluated when this method is called
        '''
        elapsed = Time.now() - self.init_time
        return self.start_time + elapsed
def init_times(conf):
    '''Set up the mock time, if requested in the configuration.

    If the ``mock_time`` option of the ``[dates]`` section is set, the user
    is asked for confirmation: on a positive answer a :class:`MockTime` is
    stored and later used by :func:`get_time_utc`; otherwise the program
    exits. Without the option nothing happens.

    Parameters
    ----------
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration.
    '''
    mock_time = conf['dates'].get('mock_time', fallback=None)
    if mock_time is None:
        return

    question = ('You are about to start OCD with mock times'
                ' starting at {}. This feature is enabled for'
                ' tests and should not be used when running OCD'
                ' to drive observations. If you really want to'
                ' proceed? [y/n] '.format(mock_time))
    answer = six.moves.input(question)
    if answer.strip().lower() not in ['y', 'yes']:
        sys.exit('OCD interrupted by the user')
    _mock_time.append(MockTime(mock_time))
def get_time_utc():
    '''Return the current time, taking into account the mock time stored by
    :func:`init_times`, if any.

    Returns
    -------
    :class:`astropy.time.Time`
        current or mocked UTC time
    '''
    if _mock_time:
        return _mock_time[0].now()
    return Time.now()
def get_time_jd():
    '''Return the time given by :func:`get_time_utc` as Julian Date.

    Returns
    -------
    float
        current or mocked Julian Date corresponding to
        :func:`get_time_utc`
    '''
    current_time = get_time_utc()
    return current_time.jd
# Internal zmq servers
def init_zmq_servers(command, conf):
    '''Read from ``conf`` the urls/paths/... to use for the servers and
    store them.

    The following options of the ``[urls]`` section are required:

    * ``ocd_run_shot``: url(s) used by the :mod:`ocd.run_shot` module
    * ``ocd_main_loop``: url(s) used to emit events from the OCD main loop
    * ``ocd_allow_hetdex``: url(s) used by the ``ocd allow_hetdex`` command
    * ``ocd_db_replay``: url(s) used by the ``ocd db_replay`` command
    * ``ocd_config``: url(s) used by the ``ocd config`` command

    The values of each one of them can be a list of comma separated URLs.
    The above names are the same used as input to the
    :func:`get_zmq_server_urls`, :func:`get_zmq_server_url` and
    :func:`get_zmq_server` functions.

    All the urls are saved and returned by :func:`get_zmq_server_urls`.

    The urls for the servers that can be initialized for the given
    ``command`` are stored and returned by the :func:`get_zmq_server_url`.
    If multiple urls are provided for any of the above options, the
    following mechanism is used to decide which one to use:

    * get the value of the option ``n_{server_url}``, with ``{server_url}``
      one of the above names, in the ``[{command}]`` section, with
      ``{command}`` the value passed to this function;
    * if the section or option do not exist or is empty, use the first url
      of the list;
    * otherwise the value should be an integer, ``i``, and the
      corresponding entry is stored;

    If an option of the ``[urls]`` section has no value, a warning is
    emitted and no url is stored for the corresponding server.

    .. warning::

        In principle it is possible to use any `ZeroMQ protocol
        <http://zeromq.org/docs:features>`_ that supports the `PUB/SUB
        pattern
        <https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/patterns/pubsub.html#publish-subscribe>`_.
        However, we cannot ensure that the publishers are always initialized
        before the receivers, therefore protocols like ``inproc`` `will not
        work <http://zguide.zeromq.org/php:chapter2#Unicast-Transports>`_.

    Parameters
    ----------
    command : string
        name of the OCD subcommand that starts the servers
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration.

    Raises
    ------
    OCDValueError
        when the index for the url to save is not an integer
    OCDIndexError
        when the index for the url to save is out of bound
    '''
    # NOTE: the names are kept in a sequence to allow future extensions
    server_names = ('ocd_main_loop', 'ocd_run_shot', 'ocd_allow_hetdex',
                    'ocd_db_replay', 'ocd_config')

    for name in server_names:
        # every url is remembered, regardless of which one gets selected
        all_urls = conf.get_list('urls', name)
        _zmq_all_urls[name] = all_urls

        # look for the index of the url to use for this command
        n_option = 'n_{}'.format(name)
        try:
            index = conf[command][n_option]
        except KeyError:
            index = 0

        # an empty value means "use the first url"
        try:
            index = int(index) if index else 0
        except ValueError as e:
            msg = ('The value provided for "{o} = {i}" is an invalid literal'
                   ' for int() with base 10.')
            msg = msg.format(o=n_option, i=index)
            six.raise_from(errors.OCDValueError(msg), e)

        try:
            selected_url = all_urls[index]
        except IndexError as e:
            if all_urls:
                msg = ('The value provided for "{o} = {i}" is out of range. It'
                       ' should between 0 and {m}')
                msg = msg.format(o=n_option, i=index, m=len(all_urls) - 1)
                six.raise_from(errors.OCDIndexError(msg), e)
            else:
                msg = ('The option "{o}" in the ``[urls]`` section has no'
                       ' value. The corresponding server will not be'
                       ' available')
                msg = msg.format(o=name)
                warnings.warn(msg, category=errors.OCDZMQWarning)
        else:
            # of all the urls available only one is used to initialize the
            # servers
            _zmq_server_urls[name] = selected_url
def get_zmq_server(name):
    '''Return the :class:`tcs_lib.server.ZMQServer` associated with
    ``name``.

    The first call creates the server, using the url returned by
    :func:`get_zmq_server_url`, and stores it; subsequent calls with the
    same ``name`` always return the same object.

    Parameters
    ----------
    name : string
        name of the server

    Returns
    -------
    :class:`tcs_lib.server.ZMQServer`

    Raises
    ------
    KeyError
        if ``name`` has no associated url/path stored by
        :func:`init_zmq_servers`
    '''
    if name not in _zmq_servers:
        # first request for this name: try to create the server
        _zmq_servers[name] = tcs_server.ZMQServer(get_zmq_server_url(name))
    return _zmq_servers[name]
def get_zmq_server_url(name):
    '''Return the zmq server url/path selected by :func:`init_zmq_servers`
    for ``name``.

    Parameters
    ----------
    name : string
        name of the server

    Returns
    -------
    string
        url to which the server is bound

    Raises
    ------
    KeyError
        if no url has been stored for ``name``
    '''
    selected_url = _zmq_server_urls[name]
    return selected_url
def get_zmq_server_urls(name=None):
    '''Retrieve all the urls/paths passed via the configuration.

    Parameters
    ----------
    name : string, optional
        if given, returns only the urls/paths for the given name, otherwise
        returns all

    Returns
    -------
    list of strings
        with ``name``, the urls as stored by :func:`init_zmq_servers`;
        without it, a new list with the urls of all the names chained
        together

    Raises
    ------
    KeyError
        if ``name`` is given but no url has been stored for it
    '''
    if name:
        return _zmq_all_urls[name]
    # chain the stored sequences in one pass instead of repeatedly copying
    # lists with ``sum(..., [])``
    return list(it.chain.from_iterable(_zmq_all_urls.values()))
###################### # Timeout functionality
@contextlib.contextmanager
def timeout_context(timeout):
    """Context manager that times out.

    The timeout is implemented with :func:`signal.alarm` and a ``SIGALRM``
    handler. On exit the pending alarm is cancelled and the handler that was
    installed before entering the context is restored, so the context does
    not leave its own handler behind.

    If ``timeout`` is zero, timeout gets disabled (waits forever)

    Examples
    --------
    >>> from ocd.errors import TimeOutError
    >>> try:
    ...     with timeout_context(1):
    ...         # do something very interesting
    ...         pass
    ... except TimeOutError:
    ...     print('Timeout hit')
    ... else:
    ...     print('Finish executing')
    Finish executing
    >>> try:
    ...     with timeout_context(1):
    ...         import time
    ...         time.sleep(2)
    ... except TimeOutError:
    ...     print('Timeout hit')
    ... else:
    ...     print('Finish executing')
    Timeout hit

    Parameters
    ----------
    timeout : int
        seconds to wait before the timeout is hit

    Raises
    ------
    TimeOutError
        if the timeout is reached
    """
    def _handler(signum, frame):
        raise errors.TimeOutError("{} seconds have passed".format(timeout))

    previous_handler = signal.signal(signal.SIGALRM, _handler)
    try:
        # armed inside the ``try`` so that the clean up below runs also if
        # ``signal.alarm`` rejects the value of ``timeout``
        signal.alarm(timeout)
        yield
    finally:
        signal.alarm(0)
        # ``signal.signal`` returns ``None`` when the previous handler was
        # not installed from Python: in that case it cannot be reinstalled
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
def function_timeout(seconds, func, *args, **kwargs):
    '''Run ``func`` giving it at most ``seconds`` seconds to finish.

    If the execution finishes in time, the return value of the function and
    ``False`` are given back, otherwise ``(None, True)``.

    Examples
    --------
    >>> import time
    >>> def my_func(i):
    ...     time.sleep(i)
    ...     return 42
    >>> function_timeout(1, my_func, 0.1)
    (42, False)
    >>> function_timeout(1, my_func, 2)
    (None, True)

    Parameters
    ----------
    seconds : int
        number of seconds to set for the timeout; only integers are allowed
    func : callable
        function to execute
    args, kwargs :
        positional and keyword arguments to pass to ``func``

    Returns
    -------
    returned :
        return value of ``func`` or ``None`` if the timeout is triggered
    timeout_hit : bool
        whether the timeout has been hit
    '''
    try:
        with timeout_context(seconds):
            result = func(*args, **kwargs)
    except errors.TimeOutError:
        return None, True
    return result, False
###################### # communication with other processes
class SendAndListen(object):
    '''Send messages through a ZeroMQ server and listen until a reply comes
    in or a timeout is hit.

    Parameters
    ----------
    server : :class:`tcs_lib.server.ZMQServer`
        server to use to send out events
    listener : :class:`tcs_lib.tcs_event.TCSEvent`
        event listener
    n_attempts : int, optional
        maximum number of attempts, if ``None`` or non positive, send
        forever;
    interval : float, optional
        interval between event sent, set negative to send events
        continuously
    timeout : int, optional
        timeout used interrupt the listener (passed to
        :func:`timeout_context`). If ``None``, is computed from
        ``n_attempts`` and ``interval``, if possible, or set to 10 seconds.
        The timeout actually used is always at least one second, because a
        value of zero would disable the timeout and the listener could
        block forever.
    '''
    def __init__(self, server, listener, n_attempts=5, interval=1,
                 timeout=None):
        # save the input
        self._server = server
        self._listener = listener

        if n_attempts is None or n_attempts < 1:
            self._n_attempts = None
        else:
            self._n_attempts = int(n_attempts)
        self._interval = float(interval)

        # Compute the timeout to use when listening. It must be a positive
        # integer: zero disables the alarm used by ``timeout_context``.
        if timeout:
            listen_timeout = int(timeout)
        elif self._n_attempts is None or self._interval <= 0:
            # no way to derive a meaningful timeout from the other options
            listen_timeout = 10
        else:
            listen_timeout = int(math.ceil(self._n_attempts *
                                           self._interval))
        self._timeout = max(listen_timeout, 1)

        # prepare the event to connect the sender thread with the listener
        self._interrupt_event = threading.Event()

    @classmethod
    def from_addresses(cls, server_address, listener_addresses, topics=None,
                       **kwargs):
        '''Create a :class:`tcs_lib.server.ZMQServer` instance using
        ``server_address`` and a :class:`tcs_lib.tcs_event.TCSEvent` using
        ``listener_addresses`` and ``topics`` and then initialize the class

        Parameters
        ----------
        server_address : string
            url/path/... to use to initialize the server
        listener_addresses : list of strings
            url/path/... to use to initialize the listener
        topics : list of strings, optional
            list of topics to pass to the listener
        kwargs :
            keyword arguments passed to the class constructor
        '''
        server = tcs_server.ZMQServer(server_address)
        listener = tcs_event.TCSEvent(urls=listener_addresses, topics=topics)
        return cls(server, listener, **kwargs)

    @classmethod
    def from_names(cls, server_name, listener_name, topics=None, **kwargs):
        '''Get the server with :func:`get_zmq_server` and the urls to listen
        to with :func:`get_zmq_server_urls` and then initialize the class.

        Parameters
        ----------
        server_name, listener_name : string
            names of the server and of the urls to listen to as saved in
            :func:`init_zmq_servers`
        topics : list of strings, optional
            list of topics to pass to the listener
        kwargs :
            keyword arguments passed to the class constructor
        '''
        server = get_zmq_server(server_name)
        listener_addresses = get_zmq_server_urls(name=listener_name)
        listener = tcs_event.TCSEvent(urls=listener_addresses, topics=topics)
        return cls(server, listener, **kwargs)

    def send(self, topic, event):
        '''Send an ``event`` with topic ``topic`` at most ``n_attempts``
        times, waiting up to ``interval`` seconds after each one before
        attempting to send a new one. :meth:`listen` can interrupt the loop.

        Parameters
        ----------
        topic : string
            topic of the event to send
        event : dict
            payload of the event to send
        '''
        if self._n_attempts is None:
            iterator = it.count()
        else:
            iterator = range(self._n_attempts)

        for _ in iterator:
            self._server.send_tcs_event(topic, event)
            # wait until the event is set and break out of the loop, or until
            # the interval has passed and resend the event
            is_event_set = self._interrupt_event.wait(timeout=self._interval)
            if is_event_set:
                break

    def listen(self, expected_topic=None, expected_event=None):
        '''Loop the ``listener`` until it gets an event confirming the
        action.

        Parameters
        ----------
        expected_topic : string, optional
            topic to listen to; if ``None`` accept any topic that the
            ``listener`` returns
        expected_event : dict, optional
            if not ``None``, tcs events returned by the listener must
            contain ``expected_event`` to be considered valid, otherwise
            every event is considered as valid; if valid, the :meth:`send`
            is interrupted

        Returns
        -------
        out_topic : string
            topic of the first accepted event, ``None`` if the listener
            finishes without any accepted event
        out_event : dict
            payload of the first accepted event, ``None`` if the listener
            finishes without any accepted event
        '''
        if expected_event is None:
            expected_event = {}

        out_topic, out_event = None, None
        for topic, event in self._listener:
            if expected_topic and expected_topic != topic:
                continue
            is_subdict = all((k in event and event[k] == v)
                             for k, v in expected_event.items())
            # if ``expected_event`` is fully contained in ``event``, then we
            # are done and we can set the event and break out of the loop
            if is_subdict:
                self._interrupt_event.set()
                out_topic = topic
                out_event = event
                break

        return out_topic, out_event

    def communicate(self, send_topic, send_event, expected_topic=None,
                    expected_event=None):
        '''Run :meth:`send`, emitting ``send_topic`` and ``send_event``, in
        a :class:`~threading.Thread`, while listening for ``expected_topic``
        and ``expected_event``. :meth:`listen` is timed out to avoid locks.
        Once :meth:`listen` returns, times out or fails with an exception,
        the thread running :meth:`send` is interrupted and joined, even if
        ``n_attempts`` is set to ``None`` or negative.

        This method can be called multiple times.

        Parameters
        ----------
        send_topic, send_event :
            same as the input parameters of :meth:`send`
        expected_topic, expected_event :
            same as the input parameters of :meth:`listen`

        Returns
        -------
        out_topic, out_event :
            same as the output parameters of :meth:`listen`,
        timeout_hit : bool
            if ``True``, the listener timed out before receiving the
            expected event. In this case ``out_topic`` and ``out_event`` are
            set to ``None``. If ``False``, the expected event has been
            received and returned in the above parameters
        '''
        # clear the event, in order to be able to reuse it
        self._interrupt_event.clear()

        thread = threading.Thread(target=self.send,
                                  args=(send_topic, send_event))
        thread.start()
        try:
            output, timeout_hit = function_timeout(
                self._timeout, self.listen, expected_topic=expected_topic,
                expected_event=expected_event)
        finally:
            # Make sure to finish sending, also when the listener raises:
            # otherwise the sender thread could keep running forever
            self._interrupt_event.set()
            thread.join()

        if timeout_hit:
            out_topic = out_event = None
        else:
            out_topic, out_event = output

        return out_topic, out_event, timeout_hit