Source code for ocd.auto_schedule

# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''Functionality to schedule the next shot goes in here'''
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import subprocess as sp
import traceback

import astropy.io.ascii as astro_ascii
from astropy.time import TimeDelta
import numpy as np

from . import errors
from . import run_shot
from . import shots_db
from . import tcs_proxy
from . import utils


def run_autoschedule(conf, shot_list, julian_date, azimuth, seeing,
                     transparency, sky_background):
    '''Execute ``autoschedule_main`` and return its parsed standard output.

    The command line is assembled as: executable, the four optional file
    arguments (``-i``, ``-l``, ``-m``, ``-t``), the shot list file and the
    five numerical values converted to strings.

    Parameters
    ----------
    conf : :class:`configparser.ConfigParser`
        configuration. The following options of the ``[autoschedule]``
        section are used:

        * ``exe_name``: name of the executable
        * ``init_file``, ``lae_dec``, ``moon_loc``, ``illum_tables``: name
          of the files passed to the executable
    shot_list : string
        name of the file with the shot positions
    julian_date : float
        Julian date for the start of the planning
    azimuth : float
        current telescope azimuth
    seeing, transparency, sky_background : float
        current conditions

    Returns
    -------
    table : :class:`astropy.table.Table`
        standard output of the command, read with the ``commented_header``
        ascii format

    Raises
    ------
    ocd.errors.AutoscheduleError
        if any of the numerical inputs is a nan or if the command exits
        with a non-zero return code
    '''
    section = conf['autoschedule']
    executable = section['exe_name']
    file_options = ['-i', section['init_file'],
                    '-l', section['lae_dec'],
                    '-m', section['moon_loc'],
                    '-t', section['illum_tables']]

    numbers = [julian_date, azimuth, seeing, transparency, sky_background]
    # refuse to start the executable with undefined conditions
    if np.isnan(numbers).any():
        msg = ('Some of the input numerical parameters is a "nan".'
               ' JD :{0}; az: {1}; seeing: {2}; transparency {3};'
               ' sky background: {4}'.format(*numbers))
        raise errors.AutoscheduleError(msg)

    command = [executable]
    command.extend(file_options)
    command.append(shot_list)
    command.extend(str(number) for number in numbers)

    process = sp.Popen(command, stdout=sp.PIPE, stderr=sp.PIPE,
                       universal_newlines=True)
    stdout, stderr = process.communicate()

    if process.returncode != 0:
        msg = '"{}" returned a non-zero code ({}) because of\n"{}"'
        raise errors.AutoscheduleError(msg.format(' '.join(command),
                                                  process.returncode,
                                                  stderr))
    # a successful run that still wrote to stderr is only reported
    if stderr:
        tcs_proxy.tcs_log.log_warn('Autoschedule reported an error: {}',
                                   stderr)

    return astro_ascii.read(stdout, format='commented_header')
def get_next_shots(conf, azimuth, seeing, transparency, sky_background):
    '''Write the shot list to file and plan from the current time.

    The shot file is created first, then the current Julian date is read
    and both are handed to :func:`run_autoschedule` together with the
    given conditions.

    Parameters
    ----------
    conf : :class:`configparser.ConfigParser`
        configuration. Passed to :func:`ocd.shots_db.create_shot_file` and
        :func:`run_autoschedule`
    azimuth : float
        current telescope azimuth
    seeing, transparency, sky_background : float
        current conditions

    Returns
    -------
    table : :class:`astropy.table.Table`
        parsed output from autoschedule
    '''
    shot_list = shots_db.create_shot_file(conf)
    start_jd = utils.get_time_jd()
    return run_autoschedule(conf, shot_list, start_jd, azimuth, seeing,
                            transparency, sky_background)
def prepare_shot_params(conf, autoschedule_shot):
    '''Build the options dictionary for a shot from an autoschedule row.

    * Copy the entries listed in ``run_shot.shot_dict_keys``, plus ``jd``,
      from the shot produced by ``autoschedule_main``; the result is meant
      for :func:`ocd.run_shot.run`.
    * Convert the exposure times from minutes to seconds.
    * Add the metadata stored in the :class:`~ocd.shots_db.ShotMetadata`
      table, when they exist.

    Parameters
    ----------
    conf : :class:`configparser.ConfigParser`
        configuration. Passed to :func:`ocd.shots_db.get_obsnumber`
    autoschedule_shot : :class:`astropy.table.Row`
        row to convert to a dictionary

    Returns
    -------
    shot_dict : dictionary
        dictionary of options. The ``metadata`` key is absent if no
        metadata are found for the shot.

    Raises
    ------
    KeyError
        if a required key is missing from the row and there is no
        alternative way to obtain it
    '''
    # names that differ between the shot dictionary and the autoschedule row
    aliases = {'track': 'ew'}

    params = {}
    for name in run_shot.shot_dict_keys + ['jd']:
        try:
            # the common case: same name on both sides
            entry = autoschedule_shot[name]
        except KeyError:
            if name in aliases:
                # stored under another name in the row
                entry = autoschedule_shot[aliases[name]]
            elif name == 'obs_number':
                # not provided by autoschedule: ask the database
                entry = shots_db.get_obsnumber(conf)
            else:
                raise
        params[name] = entry

    # minutes -> seconds
    for name in ('exp01', 'exp02', 'exp03'):
        params[name] *= 60

    # attach the metadata; their absence is only worth a warning
    shot_id = params['shotid']
    try:
        params['metadata'] = shots_db.get_metadata(shot_id)
    except errors.ShotDoesNotExist:
        tcs_proxy.tcs_log.log_warn('There are no metadata for shotid {}',
                                   shot_id)

    return params
class ShotRunner(object):
    '''Prepare and run a shot.

    Parameters
    ----------
    conf : :class:`configparser.ConfigParser`
        configuration
    state_machine : :class:`ocd.states.BaseState`
        state machine used to trigger the shot run
    trigger_state : string
        name of the state of ``state_machine`` to use as trigger for running
        the shot
    metrology : :class:`ocd.storage.MetrologyVault`
        vault containing the metrology information
    azimuth : :class:`ocd.storage.AzimuthVault`
        azimuth values from the telescope

    Attributes
    ----------
    topics : list
        name of topics that trigger :meth:`handle_event`. It's a copy of
        :attr:`state_machine.trigger_topics`
    '''

    def __init__(self, conf, state_machine, trigger_state, metrology,
                 azimuth):
        self._conf = conf
        self._state_machine = state_machine
        self._trigger_state = trigger_state
        self._metrology = metrology
        self._azimuth = azimuth

        # copy the topics
        self.topics = state_machine.trigger_topics[:]

        # log messages header
        self._log_header = '{}:'.format(type(self).__name__)

        # processes running a shot: shotid -> (process, command)
        self._processes = {}

        # get the configuration options
        self._store_config_options()

    def _store_config_options(self):
        '''Save the ``skip_shot_submission``, ``skip_shot_delta_sec`` and
        ``wait_shot_delta_sec`` options of the ``[autoschedule]`` section
        into :attr:`_skip_shot`, :attr:`_skip_shot_delta_sec` and
        :attr:`_wait_shot_delta_sec`.

        ``skip_shot_submission`` is optional and defaults to ``False``; the
        two deltas are mandatory and are stored as
        :class:`astropy.time.TimeDelta` in seconds.

        All the options are read and converted before any attribute is
        assigned: if one of them is missing or invalid the exception
        propagates and the previously stored values are left untouched.
        '''
        section = self._conf['autoschedule']

        # read and validate everything first ...
        skip_shot = section.getboolean('skip_shot_submission',
                                       fallback=False)
        skip_seconds = float(section['skip_shot_delta_sec'])
        wait_seconds = float(section['wait_shot_delta_sec'])
        skip_delta = TimeDelta(skip_seconds, format='sec')
        wait_delta = TimeDelta(wait_seconds, format='sec')

        # ... and only then replace the stored values
        self._skip_shot = skip_shot
        self._skip_shot_delta_sec = skip_delta
        self._wait_shot_delta_sec = wait_delta

    def handle_event(self, tcs_topic, tcs_event):
        '''When an event with the given topic arrives, decide whether it's
        time to run a shot.

        A shot is attempted only if the state machine is in the trigger
        state. :class:`ocd.errors.AutoscheduleError` and
        :class:`ocd.errors.ObsnumIntegrityError` raised while doing so are
        logged and not propagated.

        Parameters
        ----------
        tcs_topic : string
            topic of the event
        tcs_event : dict
            event to handle, ignored

        Returns
        -------
        bool
            ``False`` if the topic is not among :attr:`topics`, ``True``
            otherwise
        '''
        if tcs_topic not in self.topics:
            return False

        log = tcs_proxy.tcs_log
        if self._state_machine.state == self._trigger_state:
            try:
                self._shot_runner()
            except errors.AutoscheduleError:
                msg = '{0} Autoschedule could not run because of:\n{1}'
                log.log_error(msg, self._log_header,
                              traceback.format_exc())
            except errors.ObsnumIntegrityError:
                msg = '{0} The shot cannot be run because of:\n{1}'
                log.log_error(msg, self._log_header,
                              traceback.format_exc())
        else:
            msg = '{0} It is not yet time to start a shot.'
            log.log_debug(msg, self._log_header)

        return True

    def _pending_processes(self):
        '''Check if there are running processes and clear them if they are
        all done.

        Finished processes with a non-zero return code are reported in the
        log, together with their standard output and error, before the
        stored processes are discarded.

        Returns
        -------
        pending : bool
            whether there are still running processes.
        '''
        stored = self._processes.values()

        if any(proc.poll() is None for (proc, _) in stored):
            msg = ('{0} A shot is already running (probably doing the'
                   ' initial setup)')
            tcs_proxy.tcs_log.log_info(msg, self._log_header)
            return True

        # nothing is running any more: report the failures
        for (proc, cmd) in stored:
            if proc.poll() != 0:
                stdout, stderr = proc.communicate()
                msg = ('{0} Running {1}.\n'
                       'Return code: {2}\n'
                       'stdout: {3}\nstderr: {4}')
                tcs_proxy.tcs_log.log_error(msg, self._log_header, cmd,
                                            proc.returncode, stdout,
                                            stderr)

        # clear the processes dictionary
        self._processes.clear()
        return False

    def _shot_runner(self):
        '''Collect all the information necessary and run a shot.

        In details:

        * check that a ``ocd run_shot`` process is not running
        * cleanup finished shots
        * call :func:`get_next_shots` to get the next shots
        * give up if no shot is scheduled or if the first one starts later
          than :attr:`_skip_shot_delta_sec` from now
        * call :func:`prepare_shot_params` to prepare the values to pass to
          ``ocd run_shot``
        * log the number of scheduled shots and the first available one;
        * unless :attr:`_skip_shot` is set, run ``ocd run_shot`` via
          :func:`ocd.run_shot.run_shot_subprocess` and store the result
        '''
        if self._pending_processes():
            return

        log = tcs_proxy.tcs_log

        # read the conditions once, so that what is logged is exactly what
        # autoschedule receives
        azimuth = self._az
        fwhm = self._fwhm
        transparency = self._transparency
        skymag = self._skymag

        msg = ('{0} Running autoschedule_main for JD {1}, azimuth'
               ' {2}, seeing {3}, transparency {4} and sky magnitude'
               ' {5}')
        log.log_info(msg, self._log_header, utils.get_time_jd(), azimuth,
                     fwhm, transparency, skymag)

        next_shots = get_next_shots(self._conf, azimuth, fwhm,
                                    transparency, skymag)
        n_shots = len(next_shots)
        if not n_shots:
            msg = '{0} Autoschedule did not schedule any shot'
            log.log_warn(msg, self._log_header)
            return

        first_shot = next_shots[0]  # get the first shot of the list
        # time from now to the first shot
        time_delta = TimeDelta(first_shot['jd'] - utils.get_time_jd(),
                               format='jd')

        # skip if the shot is too much in the future (issue #2228)
        if time_delta >= self._skip_shot_delta_sec:
            msg = ('{0} The next shot is planned to happen in {1:.1f} seconds.'
                   ' Since this is too far in the future, the shot execution '
                   ' is skipped. The maximum number of seconds to wait for the'
                   ' shot execution is {2:.0f} seconds')
            log.log_warn(msg, self._log_header, time_delta.sec,
                         self._skip_shot_delta_sec.sec)
            return

        # delay the start by whatever exceeds the waiting threshold
        if time_delta >= self._wait_shot_delta_sec:
            sleep_for = (time_delta - self._wait_shot_delta_sec).sec
        else:
            sleep_for = 0

        # Create the dictionary to pass to :func:`ocd.run_shot.run` from the
        # first autoschedule shot
        shot_dict = prepare_shot_params(self._conf, first_shot)
        shot_dict['sleep_for'] = sleep_for

        msg = ('{0} Autoschedule planned the next {1} shots. The next shot,'
               ' planned to start in about {2} seconds, is:\n{3}')
        if sleep_for > 1e-2:
            msg += '\nThe telescope setup will start in about {4} seconds'
        if self._skip_shot:
            msg += ('\nThe shot execution is skipped, according to the'
                    ' "skip_shot_submission" configuration variable')
        log.log_info(msg, self._log_header, n_shots, time_delta.sec,
                     shot_dict, sleep_for)

        if not self._skip_shot:
            process_cmd = run_shot.run_shot_subprocess(self._conf,
                                                       shot_dict)
            self._processes[shot_dict['shotid']] = process_cmd

    def _guiders_mean(self, quantity):
        '''Combine the values of the two guiders for one quantity.

        Parameters
        ----------
        quantity : string
            prefix of the metrology attributes; ``<quantity>_g1`` and
            ``<quantity>_g2`` are read

        Returns
        -------
        float
            mean, ignoring nans, of the ``median_masked`` values of the two
            attributes
        '''
        gp1 = getattr(self._metrology, quantity + '_g1').median_masked
        gp2 = getattr(self._metrology, quantity + '_g2').median_masked
        return np.nanmean([gp1, gp2])

    @property
    def _fwhm(self):
        '''Get the fwhm to use to run the shot.

        Returns
        -------
        float
            nan-ignoring mean of the masked median FWHM of the two guiders
        '''
        return self._guiders_mean('fwhm')

    @property
    def _skymag(self):
        '''Get the sky magnitude to use to run the shot.

        Returns
        -------
        float
            nan-ignoring mean of the masked median sky magnitude of the two
            guiders
        '''
        return self._guiders_mean('skymag')

    @property
    def _transparency(self):
        '''Get the transparency to use to run the shot.

        Returns
        -------
        float
            nan-ignoring mean of the masked median transparency of the two
            guiders
        '''
        return self._guiders_mean('transparency')

    @property
    def _az(self):
        '''Get and return the azimuth from the
        :attr:`AzimuthVault.azimuth`. If this is not found, try to get
        :attr:`AzimuthVault.azimuth_tmp` or return 180

        Returns
        -------
        float
            current azimuth
        '''
        # first attribute providing a ``latest`` value wins
        for name in ('azimuth', 'azimuth_tmp'):
            try:
                return getattr(self._azimuth, name).latest
            except AttributeError:
                continue
        return 180

    def update_config(self, conf):
        '''Update the local copy of the configuration and the options
        stored by :meth:`_store_config_options`.

        If the new configuration cannot be used (missing or invalid
        options), the previous configuration is restored and the exception
        is re-raised, so the runner keeps working with the old settings.

        Parameters
        ----------
        conf : :class:`pyhetdex.tools.configuration.ConfigParser`
            new configuration object

        Returns
        -------
        string
            message to report back
        '''
        previous_conf = self._conf
        self._conf = conf
        try:
            self._store_config_options()
        except Exception:
            # do not keep a configuration whose options were not stored
            self._conf = previous_conf
            raise

        return ('ShotRunner: the configuration object and the local'
                ' variable storing "[autoschedule]skip_shot_submission",'
                ' "[autoschedule]skip_shot_delta_sec" and'
                ' "[autoschedule]wait_shot_delta_sec" options have been'
                ' updated')