ocd.utils – Miscellaneous utilities

OCD utilities

ocd.utils.STATIC_DIR = 'static'

Static ocd directory

ocd.utils.common_parser_arguments()[source]

Return a command line parser with neither help nor description. Useful as a parent parser.

Returns:
parser : argparse.ArgumentParser
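
How a help-less parser serves as a parent can be sketched with the standard library alone. The `parent` parser below is only a stand-in for the object returned by common_parser_arguments(), and the `--verbose` option and `example` program name are hypothetical.

```python
import argparse

# Stand-in for common_parser_arguments(): a parser created with add_help=False
# so that it does not clash with the -h/--help option of the child parser.
# The --verbose option is hypothetical and only used for illustration.
parent = argparse.ArgumentParser(add_help=False)
parent.add_argument("--verbose", action="store_true")

# The child parser inherits every argument defined on the parent.
child = argparse.ArgumentParser(prog="example", parents=[parent])
child.add_argument("name")

args = child.parse_args(["--verbose", "shot"])
print(args.verbose, args.name)
```
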
ocd.utils.override_epilog_msg(parser)[source]

Add an epilogue about overriding configuration options to the parser

Parameters:
parser : argparse.ArgumentParser
Returns:
parser : argparse.ArgumentParser
ocd.utils.compare_with_value(comp_operator, aggregator, value, *keys)[source]

Returns a function that checks some keys in the given event.

Parameters:
comp_operator : callable

callable that compares each element of event to value, e.g. operator.eq()

aggregator : callable

function that compresses the information from comp_operator into a single value, e.g. all() or any().

value : object

value to compare to

keys : strings

keys whose values must be checked

Returns:
comp_func : function

function with signature:

comp_func(event) -> bool
Parameters:
  event : dictionary
    dictionary containing the values associated to the keys
Returns:
  bool
    True if the values in event match the expectation, False otherwise

Examples

>>> from operator import eq
>>> all_equal_true = compare_with_value(eq, all, 'true', 'key1', 'key2')
>>> all_equal_true({'key1': 'true', 'key2': 'true', 'other': 42})
True
>>> all_equal_true({'key1': 'true', 'key2': 'false', 'other': 42})
False
>>> all_equal_true({'key1': 'true', 'key3': 'false', 'other': 42})
True
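
The last example suggests that keys missing from the event are ignored. A minimal sketch of a factory with that behaviour is shown here. `compare_with_value_sketch` is a hypothetical re-implementation for illustration, not the code in ocd.utils, and skipping missing keys is an assumption inferred from the example.

```python
from operator import eq, gt


def compare_with_value_sketch(comp_operator, aggregator, value, *keys):
    """Illustrative re-implementation, not the ocd.utils code."""
    def comp_func(event):
        # Assumption: keys that are absent from the event are skipped.
        return aggregator(comp_operator(event[k], value)
                          for k in keys if k in event)
    return comp_func


any_above_ten = compare_with_value_sketch(gt, any, 10, "a", "b")
print(any_above_ten({"a": 3, "b": 12}))
print(any_above_ten({"a": 3, "b": 4}))
```
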
ocd.utils.cast_dictionary(in_dict, strings=[], floats=[], ints=[], binary_strings=False)[source]

Get the input dictionary and create a new one with the desired keys cast to string, float or int

Parameters:
in_dict : dictionary-like

the values are extracted from here

strings, floats, ints : iterables, optional

list of the in_dict keywords that need to be converted to strings, floats or integers, respectively

binary_strings : bool

if True, all the strings in the output are encoded as binary strings

Returns:
out_dict : dictionary

dictionary containing all the key:value pairs listed in strings, floats and ints

Warning

The code does not check for key repetitions and casts values in the same order as in the function signature. So if you use the same key in, e.g., strings and ints, the corresponding output value will be an integer.

The code does not protect against missing keys.
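
The effect of the cast order can be seen in a small sketch. `cast_dictionary_sketch` is a hypothetical re-implementation that omits binary_strings, the event keys are made up, and raising KeyError on a missing key is an assumption about what "does not protect" means.

```python
def cast_dictionary_sketch(in_dict, strings=(), floats=(), ints=()):
    """Illustrative re-implementation, not the ocd.utils code."""
    out_dict = {}
    # Casts are applied in signature order, so later casts overwrite earlier ones.
    for keys, cast in ((strings, str), (floats, float), (ints, int)):
        for key in keys:
            # Assumption: a missing key simply raises KeyError.
            out_dict[key] = cast(in_dict[key])
    return out_dict


event = {"exposure": "360.5", "shot": "7", "target": 42}
# "shot" is listed both in strings and in ints: the integer cast wins.
result = cast_dictionary_sketch(event, strings=["target", "shot"],
                                floats=["exposure"], ints=["shot"])
print(result)
```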

ocd.utils.binary_dict(in_dict)[source]

Recursively convert all the string/unicode in the dictionary to binary strings.

In Python 2, it also makes sure that keys are not unicode.

Parameters:
in_dict : dictionary

dictionary to convert

Returns:
out_dict : dictionary

copy of the input dictionary with all strings converted to binary strings
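
A minimal sketch of the recursive conversion under Python 3 follows. `binary_dict_sketch` is a hypothetical re-implementation. Using UTF-8, leaving keys untouched and copying non-string values unchanged are all assumptions.

```python
def binary_dict_sketch(in_dict, encoding="utf-8"):
    """Illustrative re-implementation, not the ocd.utils code."""
    out_dict = {}
    for key, value in in_dict.items():
        if isinstance(value, dict):
            # Recurse into nested dictionaries.
            out_dict[key] = binary_dict_sketch(value, encoding)
        elif isinstance(value, str):
            out_dict[key] = value.encode(encoding)
        else:
            # Assumption: non-string values are copied unchanged.
            out_dict[key] = value
    return out_dict


converted = binary_dict_sketch({"name": "virus", "n": 3, "sub": {"id": "a1"}})
print(converted)
```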

ocd.utils._safe_string_to_bytes(string, encoding='utf-8')[source]

Safe version of tcs_lib.string_helpers.string_to_bytes(). If the input cannot be converted, it is returned unchanged.

Parameters:
all :

same as string_to_bytes()

Returns:
the input unchanged if string_to_bytes() fails, otherwise the binary version of the input
ocd.utils.multiwrap(*decorators)[source]

Create a decorator combining the given decorators
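
One way to combine decorators is sketched here. `multiwrap_sketch`, `shout` and `add_now` are hypothetical names, and the application order (first decorator outermost, as when stacking with @) is an assumption that the source does not confirm.

```python
import functools


def multiwrap_sketch(*decorators):
    """Illustrative re-implementation, not the ocd.utils code."""
    def combined(func):
        # Assumption: the first decorator listed ends up outermost, as if
        # the decorators were stacked with @ in the same order.
        for decorator in reversed(decorators):
            func = decorator(func)
        return func
    return combined


def shout(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs).upper()
    return wrapper


def add_now(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs) + " now"
    return wrapper


@multiwrap_sketch(shout, add_now)
def greet(name):
    return "hello " + name


print(greet("ocd"))
```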

ocd.utils.abstractproperty(func)
ocd.utils.tmpdir()[source]

When called for the first time, creates a temporary directory with py.path.local.make_numbered_dir(), then stores and returns it. Subsequent calls return the same directory. The last 10 directories are kept, with a lock time of 1 day.

Returns:
string

temporary directory
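
The create-once, reuse-afterwards behaviour can be sketched with the standard library. Here tempfile.mkdtemp() stands in for py.path.local.make_numbered_dir(), so the numbering, pruning and locking are not reproduced. `tmpdir_sketch`, `_TMPDIR` and the directory prefix are hypothetical.

```python
import os
import tempfile

_TMPDIR = None  # module-level cache for the directory


def tmpdir_sketch():
    """Create a temporary directory on the first call and reuse it afterwards."""
    global _TMPDIR
    if _TMPDIR is None:
        # tempfile.mkdtemp() stands in for py.path.local.make_numbered_dir();
        # unlike the latter, it does not number or prune old directories.
        _TMPDIR = tempfile.mkdtemp(prefix="ocd_sketch_")
    return _TMPDIR


first = tmpdir_sketch()
second = tmpdir_sketch()
print(first == second, os.path.isdir(first))
```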

ocd.utils.get_out_file(out_dir, file_template, default_name, n_keep=-1)[source]

Return the absolute path of the file.

Parameters:
out_dir : string

this can be either a directory on the system or :tmpdir:; in the latter case the temporary directory from tmpdir() is used. If the directory does not exist, the function tries to create it

file_template : string

name of the file where to write the shots; it can contain at most one format placeholder, either {} or {}. If file_template.format(0) fails, default_name is used. If the file name contains a format placeholder, each call creates a new file with a counter using FileNameRotator()

default_name : string

default to use if file_template is not in spec

n_keep : integer, optional

when rotating the files, keep at most n_keep files. Use -1 to keep all the files

Returns:
out_file : string

name of the file
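
The fallback from file_template to default_name can be sketched as below. `choose_template` is a hypothetical helper, the file names are made up, and which exceptions the real code treats as a failure of file_template.format(0) is an assumption.

```python
def choose_template(file_template, default_name):
    """Return file_template if it formats with one value, else default_name."""
    try:
        file_template.format(0)
    except (IndexError, KeyError, ValueError):
        # Two positional placeholders raise IndexError, a named placeholder
        # raises KeyError and a malformed one raises ValueError.
        return default_name
    return file_template


print(choose_template("shots_{:03d}.txt", "shots.txt"))
print(choose_template("shots_{}_{}.txt", "shots.txt"))
print(choose_template("shots_{name}.txt", "shots.txt"))
```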

class ocd.utils.MockTime(start_time)[source]

Bases: object

When initialized saves the time to use and the UTC time at initialization.

Parameters:
start_time : string

time to use as starting point

Attributes:
start_time : astropy.time.Time

representation of the input start_time

init_time : astropy.time.Time

time of initialisation of this class

now(self)[source]

Returns the time elapsed from init_time w.r.t start_time.

Returns:
astropy.time.Time

start_time + (Time.now() - init_time), where Time.now() is evaluated when now() is called
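
The arithmetic behind the mocked time can be sketched with datetime in place of astropy.time.Time. `MockClock` and its `real_now` argument are hypothetical and exist only to make the example deterministic.

```python
from datetime import datetime, timedelta, timezone


class MockClock:
    """Sketch of the mocked clock using datetime instead of astropy.time.Time."""

    def __init__(self, start_time):
        self.start_time = start_time                 # time to pretend it is
        self.init_time = datetime.now(timezone.utc)  # real time at creation

    def now(self, real_now=None):
        # real_now is a hypothetical hook that makes the sketch testable.
        if real_now is None:
            real_now = datetime.now(timezone.utc)
        # Mocked time = starting point + real time elapsed since creation.
        return self.start_time + (real_now - self.init_time)


clock = MockClock(datetime(2020, 1, 1, tzinfo=timezone.utc))
later = clock.init_time + timedelta(seconds=90)
print(clock.now(real_now=later))
```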

ocd.utils.init_times(conf)[source]

Initialize the object that provides the time.

Parameters:
conf : pyhetdex.tools.configuration.ConfigParser

configuration.

ocd.utils.get_time_utc()[source]

Get the time from the object initialized by init_times()

Returns:
astropy.time.Time

current or mocked UTC time

ocd.utils.get_time_jd()[source]

Get the time from the object initialized by init_times()

Returns:
float

current or mocked Julian Date corresponding to get_time_utc()

ocd.utils.init_zmq_servers(command, conf)[source]

Extract the urls/paths/… to use for the servers from the conf and store them.

The following options of the [urls] section are required:

  • ocd_run_shot: url(s) used by the ocd.run_shot module
  • ocd_main_loop: url(s) used to emit events from the OCD main loop
  • ocd_allow_hetdex: url(s) used by the ocd allow_hetdex command
  • ocd_db_replay: url(s) used by the ocd db_replay command
  • ocd_config: url(s) used by the ocd config command

The value of each of them can be a comma-separated list of URLs. The above names are the same ones used as input to the get_zmq_server_urls(), get_zmq_server_url() and get_zmq_server() functions.

All the urls are saved and returned by get_zmq_server_urls(). The urls for the servers that can be initialized for the given command are stored and returned by get_zmq_server_url().

If multiple urls are provided for any of the above options, the following mechanism is used to decide which one to use:

  • get the value of the option n_{server_url}, with {server_url} one of the above names, in the [{command}] section, with {command} the value passed to this function;
  • if the section or the option does not exist, or the option is empty, use the first url of the list;
  • otherwise the value must be an integer, i, and the corresponding entry of the list is stored.
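
The selection mechanism above can be sketched with the standard library configparser standing in for pyhetdex.tools.configuration.ConfigParser. `select_url`, the URLs and the `[run_shot]` section are hypothetical. Zero-based indexing is an assumption, and the built-in ValueError and IndexError take the place of OCDValueError and OCDIndexError.

```python
import configparser


def select_url(conf, command, server_name):
    """Pick one URL for server_name following the rules listed above."""
    urls = [u.strip() for u in conf.get("urls", server_name).split(",")]
    # fallback="" covers both a missing section and a missing option.
    index = conf.get(command, "n_" + server_name, fallback="")
    if not index.strip():
        return urls[0]
    # int() raises ValueError for a non-integer value and the lookup raises
    # IndexError when the index is out of bounds (assumed zero-based).
    return urls[int(index)]


conf = configparser.ConfigParser()
conf.read_string("""
[urls]
ocd_run_shot = tcp://127.0.0.1:5555, tcp://127.0.0.1:5556
ocd_main_loop = tcp://127.0.0.1:5557

[run_shot]
n_ocd_run_shot = 1
""")

print(select_url(conf, "run_shot", "ocd_run_shot"))   # index taken from the conf
print(select_url(conf, "run_shot", "ocd_main_loop"))  # option missing
print(select_url(conf, "config", "ocd_run_shot"))     # section missing
```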

Warning

In principle it is possible to use any ZeroMQ protocol that supports the PUB/SUB pattern. However, we cannot ensure that the publishers are always initialized before the receivers, therefore protocols like inproc will not work.

Parameters:
command : string

name of the OCD subcommand that starts the servers

conf : pyhetdex.tools.configuration.ConfigParser

configuration.

Raises:
OCDValueError

when the index for the url to save is not an integer

OCDIndexError

when the index for the url to save is out of bound

ocd.utils.get_zmq_server(name)[source]

Create a tcs_lib.server.ZMQServer object in PUB mode, then store and return it. Subsequent calls with the same name always return the same object.

Parameters:
name : string

name of the server

Returns:
tcs_lib.server.ZMQServer
Raises:
KeyError

if name has no associated url/path stored by init_zmq_servers()

ocd.utils.get_zmq_server_url(name)[source]

Retrieve the zmq server url/path.

Parameters:
name : string

name of the server

Returns:
string

url to which the server is bound

ocd.utils.get_zmq_server_urls(name=None)[source]

Retrieve all the urls/paths passed via the configuration.

Parameters:
name : string, optional

if given, returns only the urls/paths for the given name, otherwise returns all

Returns:
list of strings
ocd.utils.timeout_context(timeout)[source]

Context manager that times out. If timeout is zero, the timeout is disabled and the context waits forever.

Parameters:
timeout : int

seconds to wait before the timeout is hit

Raises:
TimeOutError

if the timeout is reached

Examples

>>> from ocd.errors import TimeOutError
>>> try:
...     with timeout_context(1):
...         # do something very interesting
...         pass
... except TimeOutError:
...     print('Timeout hit')
... else:
...     print('Finish executing')
Finish executing
>>> try:
...     with timeout_context(1):
...         import time
...         time.sleep(2)
... except TimeOutError:
...     print('Timeout hit')
... else:
...     print('Finish executing')
Timeout hit
ocd.utils.function_timeout(seconds, func, *args, **kwargs)[source]

Execute the function for at most seconds seconds. If the execution finishes in time, returns the function's return value and False, otherwise returns (None, True).

Parameters:
seconds : int

number of seconds to set for the timeout; only integers are allowed

func :

callable to execute

args, kwargs :

positional and keyword arguments to pass to func

Returns:
returned :

return value of func or None if the timeout is triggered

timeout_hit : bool

whether the timeout has been hit

Examples

>>> import time
>>> def my_func(i):
...     time.sleep(i)
...     return 42
>>> function_timeout(1, my_func, 0.1)
(42, False)
>>> function_timeout(1, my_func, 2)
(None, True)
class ocd.utils.SendAndListen(server, listener, n_attempts=5, interval=1, timeout=None)[source]

Bases: object

Send messages through a ZeroMQ server and listen until a reply comes in or a timeout is hit.

Parameters:
server : tcs_lib.server.ZMQServer

server to use to send out events

listener : tcs_lib.tcs_event.TCSEvent

event listener

n_attempts : int, optional

maximum number of attempts; if None or non-positive, send forever

interval : float, optional

interval between sent events; set it negative to send events continuously

timeout : int, optional

timeout used to interrupt the listener (passed to timeout_context()). If None, it is computed from n_attempts and interval, if possible, or set to 10 seconds

classmethod from_addresses(server_address, listener_addresses, topics=None, **kwargs)[source]

Create a tcs_lib.server.ZMQServer instance using server_address and a tcs_lib.tcs_event.TCSEvent using listener_addresses and topics, then initialize the class.

Parameters:
server_address : string

url/path/… to use to initialize the server

listener_addresses : list of strings

url/path/… to use to initialize the listener

topics : list of strings, optional

list of topics to pass to the listener

kwargs :

keyword arguments passed to the class constructor

classmethod from_names(server_name, listener_name, topics=None, **kwargs)[source]

Get the server with get_zmq_server() and the urls to listen to with get_zmq_server_urls() and then initialize the class.

Parameters:
server_name, listener_name : string

names of the server and of the urls to listen to as saved in init_zmq_servers()

topics : list of strings, optional

list of topics to pass to the listener

kwargs :

keyword arguments passed to the class constructor

send(self, topic, event)[source]

Send an event with topic topic at most n_attempts times, waiting interval seconds before attempting to send a new one.

listen() can interrupt the loop.

Parameters:
topic : string

topic of the event to send

event : dict

payload of the event to send

listen(self, expected_topic=None, expected_event=None)[source]

Loop the listener until it gets an event confirming the action.

Parameters:
expected_topic : string, optional

topic to listen to; if None accept any topic that the listener returns

expected_event : dict, optional

if not None, tcs events returned by the listener must contain expected_event to be considered valid, otherwise every event is considered valid; when a valid event arrives, send() is interrupted

Returns:
out_topic : string

topic of the first accepted event

out_event : dict

payload of the first accepted event

communicate(self, send_topic, send_event, expected_topic=None, expected_event=None)[source]

Run send(), emitting send_topic and send_event, in a Thread, while listening for expected_topic and expected_event. listen() is timed out to avoid locks.

Once listen() returns, the thread running send() is also interrupted, even if n_attempts is set to None or negative. When an expected event is received, the sending thread is finished. This method can be called multiple times.

Parameters:
send_topic, send_event :

same as the input parameters of send()

expected_topic, expected_event :

same as the input parameters of listen()

Returns:
out_topic, out_event :

same as the output parameters of listen()

timeout_hit : bool

if True, the listener timed out before receiving the expected event. In this case out_topic and out_event are set to None. If False, the expected event has been received and returned in the above parameters
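
The interplay between the sending thread and the listener can be sketched with the standard library only. Everything in the block is hypothetical: `communicate_sketch` is not the class method, `send` and `receive` are plain callables standing in for the ZeroMQ server and the event listener, and a queue timeout replaces timeout_context().

```python
import queue
import threading


def communicate_sketch(send, receive, n_attempts=5, interval=0.01, timeout=1.0):
    """Send repeatedly in a thread while waiting for a reply.

    send() and receive(timeout) are hypothetical callables standing in for
    the ZeroMQ server and the event listener.
    """
    stop = threading.Event()

    def send_loop():
        attempt = 0
        forever = n_attempts is None or n_attempts <= 0
        while not stop.is_set() and (forever or attempt < n_attempts):
            send()
            attempt += 1
            stop.wait(interval)  # sleep, but wake up early when stopped

    sender = threading.Thread(target=send_loop)
    sender.start()
    try:
        topic, event = receive(timeout)
        timeout_hit = False
    except queue.Empty:
        topic, event, timeout_hit = None, None, True
    finally:
        stop.set()     # interrupt the sender even if it would send forever
        sender.join()
    return topic, event, timeout_hit


replies = queue.Queue()
result = communicate_sketch(
    send=lambda: replies.put(("ack", {"ok": True})),  # loopback "server"
    receive=lambda t: replies.get(timeout=t),
    n_attempts=None,
)
print(result)
```

A threading.Event is used here so that the sender can be stopped between attempts without killing the thread; whether the real class uses the same mechanism is not stated in the source.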