Source code for ocd.storage

# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''This module contains the classes that act as a storage for data needed by
OCD to take decisions.'''
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import abc
from collections import deque
import numbers
import operator

import numpy as np
import six

from .config import get_value
from .errors import OCDTypeError, OCDValueError
from . import utils
from . import tcs_proxy


[docs]@six.add_metaclass(abc.ABCMeta) class OCDVault(object): '''The Vault is handed events and knows which one it can store and where. Each variable is stored into a :class:`Container` instance, or child thereof, and all the instances are exposed as attributes. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` configuration object Attributes ---------- conf same as input topics container_names ''' def __init__(self, conf): self.conf = conf # Map between names and Container instances. __getattr__ allows to # access the Container instances in the dictionary as instance # attributes self._containers = {} # Map between topics that the OCDVault can handle and list of # functions/methods that can handle them. The handlers are either # methods of the classes in _containers or helper functions self._topic_funcs = {} for cname, container, topic, handler in self.create_containers(): self._containers[cname] = container try: self._topic_funcs[topic].append(handler) except KeyError: self._topic_funcs[topic] = [handler, ]
[docs] @abc.abstractmethod def create_containers(self): # pragma: no cover '''Generator that yields the containers, their names, the topics they handlers and the topic handlers. Yields ------ container_name : string name of the container container : :class:`Container` container that stores the information topic : string name of the topic handled by the container handler : callable function that handle the topic. Typically is :meth:`ContainerEventMeta.handle_event` ''' pass
[docs] def store_event(self, topic, event): '''If the topic is relevant, call the functions/methods to store the event. Parameters ---------- topic : string topic of the event event : dict event to handle Returns ------- bool whether the event has been handled or not ''' if topic in self._topic_funcs: event_handlers = self._topic_funcs[topic] for eh in event_handlers: eh(event) return True else: return False
@property def topics(self): '''Returns the list of topics the vault can handle''' return list(self._topic_funcs.keys()) @property def container_names(self): '''Return the list of container names''' return list(self._containers.keys()) def __getattr__(self, name): '''Allow to access the containers as instance attributes Parameters ---------- name : string name to search ''' try: return self._containers[name] except KeyError: msg = "'{}' object has no attribute '{}'" msg = msg.format(self.__class__.__name__, name) raise AttributeError(msg) def __getitem__(self, name): '''Allow to access the containers using dictionary-like notation Parameters ---------- name : string name to search ''' return self._containers[name]
[docs]class MetrologyVault(OCDVault): '''Vault containing metrology data. Examples -------- >>> from ocd.config import load_config >>> from ocd import tcs_proxy >>> conf = load_config() >>> tcs_proxy.init(conf) >>> ocd_vault = MetrologyVault(conf) >>> ocd_vault.store_event('pas.Guider1.metrology_data', ... {'fit.gauss_par(3)': 42, ... 'fit.gauss_par(4)': 42, ... 'filter.magnitude': 42, ... 'photometry.fixed_skymag': 42, ... 'photometry.kron_skymag': 42, ... 'photometry.fixed_mag': 42, ... 'photometry.kron_mag': 42, ... 'image_quality': 1.2, ... 'transparency': 0.8, ... 'plate_scale.x': 0.196, ... 'plate_scale.y': 0.196, ... '__data_time': 43, }) True >>> # get the Container instance for the fwhm quantity >>> fwhm_container1 = ocd_vault.fwhm_g1 >>> fwhm_container2 = ocd_vault['fwhm_g1'] >>> fwhm_container1 == fwhm_container2 True All the relevant metrology values from the ``pas.Guider{1,2}.metrology_data`` are stored, however if the values of any of the following keys is ``true`` the value is masked: * ``photometry.object_at_image_border`` * ``photometry.object_in_bad_image_region`` * ``photometry.star_ambiguous`` * ``photometry.star_not_found`` * ``photometry.unreliable_background`` .. todo:: fix the ``transparency_g*`` description when the missing documentation arrive. See also :class:`ContainerTransparency`. Attributes ---------- fwhm_g1, fwhm_g2 : :class:`ContainerFWHM` converts the values of ``object.fwhm`` to arcsec using ``plate_scale.x`` and ``plate_scale.y`` from the ``pas.Guider1.metrology_data`` and ``pas.Guider2.metrology_data`` event respectively and store it skymag_g1, skymag_g2 : :class:`ContainerEventKey` store the values of the sky magnitude from the ``pas.Guider1.metrology_data`` and ``pas.Guider2.metrology_data`` event respectively. The name of the key is defined by the ``photometry_skymag`` option of the ``[containers]`` section of the configuration transparency_g1, transparency_g2 : :class:`ContainerTransparency` compute and store the transparency from the ``pas.Guider1.metrology_data`` and ``pas.Guider2.metrology_data`` event respectively. It uses the star magnitude, ``filter.magnitude`` and illumination correction keywords. The name of the star magnitude is defined by the ``photometry_trans`` option of the ``[containers]`` section of the configuration; the illumination correction is set to ``0.86`` and can be modified using the ``illumination_correction`` option. '''
[docs] def create_containers(self): '''Generator that yields the containers, their names, the topics they handlers and the topic handlers. Yields ------ container_name : string name of the container container : :class:`Container` container that stores the information topic : string name of the topic handled by the container handler : callable function that handle the topic. Typically is :meth:`ContainerEventMeta.handle_event` ''' # function that returns True if some of the keys in the event that tell # us if the star is bad check_keys = ('photometry.object_at_image_border', 'photometry.object_in_bad_image_region', 'photometry.star_ambiguous', 'photometry.star_not_found', 'photometry.unreliable_background') has_bad_photometry = utils.compare_with_value(operator.eq, any, 'true', *check_keys) # get information from the configuration file sky_phot_type = self.conf['containers']['photometry_skymag'] maxlen = get_value(self.conf, 'containers', 'maxlen', cast_to=int) delta_timestamp = get_value(self.conf, 'containers', 'delta_timestamp', cast_to=float) # add the containers to store the fwhm, sky brightness and transparency for guider in [1, 2]: topic = 'pas.Guider{}.metrology_data'.format(guider) # TODO: cleanup the following code if it not used anymore # fwhm and sky brightness # attr_keys = [['fwhm_g{}'.format(guider), 'object.fwhm'], # ['skymag_g{}'.format(guider), sky_phot_type]] # sky brightness attr_keys = [['skymag_g{}'.format(guider), sky_phot_type]] for attr_name, event_key in attr_keys: container = self._container_event_key(event_key, mask_func=has_bad_photometry) yield attr_name, container, topic, container.handle_event # transparency and fwhm attr_class = [['transparency_g{}'.format(guider), ContainerTransparency], ['fwhm_g{}'.format(guider), ContainerFWHM]] for attr_name, Class_ in attr_class: container = Class_(self.conf, mask_func=has_bad_photometry, maxlen=maxlen, delta_timestamp=delta_timestamp) yield attr_name, container, topic, container.handle_event
[docs] def _container_event_key(self, event_key, mask_func=None): '''Instantiate a :class:`ContainerEventKey` and returns it. Parameters ---------- event_key : string name of the key in the event whose value must be stored mask_func : callable, optional mask function to pass to the ContainerEventKey; defaults to ``None`` Returns ------- container : :class:`ContainerEventKey` ''' maxlen = get_value(self.conf, 'containers', 'maxlen', cast_to=int) delta_timestamp = get_value(self.conf, 'containers', 'delta_timestamp', cast_to=float) container = ContainerEventKey(event_key, mask_func=mask_func, maxlen=maxlen, delta_timestamp=delta_timestamp) return container
[docs]class AzimuthVault(OCDVault): '''Vault for storing azimuth. Attributes ---------- azimuth : :class:`ContainerSkipMasked` store the values of ``az``, if the ``setup`` is done (``"true"``), from the ``tcs.root.ra_dec`` event. Only a few azimuth values are retained by OCD, as only the latest one is used. azimuth_tmp : :class:`ContainerSkipMasked` store the values of ``az``, if the ``setup`` is **not** done (``"false"``), from the ``tcs.root.ra_dec`` event. Only a few azimuth values are retained by OCD, as only the latest one is used. This can be used as a fallback in case the :attr:`azimuth` is empty. '''
[docs] def create_containers(self): '''Generator that yields the containers, their names, the topics they handlers and the topic handlers. Yields ------ container_name : string name of the container container : :class:`Container` container that stores the information topic : string name of the topic handled by the container handler : callable function that handle the topic. Typically is :meth:`ContainerEventMeta.handle_event` ''' for attr_name, setup_mask in [['azimuth', 'false'], ['azimuth_tmp', 'true']]: setup_flag = utils.compare_with_value(operator.eq, any, setup_mask, 'setup') container = ContainerSkipMasked('az', mask_func=setup_flag, maxlen=10) yield (attr_name, container, 'tcs.root.ra_dec', container.handle_event)
[docs]class Container(object): '''Container to store a list of n values, their time stamps and their mask. Quantities like mean, median and standard deviation are exposed as properties. .. warning:: time stamps are assumed to be seconds "since the epoch" as returned e.g. by :func:`time.time`. All the tests are run assuming this. Parameters ---------- maxlen : int, optional maximum length of the underlying storage delta_timestamp : number, optional maximum time difference for the values stored in the class in seconds. It must be a positive number or ``None`` When adding a new value, discard all the values older than ``delta_timestamp`` with respect to the value timestamp; if ``None`` the timestamp is ignored Raises ------ ocd.errors.OCDTypeError if ``delta_timestamp`` is not a number ocd.errors.OCDValueError if ``delta_timestamp`` is negative Attributes ---------- latest latest_masked mean median stddev mean_masked median_masked stddev_masked ''' # The implementation uses collections.deques for ease of use. If this is # too slow the implementation can be updated in the future # deque exampes: # https://stackoverflow.com/questions/4151320/efficient-circular-buffer # numpy example: # https://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/ # in numpy we shouldn't use append/delete method as they make copies def __init__(self, maxlen=50, delta_timestamp=None): self._values = deque(maxlen=maxlen) self._timestamps = deque(maxlen=maxlen) self._masks = deque(maxlen=maxlen) # make sure that the delta timestamp make sense if delta_timestamp is not None: if not isinstance(delta_timestamp, numbers.Number): msg = ('"delta_timestamp" must be a number, not a' ' "{}" with value "{}"') raise OCDTypeError(msg.format(type(delta_timestamp), delta_timestamp)) if delta_timestamp <= 0: raise OCDValueError('"delta_timestamp" must be positive not' ' "{}"'.format(delta_timestamp)) self._delta_timestamp = delta_timestamp
[docs] def add_value(self, value, timestamp, mask=False): '''Store the new value and time stamp. If a ``delta_timestamp`` is given any entry older than ``timestamp - delta_timestamp`` are removed The code assumes that the values are added with increasing time stamp. Parameters ---------- value : number new value to store timestamp : float timestamp in seconds associated to the value mask : bool, optional whether the value is to be masked or not. It follows the convention of :ref:`numpy masked arrays <maskedarray>`. By default no value is masked ''' self._values.append(value) self._timestamps.append(timestamp) self._masks.append(mask) if self._delta_timestamp: min_timestamp = timestamp - self._delta_timestamp while True: # the ``while`` guards the popleft from exceptions if # _delta_timestamp is negative, which would cause the queue to # be (always empty). if self._timestamps[0] < min_timestamp: self._values.popleft() self._timestamps.popleft() self._masks.popleft() else: break
@property def _masked_values(self): '''Returns a :class:`~numpy.ndarray` representation containing only the non masked values.''' if self._masks: return np.array(self._values)[~np.array(self._masks)] else: return [] @property def mean(self): '''Return the mean of the stored values or ``NaN`` if no value is present''' return np.mean(self._values) @property def median(self): '''Return the mean of the stored values or ``NaN`` if no value is present''' return np.median(self._values) @property def stddev(self): '''Return the standard deviation of the stored values or ``NaN`` if no value is present''' return np.std(self._values, ddof=1) @property def mean_masked(self): '''Return the mean of the non masked stored values or ``NaN`` if no value is present or the mask is all ``True``''' return np.mean(self._masked_values) @property def median_masked(self): '''Return the median of the non masked stored values or ``NaN`` if no value is present or the mask is all ``True``''' return np.median(self._masked_values) @property def stddev_masked(self): '''Return the standard deviation of the non masked stored values or ``NaN`` if no value is present or the mask is all ``True``''' return np.std(self._masked_values, ddof=1) @property def latest(self): '''Returns the last value added. Raises an :exc:`AttributeError` if no value is found.''' try: return self._values[-1] except IndexError: msg = ("'{}' object has no attribute 'latest'." " No value is saved in the container.") raise AttributeError(msg.format(self.__class__.__name__)) @property def latest_masked(self): '''Returns the last non masked value. Raises an :exc:`AttributeError` if no value is found.''' try: return self._masked_values[-1] except IndexError: msg = ("'{}' object has no attribute 'latest_masked'." " Either no value is saved in the container or" " all values are all masked.") raise AttributeError(msg.format(self.__class__.__name__))
[docs]@six.add_metaclass(abc.ABCMeta) class ContainerEventMeta(Container): '''ABC class that extend the :class:`Container` class adding a method to handle a TCS event and store a value derived from it. Derived classes must reimplement the :meth:`value_from_event`. Parameters ---------- time_key : string, optional name of the keyword containing the time stamp. Default: ``__data_time`` mask_func : callable, optional function that receives the event passed to :meth:`handle_event` as input and should return ``True`` if the value is to be masked, ``False`` otherwise. If not given or ``None``, no value will be masked args, kwargs : positional and keyword parameters passed to the parent class ''' def __init__(self, *args, **kwargs): self._time_key = kwargs.pop('time_key', '__data_time') self._mask_func = kwargs.pop('mask_func', None) if self._mask_func is None: self._mask_func = lambda x: False super(ContainerEventMeta, self).__init__(*args, **kwargs)
[docs] def handle_event(self, event): '''Call :meth:`value_from_event` to extract a value from the event, get the time stamp and, if required, the mask and save it. Parameters ---------- event : dict event to handle ''' value = self.value_from_event(event) timestamp = event[self._time_key] mask = self._mask_func(event) self.add_value(value, timestamp, mask=mask)
[docs] @abc.abstractmethod def value_from_event(self, event): # pragma: no cover '''Extract one or more values from the event and do the necessary calculations. Returns a single value as result, typically a number. Implement this function in derived classes Parameters ---------- event : dict event to handle Returns ------- result : float or other value ''' pass
[docs]class ContainerEventKey(ContainerEventMeta): '''Implementation of :class:`ContainerEventMeta` class that extract and save a single value for a TCS event. Parameters ---------- key : string when handling the event, get the value for the given key and store it args, kwargs : positional and keyword parameters passed to the parent class ''' def __init__(self, key, *args, **kwargs): super(ContainerEventKey, self).__init__(*args, **kwargs) self._key = key
[docs] def value_from_event(self, event): '''Extract the value of ``key`` from the event and return it. ''' return event[self._key]
[docs]class ContainerTransparency(ContainerEventMeta): '''Implementation of :class:`ContainerEventMeta` class that computes the transparency from a TCS event. On top of the masking described in :class:`MetrologyVault`, this container also masks negative values of the ``filter.magnitude`` value, on the assumption that if no guiding nor photometry is performed on stars with negative magnitude and that the "unset" value is ``-9.999``. The illumination correction has been included in the guider fotometry. .. warning:: We set the default illumination correction to 1. If necessary this value can be overridden with the ``illumination_correction`` option of the ``[containers]`` configuration option. This might be eventually removed. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` configuration object args, kwargs : same as in :class:`ContainerEventMeta` ''' def __init__(self, conf, *args, **kwargs): # get the information key to check cont_sec = conf['containers'] self._trans_key = cont_sec['photometry_trans'] trans_min, trans_max = conf.get_list('containers', 'valid_transparency', cast_to=float) # self._mag_key = cont_sec['photometry_trans'] # self._filter_key = 'filter.magnitude' # key for the illumination # TODO: set it temporarily to the illumination correction value # self._illumination = cont_sec.getfloat('illumination_correction', # fallback=1) # modify the ``mask_func`` to filter also negative magnitudes has_bad_photometry = kwargs.get('mask_func') def mask_func(event): mask = event[self._trans_key] < trans_min or \ event[self._trans_key] > trans_max if has_bad_photometry is not None: mask |= has_bad_photometry(event) return mask kwargs['mask_func'] = mask_func super(ContainerTransparency, self).__init__(*args, **kwargs)
[docs] def value_from_event(self, event): '''Calculate the transparency as: .. math:: \\frac{10 ^ {-0.4 * (mag - filter\\_mag)}}{illum\\_corr} where * ``mag`` is the measured magnitude of the star and is stored in the key defined by the ``photometry_trans`` option of the ``[containers]`` section; * ``filter_mag`` is the reference magnitude of the star, as communicated to TCS, and is stored in the ``filter.magnitude`` key; * ``illum_corr`` is the illumination correction for the current position of the tracker; it is set to 1 or the value of the ``illumination_correction`` option of the ``[containers]`` section; Parameters ---------- event : dict event to handle Returns ------- transparency : float ''' transparency = event[self._trans_key] # filter_mag = event[self._filter_key] # TODO: remove the following when the illumination key is known # illumination_correction = self._illumination # TODO: uncomment the following when the illumination key is known # illumination_correction = event[self._illumination] # exponent = -0.4 * (magnitude - filter_mag) # transparency = 10 ** exponent / illumination_correction tcs_proxy.tcs_log.log_debug('ContainerTransparency: the transparency' ' is {}', transparency) return transparency
[docs]class ContainerFWHM(ContainerEventMeta): '''Implementation of :class:`ContainerEventMeta` class that computes the FWHM in arcseconds as: .. math:: \\sqrt{(\\sigma_x * pscale_x) * (\\sigma_y * pscale_y)} where the ``sigma_x`` and ``sigma_y`` are the x and y standard deviation, in pixels, from the Gaussian fit of the guide star profile (``fit.gauss_par(3)`` and ``fit.gauss_par(3)``) and ``pscale_x`` and ``pscale_y`` are the pixels scale (``plate_scale.x`` and ``plate_scale.y``). If either ``fit.gauss_par(3)`` or ``fit.gauss_par(4)`` is zero or negative, the value of fwhm is masked. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` configuration object args, kwargs : same as in :class:`ContainerEventMeta` ''' def __init__(self, conf, *args, **kwargs): # Key containing the Image quality cont_sec = conf['containers'] self._iq_key = cont_sec['image_quality'] iq_min, iq_max = conf.get_list('containers', 'valid_image_quality', cast_to=float) # keys to use to extract the information # self._sigma_x = 'fit.gauss_par(3)' # self._sigma_y = 'fit.gauss_par(4)' # self._plate_scale_x = 'plate_scale.x' # self._plate_scale_y = 'plate_scale.y' # modify the ``mask_func`` to filter also null fit.gauss_par(*) has_bad_photometry = kwargs.get('mask_func') def mask_func(event): # mask = ((event[self._sigma_x] <= 0) | (event[self._sigma_y] <= 0)) mask = event[self._iq_key] < iq_min or \ event[self._iq_key] > iq_max if has_bad_photometry is not None: mask |= has_bad_photometry(event) return mask kwargs['mask_func'] = mask_func super(ContainerFWHM, self).__init__(*args, **kwargs)
[docs] def value_from_event(self, event): '''Calculate the fwhm in arcseconds. Parameters ---------- event : dict event to handle Returns ------- FWHM : float ''' # sigma_x = event[self._sigma_x] # sigma_y = event[self._sigma_y] # pscale_x = event[self._plate_scale_x] # pscale_y = event[self._plate_scale_y] # fwhm = ((sigma_x * pscale_x) * # (sigma_y * pscale_y)) ** 0.5 fwhm = event[self._iq_key] tcs_proxy.tcs_log.log_debug('ContainerFWHM: the FWHM is {} arcsec', fwhm) return fwhm
[docs]class ContainerSkipMasked(ContainerEventKey): '''Container to store only values that are not masked. '''
[docs] def add_value(self, *args, **kwargs): '''This method calls :meth:`Container.add_value` if ``mask`` is ``False``. Otherwise does nothing Parameters ---------- all : see :meth:`Container.add_value` ''' is_masked = kwargs.get('mask', False) if is_masked: pass else: super(ContainerSkipMasked, self).add_value(*args, **kwargs)