# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''OCD utilities'''
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import abc
import argparse as ap
import contextlib
import itertools as it
import math
import os
import signal
import sys
import threading
import warnings
from astropy.time import Time
from pyhetdex.tools.files import file_tools
from py.path import local
import six
from tcs_lib import server as tcs_server
from tcs_lib import tcs_event, string_helpers
from tcs_lib.errors import ConvertTypeError
from . import errors
# Module-level private state. The containers below are filled in place by
# the helper functions of this module.

# private storage for tmp directory (only the first element is used); filled
# by ``tmpdir()`` the first time it is called
_tmpdirs = []
# private storage for mock time object; ``init_times()`` appends a
# ``MockTime`` instance when mock times are requested and ``get_time_utc()``
# reads the first element, if any
_mock_time = []
# private storage for the zmq servers and their urls/paths
# key: name associated with the server
# ``_zmq_servers`` caches the server objects created by ``get_zmq_server()``;
# ``_zmq_server_urls`` holds the single url selected for each name by
# ``init_zmq_servers()``
_zmq_servers = {}
_zmq_server_urls = {}
# all the urls that OCD knows about: for every name, the full list read from
# the configuration by ``init_zmq_servers()``
_zmq_all_urls = {}
STATIC_DIR = 'static'
'''Static ocd directory'''
###########################
# common command line parse arguments
def common_parser_arguments():
    '''Build the parser holding the command line options shared by the OCD
    commands.

    The parser is created without the automatic help option and without
    description, which makes it suitable to be used as parent parser.

    Returns
    -------
    parser : :class:`argparse.ArgumentParser`
        parser providing only the ``-V/--version`` option
    '''
    # the version is imported when the function is called, not at module
    # level
    from . import __version__

    shared = ap.ArgumentParser(add_help=False)
    shared.add_argument('-V', '--version', action='version',
                        version=__version__)
    return shared
def override_epilog_msg(parser):
    '''Set the epilogue of ``parser`` to a note explaining how the
    configuration overriding mechanism works.

    Parameters
    ----------
    parser : :class:`argparse.ArgumentParser`
        parser to modify; its ``epilog`` attribute is overwritten in place

    Returns
    -------
    parser : :class:`argparse.ArgumentParser`
        the same object received in input
    '''
    epilog_lines = (
        'The configuration overriding mechanism works only on',
        'existing options: make sure that the corresponding sections and'
        ' options',
        'exist. The section names are given in the description of the'
        ' command and',
        'the option names are, unless otherwise stated, the same as the'
        ' command line',
        'options long names.',
    )
    parser.epilog = '\n'.join(epilog_lines)
    return parser
###########################
# misc functionalities
def compare_with_value(comp_operator, aggregator, value, *keys):
    '''Build a function that compares some entries of an ``event`` with
    ``value`` and condenses the outcome into a single result.

    Parameters
    ----------
    comp_operator : callable
        called as ``comp_operator(event[key], value)`` for each of the
        ``keys``, e.g. :func:`operator.eq`
    aggregator : callable
        receives the list with the results of ``comp_operator`` and reduces
        it to a single value, e.g. :func:`all` or :func:`any`
    value : object
        value to compare to
    keys : strings
        keys whose values must be checked

    Returns
    -------
    comp_func : function
        function with signature ``comp_func(event) -> bool``, where
        ``event`` is the dictionary containing the values associated to the
        ``keys``. Keys that raise a :class:`KeyError` are skipped, so they
        do not contribute to the list passed to ``aggregator``.

    Examples
    --------
    >>> from operator import eq
    >>> all_equal_true = compare_with_value(eq, all, 'true', 'key1', 'key2')
    >>> all_equal_true({'key1': 'true', 'key2': 'true', 'other': 42})
    True
    >>> all_equal_true({'key1': 'true', 'key2': 'false', 'other': 42})
    False
    >>> all_equal_true({'key1': 'true', 'key3': 'false', 'other': 42})
    True
    '''
    def _compare_with_value(event):
        outcomes = []
        for key in keys:
            try:
                outcome = comp_operator(event[key], value)
            except KeyError:
                # missing keys are ignored on purpose
                continue
            outcomes.append(outcome)
        return aggregator(outcomes)

    return _compare_with_value
try:
    # available in the standard library from python 3.5 on
    isclose = math.isclose
except AttributeError:
    def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
        '''Tell whether two floating point numbers are close in value.

        The two numbers are close when their difference does not exceed at
        least one of the tolerances.

        Python 2 implementation of Python 3.5 math.isclose()
        https://hg.python.org/cpython/file/tip/Modules/mathmodule.c#l1993
        Copied from `here <https://stackoverflow.com/a/39347138/1860757>`_

        Parameters
        ----------
        a, b
            numbers to compare
        rel_tol
            maximum difference for being considered "close", relative to
            the magnitude of the input values
        abs_tol
            maximum difference for being considered "close", regardless of
            the magnitude of the input values

        Returns
        -------
        bool
            True if a is close in value to b, and False otherwise.

        Raises
        ------
        ValueError
            if one of the tolerances is negative
        '''
        if rel_tol < 0 or abs_tol < 0:
            raise ValueError("tolerances must be non-negative")

        # exact equality also covers two infinities with the same sign
        if a == b:
            return True

        # any infinity left (opposite signs, or paired with a finite
        # number) can not be close to the other value; without this guard
        # the relative tolerance would be infinite
        if math.isinf(a) or math.isinf(b):
            return False

        # "weak" test from the Boost library: one tolerance is enough
        distance = math.fabs(b - a)
        bounds = (math.fabs(rel_tol * b), math.fabs(rel_tol * a), abs_tol)
        return any(distance <= bound for bound in bounds)
def cast_dictionary(in_dict, strings=(), floats=(), ints=(),
                    binary_strings=False):
    '''Create a new dictionary with the requested keys of ``in_dict`` cast
    to string, float or int.

    Parameters
    ----------
    in_dict : dictionary-like
        the values are extracted from here
    strings, floats, ints : iterables, optional
        keys of ``in_dict`` whose values must be converted with :class:`str`,
        :class:`float` or :class:`int`, respectively; the defaults are
        empty (immutable) tuples
    binary_strings : bool, optional
        if true, the output is passed through :func:`binary_dict` before
        being returned

    Returns
    -------
    out_dict : dictionary
        dictionary containing only the key:value pairs listed in
        ``strings``, ``floats`` and ``ints``

    Raises
    ------
    KeyError
        raised by the ``in_dict[key]`` lookup if one of the keys is missing
        (for a plain dictionary)

    .. warning::
        The code does not check for key repetitions and cast values in the
        same order as in the function signature. So if you use the same key
        in, e.g., ``strings`` and ``ints`` the corresponding output value
        will be an integer.

        The code does not protect against missing keys.
    '''
    out_dict = {}
    # the order matters: later casts override earlier ones for repeated keys
    for cast_func, keys in ((str, strings), (float, floats), (int, ints)):
        for key in keys:
            out_dict[key] = cast_func(in_dict[key])

    if binary_strings:
        return binary_dict(out_dict)
    return out_dict
def binary_dict(in_dict):
    '''Return a copy of ``in_dict`` whose values are converted, recursively,
    to binary strings whenever possible.

    Nested dictionaries are converted by calling this function on them; all
    the other values go through :func:`_safe_string_to_bytes`, that gives
    back unchanged what it can not convert. Under python 2 unicode keys are
    converted too, otherwise keys are left untouched.

    Parameters
    ----------
    in_dict : dictionary
        dictionary to convert

    Returns
    -------
    out_dict : dictionary
        new dictionary; the input is not modified
    '''
    converted = {}
    for key, value in in_dict.items():
        # only python 2 needs the keys not to be unicode
        if six.PY2 and isinstance(key, six.text_type):
            key = _safe_string_to_bytes(key)

        if isinstance(value, dict):
            converted[key] = binary_dict(value)
        else:
            converted[key] = _safe_string_to_bytes(value)
    return converted
def _safe_string_to_bytes(string, encoding='utf-8'):
    '''Convert ``string`` with
    :func:`tcs_lib.string_helpers.string_to_bytes`, giving back the input
    untouched when the conversion is not possible.

    Parameters
    ----------
    string :
        object to convert
    encoding : string, optional
        forwarded to :func:`~tcs_lib.string_helpers.string_to_bytes`

    Returns
    -------
    the value returned by :func:`~tcs_lib.string_helpers.string_to_bytes`
    or, if that raises :class:`tcs_lib.errors.ConvertTypeError`, the input
    as it is
    '''
    try:
        converted = string_helpers.string_to_bytes(string, encoding=encoding)
    except ConvertTypeError:
        converted = string
    return converted
def multiwrap(*decorators):
    """Combine the given decorators into a single one.

    The decorators are applied starting from the last one, so that
    ``multiwrap(a, b)(func)`` is equivalent to ``a(b(func))``, i.e. to
    stacking ``@a`` on top of ``@b``.
    """
    def wrapper(func):
        wrapped = func
        for decorator in reversed(decorators):
            wrapped = decorator(wrapped)
        return wrapped

    return wrapper
# the conditional expression evaluates only the branch matching the running
# python version
abstractproperty = (abc.abstractproperty if six.PY2
                    else multiwrap(property, abc.abstractmethod))
"""Decorator to declare abstract properties.

* Python 3: decorator from combining :class:`property` and
  :func:`abc.abstractmethod`
* Python 2: alias of :func:`abc.abstractproperty`
"""
# lock temporary directory for this amount of time. Only directories older than
# this can be automatically removed
# The value is forwarded as ``lock_timeout`` to
# ``py.path.local.make_numbered_dir`` by ``tmpdir()``; 2*24*60*60 is two days
# expressed in seconds
_lock_timeout = 2*24*60*60
# prefix for the temporary directories
# (forwarded as ``prefix`` to ``make_numbered_dir``)
_prefix = 'ocd-'
def tmpdir():
    '''Return the temporary directory used by OCD, creating it on the first
    call.

    The directory is created with :meth:`py.path.local.make_numbered_dir`,
    asking to keep the last 10 directories and using the module level
    ``_prefix`` and ``_lock_timeout`` as prefix and lock timeout. The path is
    stored, so every following call gives back the same directory.

    Returns
    -------
    string
        temporary directory
    '''
    if not _tmpdirs:
        numbered_dir = local.make_numbered_dir(prefix=_prefix, keep=10,
                                               lock_timeout=_lock_timeout)
        _tmpdirs.append(numbered_dir.strpath)
    # only the first element is ever used
    return _tmpdirs[0]
def get_out_file(out_dir, file_template, default_name, n_keep=-1):
    '''Return the path of the output file built from ``out_dir`` and
    ``file_template``.

    Parameters
    ----------
    out_dir : string
        this can be either a directory on the system or ``:tmpdir:``; in the
        latter the temporary directory from :func:`tmpdir` is used. If the
        directory does not exist, it is created
    file_template : string
        name of the output file; it can contain at most one positional
        format placeholder, e.g. ``{}`` or ``{0}``. If
        ``file_template.format(0)`` fails, ``default_name`` is used. If the
        name contains a format placeholder each call creates a new file
        name with a counter using
        :func:`~pyhetdex.tools.files.file_tools.FileNameRotator`
    default_name : string
        default to use if ``file_template`` is not in spec
    n_keep : integer, optional
        when rotating the files, keep at most ``n_keep`` files. Use ``-1`` to
        keep all the files

    Returns
    -------
    out_file : string
        name of the file: ``rotator.file_name`` when the name is rotated,
        ``os.path.join(out_dir, file_template)`` otherwise
    '''
    if out_dir == ':tmpdir:':
        out_dir = tmpdir()
    elif not os.path.exists(out_dir):
        os.makedirs(out_dir)

    do_rotation = True
    try:
        formatted = file_template.format(0)
    except (IndexError, KeyError, ValueError):
        # IndexError: more than one positional placeholder;
        # KeyError: named placeholder;
        # ValueError: malformed template (e.g. a single ``{``) or a format
        # specification that does not apply to an integer.
        # In all these cases resort to the default file name, as documented
        file_template = default_name
    else:
        # the formatting succeeded: either one or zero placeholders are
        # present; without placeholder the name is used as it is
        if formatted == file_template:
            do_rotation = False

    if do_rotation:
        rotator = file_tools.FileNameRotator(out_dir, keep=n_keep,
                                             file_name=file_template)
        return rotator.file_name
    return os.path.join(out_dir, file_template)
class MockTime(object):
    '''Clock that starts at an arbitrary time and then advances together
    with the real time.

    At initialisation both the requested starting time and the current time
    are saved.

    Parameters
    ----------
    start_time : string
        time to use as starting point; it is passed to
        :class:`astropy.time.Time` with ``scale='utc'``

    Attributes
    ----------
    start_time : :class:`astropy.time.Time`
        representation of the input ``start_time``
    init_time : :class:`astropy.time.Time`
        time of initialisation of this class
    '''

    def __init__(self, start_time):
        self.start_time = Time(start_time, scale='utc')
        self.init_time = Time.now()

    def now(self):
        '''Return the mocked current time.

        Returns
        -------
        :class:`astropy.time.Time`
            ``start_time + (Time.now() - init_time)`` where ``Time.now()``
            is evaluated when this method is called
        '''
        elapsed = Time.now() - self.init_time
        return self.start_time + elapsed
def init_times(conf):
    '''Set up the mock time, if requested in the configuration.

    If the ``mock_time`` option of the ``[dates]`` section is set, the user
    is asked on the terminal to confirm: with ``y`` or ``yes`` a
    :class:`MockTime` is created and stored for :func:`get_time_utc`, with
    any other answer the program exits. If the option is not set nothing
    happens.

    Parameters
    ----------
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration.
    '''
    mock_time = conf['dates'].get('mock_time', fallback=None)
    if mock_time is None:
        return

    question = ('You are about to start OCD with mock times'
                ' starting at {}. This feature is enabled for'
                ' tests and should not be used when running OCD'
                ' to drive observations. If you really want to'
                ' proceed? [y/n] ').format(mock_time)
    answer = six.moves.input(question)
    confirmed = answer.strip().lower() in ('y', 'yes')
    if not confirmed:
        sys.exit('OCD interrupted by the user')
    _mock_time.append(MockTime(mock_time))
def get_time_utc():
    '''Return the current time, mocked if :func:`init_times` stored a
    :class:`MockTime` object.

    Returns
    -------
    :class:`astropy.time.Time`
        current or mocked UTC time
    '''
    if _mock_time:
        # only the first stored mock time object is used
        return _mock_time[0].now()
    return Time.now()
def get_time_jd():
    '''Return the current, possibly mocked, time as Julian Date.

    Returns
    -------
    float
        ``jd`` attribute of the time returned by :func:`get_time_utc`
    '''
    current_time = get_time_utc()
    return current_time.jd
# Internal zmq servers
def init_zmq_servers(command, conf):
    '''Read from ``conf`` the urls/paths/... of the OCD servers and store
    them.

    The following options of the ``[urls]`` section are required:

    * ``ocd_run_shot``: url(s) used by the :mod:`ocd.run_shot` module
    * ``ocd_main_loop``: url(s) used to emit events from the OCD main loop
    * ``ocd_allow_hetdex``: url(s) used by the ``ocd allow_hetdex`` command
    * ``ocd_db_replay``: url(s) used by the ``ocd db_replay`` command
    * ``ocd_config``: url(s) used by the ``ocd config`` command

    Each value can be a list of comma separated URLs. The above names are
    the ones to pass to :func:`get_zmq_server_urls`,
    :func:`get_zmq_server_url` and :func:`get_zmq_server`.

    All the urls are saved and returned by :func:`get_zmq_server_urls`.

    For every name a single url is selected, stored and returned by
    :func:`get_zmq_server_url`. The selection works as follows:

    * get the value of the option ``n_{server_url}``, with ``{server_url}``
      one of the above names, in the ``[{command}]`` section, with
      ``{command}`` the value passed to this function;
    * if the section or option do not exist or is empty, use the first url
      of the list;
    * otherwise the value should be an integer, ``i``, and the
      corresponding entry is stored;
    * if the list of urls is empty, a warning is issued and no url is
      stored for that name.

    .. warning::

        In principle it is possible to use any `ZeroMQ protocol
        <http://zeromq.org/docs:features>`_ that supports the `PUB/SUB
        pattern
        <https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/patterns/pubsub.html#publish-subscribe>`_.
        However, we cannot ensure that the publishers are always initialized
        before the receivers, therefore protocols like ``inproc`` `will not
        work <http://zguide.zeromq.org/php:chapter2#Unicast-Transports>`_.

    Parameters
    ----------
    command : string
        name of the OCD subcommand that starts the servers
    conf : :class:`pyhetdex.tools.configuration.ConfigParser`
        configuration.

    Raises
    ------
    OCDValueError
        when the index for the url to save is not an integer
    OCDIndexError
        when the index for the url to save is out of bound
    '''
    # NOTE: the names are kept in a sequence to allow future extensions
    server_names = ('ocd_main_loop', 'ocd_run_shot', 'ocd_allow_hetdex',
                    'ocd_db_replay', 'ocd_config')

    for name in server_names:
        urls = conf.get_list('urls', name)
        # every url is remembered, whether or not one of them is selected
        _zmq_all_urls[name] = urls

        # figure out which of the urls has been requested
        n_option = 'n_{}'.format(name)
        try:
            raw_index = conf[command][n_option]
        except KeyError:
            # either the section or the option is missing
            raw_index = 0
        try:
            index = int(raw_index) if raw_index else 0
        except ValueError as e:
            msg = ('The value provided for "{o} = {i}" is an invalid literal'
                   ' for int() with base 10.')
            msg = msg.format(o=n_option, i=raw_index)
            six.raise_from(errors.OCDValueError(msg), e)

        # without urls there is nothing to select: warn and go on
        if not urls:
            msg = ('The option "{o}" in the ``[urls]`` section has no'
                   ' value. The corresponding server will not be'
                   ' available')
            msg = msg.format(o=name)
            warnings.warn(msg, category=errors.OCDZMQWarning)
            continue

        try:
            selected_url = urls[index]
        except IndexError as e:
            msg = ('The value provided for "{o} = {i}" is out of range. It'
                   ' should between 0 and {m}')
            msg = msg.format(o=n_option, i=index, m=len(urls) - 1)
            six.raise_from(errors.OCDIndexError(msg), e)

        # Of all the urls available use only one to initialize the servers
        _zmq_server_urls[name] = selected_url
def get_zmq_server(name):
    '''Return the :class:`tcs_lib.server.ZMQServer` associated to ``name``,
    creating and storing it the first time it is requested.

    The server is created from the url returned by
    :func:`get_zmq_server_url`; subsequent calls with the same ``name``
    always return the same object.

    Parameters
    ----------
    name : string
        name of the server

    Returns
    -------
    :class:`tcs_lib.server.ZMQServer`

    Raises
    ------
    KeyError
        if ``name`` has no associated url/path stored by
        :func:`init_zmq_servers`
    '''
    if name not in _zmq_servers:
        # first request: build the server and remember it
        url = get_zmq_server_url(name)
        _zmq_servers[name] = tcs_server.ZMQServer(url)
    return _zmq_servers[name]
def get_zmq_server_url(name):
    '''Return the url/path selected for the zmq server called ``name``.

    Parameters
    ----------
    name : string
        name of the server

    Returns
    -------
    string
        url stored for ``name`` by :func:`init_zmq_servers`

    Raises
    ------
    KeyError
        if no url has been stored for ``name``
    '''
    selected_url = _zmq_server_urls[name]
    return selected_url
def get_zmq_server_urls(name=None):
    '''Return the urls/paths read from the configuration by
    :func:`init_zmq_servers`.

    Parameters
    ----------
    name : string, optional
        if given, returns only the urls/paths stored for the given name,
        otherwise returns all of them in a single list

    Returns
    -------
    list of strings

    Raises
    ------
    KeyError
        if ``name`` is given but no urls are stored for it
    '''
    if name:
        return _zmq_all_urls[name]

    # flatten the lists stored for every name into a brand new list
    every_url = []
    for urls in _zmq_all_urls.values():
        every_url = every_url + urls
    return every_url
######################
# Timeout functionality
@contextlib.contextmanager
def timeout_context(timeout):
    """Context manager that times out. If ``timeout`` is zero, timeout gets
    disabled (waits forever)

    The timeout is implemented with :func:`signal.alarm`: a handler raising
    :class:`~ocd.errors.TimeOutError` is installed for ``SIGALRM`` while the
    body of the ``with`` statement runs. On exit the alarm is cancelled and
    the handler that was in place before entering the context is restored,
    so that the temporary handler does not leak outside of the context.

    Examples
    --------
    >>> from ocd.errors import TimeOutError
    >>> try:
    ...     with timeout_context(1):
    ...         # do something very interesting
    ...         pass
    ... except TimeOutError:
    ...     print('Timeout hit')
    ... else:
    ...     print('Finish executing')
    Finish executing
    >>> try:
    ...     with timeout_context(1):
    ...         import time
    ...         time.sleep(2)
    ... except TimeOutError:
    ...     print('Timeout hit')
    ... else:
    ...     print('Finish executing')
    Timeout hit

    Parameters
    ----------
    timeout : int
        seconds to wait before the timeout is hit

    Raises
    ------
    TimeOutError
        if the timeout is reached
    """
    def _handler(signum, frame):
        raise errors.TimeOutError("{} seconds have passed".format(timeout))

    # signal.signal returns the handler being replaced: keep it to be able to
    # put it back once done
    previous_handler = signal.signal(signal.SIGALRM, _handler)
    try:
        signal.alarm(timeout)
        yield
    finally:
        # cancel any pending alarm before touching the handlers
        signal.alarm(0)
        # ``None`` means that the previous handler was not installed from
        # python and it can not be passed back to signal.signal
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
def function_timeout(seconds, func, *args, **kwargs):
    '''Run ``func(*args, **kwargs)`` inside :func:`timeout_context`, giving
    it at most ``seconds`` seconds to complete.

    Examples
    --------
    >>> import time
    >>> def my_func(i):
    ...     time.sleep(i)
    ...     return 42
    >>> function_timeout(1, my_func, 0.1)
    (42, False)
    >>> function_timeout(1, my_func, 2)
    (None, True)

    Parameters
    ----------
    seconds : int
        number of seconds to set for the timeout; only integers are allowed
    func :
        callable to execute
    args, kwargs :
        positional and keyword arguments to pass to ``func``

    Returns
    -------
    returned :
        return value of ``func`` or ``None`` if the timeout is triggered
    timeout_hit : bool
        whether the timeout has been hit
    '''
    try:
        with timeout_context(seconds):
            result = func(*args, **kwargs)
    except errors.TimeOutError:
        # the function did not finish in time
        return None, True
    return result, False
######################
# communication with other processes
class SendAndListen(object):
    '''Send messages through a ZeroMQ server and listen until a reply comes in
    or a timeout is hit.

    Parameters
    ----------
    server : :class:`tcs_lib.server.ZMQServer`
        server to use to send out events
    listener : :class:`tcs_lib.tcs_event.TCSEvent`
        event listener
    n_attempts : int, optional
        maximum number of attempts, if ``None`` or non positive, send forever;
    interval : float, optional
        interval between event sent, set zero or negative to send events
        continuously
    timeout : int, optional
        timeout used to interrupt the listener (passed to
        :func:`timeout_context`). If ``None`` or zero, it is computed from
        ``n_attempts`` and ``interval``, if possible, or set to 10 seconds.
        The value actually used is always a positive integer, because a
        zero would disable the timeout and let the listener wait forever
    '''

    def __init__(self, server, listener, n_attempts=5, interval=1,
                 timeout=None):
        # save the input
        self._server = server
        self._listener = listener

        if n_attempts is None or n_attempts < 1:
            self._n_attempts = None
        else:
            self._n_attempts = int(n_attempts)
        self._interval = float(interval)

        # compute the timeout to use when listening. It must be a positive
        # integer: zero means "no timeout" for the underlying alarm
        if timeout:
            # int() truncates, so e.g. 0.5 would become 0: enforce at least
            # one second
            self._timeout = max(1, int(timeout))
        elif self._n_attempts is None or self._interval <= 0:
            # the duration of the sending loop is either infinite or null:
            # resort to a fixed value
            self._timeout = 10
        else:
            self._timeout = int(math.ceil(n_attempts * self._interval))

        # prepare the event to connect the sender thread with the listener
        self._interrupt_event = threading.Event()

    @classmethod
    def from_addresses(cls, server_address, listener_addresses, topics=None,
                       **kwargs):
        '''Create a :class:`tcs_lib.server.ZMQServer` instance using
        ``server_address`` and a :class:`tcs_lib.tcs_event.TCSEvent` using
        ``listener_addresses`` and ``topics`` and then initialize the class

        Parameters
        ----------
        server_address : string
            url/path/... to use to initialize the server
        listener_addresses : list of strings
            url/path/... to use to initialize the listener
        topics : list of strings, optional
            list of topics to pass to the listener
        kwargs :
            keyword arguments passed to the class constructor

        Returns
        -------
        instance of this class
        '''
        server = tcs_server.ZMQServer(server_address)
        listener = tcs_event.TCSEvent(urls=listener_addresses, topics=topics)
        return cls(server, listener, **kwargs)

    @classmethod
    def from_names(cls, server_name, listener_name, topics=None, **kwargs):
        '''Get the server with :func:`get_zmq_server` and the urls to listen to
        with :func:`get_zmq_server_urls` and then initialize the class.

        Parameters
        ----------
        server_name, listener_name : string
            names of the server and of the urls to listen to as saved in
            :func:`init_zmq_servers`
        topics : list of strings, optional
            list of topics to pass to the listener
        kwargs :
            keyword arguments passed to the class constructor

        Returns
        -------
        instance of this class
        '''
        server = get_zmq_server(server_name)
        listener_addresses = get_zmq_server_urls(name=listener_name)
        listener = tcs_event.TCSEvent(urls=listener_addresses, topics=topics)
        return cls(server, listener, **kwargs)

    def send(self, topic, event):
        '''Send an ``event`` with topic ``topic`` at most ``n_attempts``
        times, waiting up to ``interval`` seconds after each one before
        attempting to send a new one. :meth:`listen` can interrupt the loop.

        Parameters
        ----------
        topic : string
            topic of the event to send
        event : dict
            payload of the event to send
        '''
        if self._n_attempts is None:
            attempts = it.count()
        else:
            attempts = range(self._n_attempts)

        for _ in attempts:
            self._server.send_tcs_event(topic, event)
            # wait until the event is set and break out of the loop, or until
            # the interval has passed and resend the event
            if self._interrupt_event.wait(timeout=self._interval):
                break

    def listen(self, expected_topic=None, expected_event=None):
        '''Loop the ``listener`` until it gets an event confirming the action.

        Parameters
        ----------
        expected_topic : string, optional
            topic to listen to; if ``None`` accept any topic that the
            ``listener`` returns
        expected_event : dict, optional
            if not ``None``, tcs events returned by the listener must contain
            all its key:value pairs to be considered valid, otherwise every
            event is considered as valid; if valid, the :meth:`send` is
            interrupted

        Returns
        -------
        out_topic : string
            topic of the first accepted event; ``None`` if the listener is
            exhausted without any accepted event
        out_event : dict
            payload of the first accepted event; ``None`` if the listener is
            exhausted without any accepted event
        '''
        if expected_event is None:
            expected_event = {}

        for topic, event in self._listener:
            if expected_topic and expected_topic != topic:
                continue
            is_subdict = all((k in event and event[k] == v)
                             for k, v in expected_event.items())
            # if ``expected_event`` is fully contained in ``event``, then we
            # are done: tell the sender to stop and give back the event
            if is_subdict:
                self._interrupt_event.set()
                return topic, event
        return None, None

    def communicate(self, send_topic, send_event, expected_topic=None,
                    expected_event=None):
        '''Run :meth:`send`, emitting ``send_topic`` and ``send_event``, in a
        :class:`~threading.Thread`, while listening for ``expected_topic`` and
        ``expected_event``. :meth:`listen` is run through
        :func:`function_timeout` to avoid locks.

        Once :meth:`listen` returns, also the thread running :meth:`send` is
        interrupted, even if ``n_attempts`` is set to ``None`` or negative.

        When an expected event is received, the sending thread is finished.

        This method can be called multiple times.

        Parameters
        ----------
        send_topic, send_event :
            same as the input parameters of :meth:`send`
        expected_topic, expected_event :
            same as the input parameters of :meth:`listen`

        Returns
        -------
        out_topic, out_event :
            same as the output parameters of :meth:`listen`,
        timeout_hit : bool
            if ``True``, the listener timed out before receiving the expected
            event. In this case ``out_topic`` and ``out_event`` are set to
            ``None``. If ``False``, the expected event has been received and
            returned in the above parameters
        '''
        # clear the event, in order to be able to reuse it
        self._interrupt_event.clear()

        thread = threading.Thread(target=self.send,
                                  args=(send_topic, send_event))
        thread.start()

        output, timeout_hit = function_timeout(self._timeout, self.listen,
                                               expected_topic=expected_topic,
                                               expected_event=expected_event)

        # make sure to finish sending
        self._interrupt_event.set()
        thread.join()

        if timeout_hit:
            out_topic = out_event = None
        else:
            out_topic, out_event = output

        return out_topic, out_event, timeout_hit