ocd.utils – Miscellaneous utilities¶
OCD utilities
-
ocd.utils.STATIC_DIR= 'static'¶ Static ocd directory
-
ocd.utils.common_parser_arguments()[source]¶ Return a command line parser without help nor description. Useful for as parent parser.
Returns: - parser :
argparse.ArgumentParser
- parser :
-
ocd.utils.override_epilog_msg(parser)[source]¶ Add an epilogue about overriding configuration options to the parser
Parameters: - parser :
argparse.ArgumentParser
Returns: - parser :
argparse.ArgumentParser
- parser :
-
ocd.utils.compare_with_value(comp_operator, aggregator, value, *keys)[source]¶ Returns a function that checks some
keysin the givenevent.Parameters: - comp_operator : callable
executable that compares each element of
eventto value, e.g.operator.eq()- aggregator : callable
function that compress the information from
comp_operatorinto a single value, e.g.all()orany().- value : object
value to compare to
- keys : strings
keys whose values must be checked
Returns: - comp_func : function
function with signature:
comp_func(event) -> bool
Parameters: event : dictionary dictionary containing the values associated to the keysReturns: bool Trueif the values ineventmatch the the expectation,Falseotherwise
Examples
>>> from operator import eq >>> all_equal_true = compare_with_value(eq, all, 'true', 'key1', 'key2') >>> all_equal_true({'key1': 'true', 'key2': 'true', 'other': 42}) True >>> all_equal_true({'key1': 'true', 'key2': 'false', 'other': 42}) False >>> all_equal_true({'key1': 'true', 'key3': 'false', 'other': 42}) True
-
ocd.utils.cast_dictionary(in_dict, strings=[], floats=[], ints=[], binary_strings=False)[source]¶ Get the input dictionary and create a new one with the desired keys cast to string, float or int
Parameters: - in_dict : dictionary-like
the values are extracted from here
- strings, floats, ints, optional : iterables
list of the
in_dictkeywords that need to be converted to strings, floats or integers, respectively- binary_strings : bool
if true, all the strings in the output are encoded into binaries
Returns: - out_dict : dictionary
dictionary containing all the key:value pairs lists in
strings,floatsandintsWarning
The code does not check for key repetitions and cast values in the same order as in the function signature. So if you use the same key in, e.g.,
stringandintsthe corresponding output value will be an integer.The code does not protect against missing keys.
-
ocd.utils.binary_dict(in_dict)[source]¶ Recursively convert all the string/unicode in the dictionary to binary strings.
In python 2 makes sure that keys are not unicode.
Parameters: - in_dict : dictionary
dictionary to convert
Returns: - oud_dict : dictionary
copy of the input dictionary with all strings converted to binary strings
-
ocd.utils._safe_string_to_bytes(string, encoding='utf-8')[source]¶ Safe version of
tcs_lib.string_helpers.string_to_bytes(). If the input cannot be converted, returns it.Parameters: - all :
same as
string_to_bytes()
Returns: - If
string_to_bytes()fails, return the input - as it is, otherwise returns the binary version of the input
-
ocd.utils.abstractproperty(func)¶ - Python 3: decorator from combining
propertyandabc.abstractmethod() - Python 2: alias of
abc.abstractproperty()
- Python 3: decorator from combining
-
ocd.utils.tmpdir()[source]¶ When called the first time creates a temporary directory with
py.path.local.make_numbered_dir(), store and return it. Subsequent calls, return the same directory. Keep the last 10 directories, with a lock time of 1 days.Returns: - string
temporary directory
-
ocd.utils.get_out_file(out_dir, file_template, default_name, n_keep=-1)[source]¶ Return the absolute path of the file.
Parameters: - out_dir : string
this can be either a directory on the system or
:tmpdir:; in the latter the temporary directory fromtmpdir()is used. If the directory does not exist, try to create it- file_template : string
name of the file where to write the shots; it can contain at most one format placeholder, either
{}or{}. Iffile_template.format(0)fails,default_nameis used. If the file contains a format placeholder each call creates a new file with a counter usingFileNameRotator()- default_name : string
default to use if
file_templateis not in spec- n_keep : integer, optional
when rotating the files, keep at most
n_keepfiles. Use-1to keep all the files
Returns: - out_file : string
name of the file
-
class
ocd.utils.MockTime(start_time)[source]¶ Bases:
objectWhen initialized saves the time to use and the UTC time at initialization.
Parameters: - start_time : string
time to use as starting point
Attributes: - start_time :
astropy.time.Time representation of the input
start_time- init_time :
astropy.time.Time time of initialisation of this class
-
now(self)[source]¶ Returns the time elapsed from
init_timew.r.tstart_time.Returns: astropy.time.Timeinstance elapsed fromstart_time + (Time.now() - init_time)whereTime.now()is when this property is called
-
ocd.utils.init_times(conf)[source]¶ Initialize the class returning the time.
Parameters: - conf :
pyhetdex.tools.configuration.ConfigParser configuration.
- conf :
-
ocd.utils.get_time_utc()[source]¶ Get the time from the object initialized by
init_timesReturns: astropy.time.Timecurrent or mocked UTC time
-
ocd.utils.get_time_jd()[source]¶ Get the time from the object initialized by
init_times()Returns: - float
current or mocked Julian Date corresponding to
get_utc()
-
ocd.utils.init_zmq_servers(command, conf)[source]¶ Extract the urls/paths/… to use for the servers from the
confand store them.The following options of the
[urls]section are required:ocd_run_shot: url(s) used by theocd.run_shotmoduleocd_main_loop: url(s) used to emit events from the OCD main loopocd_allow_hetdex: url(s) used by theocd allow_hetdexcommandocd_db_replay: url(s) used by theocd db_replaycommandocd_config: url(s) used by theocd configcommand
The values of each one of them can be a list of comma separated URLs. The above names are the same used as input to the
get_zmq_server_urls(),get_zmq_server_url()andget_zmq_server()functions.All the urls are saved and returned by
get_zmq_server_urls(). The urls for the servers that can be initialized for the givencommandare stored and returned by theget_zmq_server_url().If multiple urls are provided for any of the above options, the following mechanism is used to decide which one to use:
- get the value of the option
n_{server_url}, with{server_url}one of the above names, in the[{command}]section, with{command}the value passed to this function; - if the section or option do not exist or is empty, use the first url of the list;
- otherwise the value should be an integer,
i, and the corresponding entry is stored;
Warning
In principle it is possible to use any ZeroMQ protocol that supports the PUB/SUB pattern. However, we cannot ensure that the publishers are always initialized before the receivers, therefore protocols like
inprocwill not work.Parameters: - command : string
name of the OCD subcommand that starts the servers
- conf :
pyhetdex.tools.configuration.ConfigParser configuration.
Raises: - OCDValueError
when the index for the url to save is not an integer
- OCDIndexError
when the index for the url to save is out of bound
-
ocd.utils.get_zmq_server(name)[source]¶ Create a
tcs_lib.server.ZMQServerobject inPUBmode, store and returns it. Subsequent calls with the samenamewill always return the same object.Parameters: - name : string
name of the server
Returns: Raises: - KeyError
if
namehas no associated url/path stored byinit_zmq_servers()
-
ocd.utils.get_zmq_server_url(name)[source]¶ Retrieve the zmq server url/path.
Parameters: - name : string
name of the server
Returns: - string
url to which the server is bind
-
ocd.utils.get_zmq_server_urls(name=None)[source]¶ Retrieve all the urls/paths passed via the configuration.
Parameters: - name : string, optional
if given, returns only the urls/paths for the give name, otherwise returns all
Returns: - list of strings
-
ocd.utils.timeout_context(timeout)[source]¶ Context manager that times out. If
timeoutis zero, timeout gets disabled (waits forever)Parameters: - timeout : int
seconds to wait before the timeout is hit
Raises: - TimeOutError
if the timeout is reached
Examples
>>> from ocd.errors import TimeOutError >>> try: ... with timeout_context(1): ... # do something very interesting ... pass ... except TimeOutError: ... print('Timeout hit') ... else: ... print('Finish executing') Finish executing >>> try: ... with timeout_context(1): ... import time ... time.sleep(2) ... except TimeOutError: ... print('Timeout hit') ... else: ... print('Finish executing') Timeout hit
-
ocd.utils.function_timeout(seconds, func, *args, **kwargs)[source]¶ Execute the function for at most
secondsseconds. If the execution finishes, gives back the function return values andFalse, otherwise returns(None, True).Parameters: - seconds : int
number of seconds to set for the timeout; only integers are allowed
- func :
callable to executed
- args, kwargs :
positional and keyword arguments to pass to
func
Returns: - returned :
return value of
funcorNoneif the timeout is triggered- timeout_hit : bool
whether the timeout has been hit
Examples
>>> import time >>> def my_func(i): ... time.sleep(i) ... return 42 >>> function_timeout(1, my_func, 0.1) (42, False) >>> function_timeout(1, my_func, 2) (None, True)
-
class
ocd.utils.SendAndListen(server, listener, n_attempts=5, interval=1, timeout=None)[source]¶ Bases:
objectSend messages through a ZeroMQ server and listen until a reply comes in or a timeout is hit.
Parameters: - server :
tcs_lib.server.ZMQServer server to use to send out events
- listener :
tcs_lib.tcs_event.TCSEvent event listener
- n_attempts : int, optional
maximum number of attempts, if
Noneor non positive, send forever;- interval : float, optional
interval between event sent, set negative to send events continuously
- timeout : int, optional
timeout used interrupt the listener (passed to
timeout_context()). IfNone, is computed fromn_attemptsandinterval, if possible, or set to 10 seconds
-
classmethod
from_addresses(server_address, listener_addresses, topics=None, **kwargs)[source]¶ Create a
tcs_lib.server.ZMQServerinstance usingserver_addressand atcs_lib.tcs_event.TCSEventusinglistener_addressandtopicsand then initialize the classParameters: - server_address : string
url/path/… to use to initialize the server
- listener_addresses : list of strings
url/path/… to use to initialize the listener
- topics : list of strings, optional
list of topics to pass to the listener
- kwargs :
positional arguments passed to the class constructor
-
classmethod
from_names(server_name, listener_name, topics=None, **kwargs)[source]¶ Get the server with
get_zmq_server()and the urls to listen to withget_zmq_server_urls()and then initialize the class.Parameters: - server_name, listener_name : string
names of the server and of the urls to listen to as saved in
init_zmq_servers()- topics : list of strings, optional
list of topics to pass to the listener
- kwargs :
positional arguments passed to the class constructor
-
send(self, topic, event)[source]¶ Send an
eventwith topictopicat mostn_attemptsand wait one everytimeoutseconds before attempting to send a new one.listen()can interrupt the loop.Parameters: - topic : string
topic of the event to send
- event : dict
payload of the event to send
-
listen(self, expected_topic=None, expected_event=None)[source]¶ Loop the
listeneruntil it gets an event confirming the action.Parameters: - expected_topic : string, optional
topic to listen to; if
Noneaccept any topic that thelistenerreturns- expected_event : dict, optional
if not
None, tcs events returned by the listener must containeventto be considered valid, otherwise every event is considered as valid; if valid, thesend()is interrupted
Returns: - out_topic : string
topic of the first accepted event
- out_event : dict
payload of the first accepted event
-
communicate(self, send_topic, send_event, expected_topic=None, expected_event=None)[source]¶ Run
send(), emittingsend_topicandsend_event, in aThread, while listening forexpected_topicandexpected_event.listen()timed out to avoid locks.Once
listen()returns, also the thread runningsend()is interrupted, event ifn_attemptsis set toNoneor negative. When an expected event is received, the sending thread is finished. This method can be called multiple times.Parameters: Returns: - out_topic, out_event :
same as the output parameters of
listen(),- timeout_hit : bool
if
True, the listener timed out before receiving the expected event. In this caseout_topicandout_eventare set toNone. IfFalse, the expected event has been received and returned in the above parameters
- server :