# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''Functionality to schedule the next shot goes in here'''
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import subprocess as sp
import traceback
import astropy.io.ascii as astro_ascii
from astropy.time import TimeDelta
import numpy as np
from . import errors
from . import run_shot
from . import shots_db
from . import tcs_proxy
from . import utils
[docs]def run_autoschedule(conf, shot_list, julian_date, azimuth, seeing,
transparency, sky_background):
'''Run ``autoschedule_main`` and parse the output.
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration. It uses the following options of the ``[autoschedule]``
section:
* ``exe_name``: name of the executable
* ``init_file``, ``lae_dec``, ``moon_loc``, ``illum_tables``: name of
the files necessary to run autoschedule
shot_list : string
name of the file with the shot positions
julian_date : float
Julian date for the start of the planning
azimuth : float
current telescope azimuth
seeing, transparency, sky_background : float
current conditions
Returns
-------
table : :class:`astropy.table.Table`
parsed output from autoschedule
Raises
------
ocd.errors.AutoscheduleError
if the ``autoschedule_main`` command fails
'''
sec = conf['autoschedule']
exe_name = sec['exe_name']
optionals = ['-i', sec['init_file'], '-l', sec['lae_dec'],
'-m', sec['moon_loc'], '-t', sec['illum_tables']]
positionals = [julian_date, azimuth, seeing, transparency, sky_background]
if any(np.isnan(positionals)):
msg = ('Some of the input numerical parameters is a "nan".'
' JD :{0}; az: {1}; seeing: {2}; transparency {3};'
' sky background: {4}'.format(*positionals))
raise errors.AutoscheduleError(msg)
positionals = [shot_list] + [str(p) for p in positionals]
command = [exe_name, ] + optionals + positionals
process = sp.Popen(command, stdout=sp.PIPE, stderr=sp.PIPE,
universal_newlines=True)
stdout, stderr = process.communicate()
if process.returncode != 0:
msg = '"{}" returned a non-zero code ({}) because of\n"{}"'
raise errors.AutoscheduleError(msg.format(' '.join(command),
process.returncode,
stderr))
if stderr != '':
tcs_proxy.tcs_log.log_warn('Autoschedule reported an error: {}',
stderr)
table = astro_ascii.read(stdout, format='commented_header')
return table
[docs]def get_next_shots(conf, azimuth, seeing, transparency, sky_background):
'''Dump the shot list and run autoschedule.
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration. Passed to :func:`ocd.shots_db.create_shot_file`,
:func:`run_autoschedule`
azimuth : float
current telescope azimuth
seeing, transparency, sky_background : float
current conditions
Returns
-------
table : :class:`astropy.table.Table`
parsed output from autoschedule
'''
shot_file = shots_db.create_shot_file(conf)
jd = utils.get_time_jd()
table = run_autoschedule(conf, shot_file, jd, azimuth, seeing,
transparency, sky_background)
return table
[docs]def prepare_shot_params(conf, autoschedule_shot):
'''
* Convert a shot as produced by ``autoschedule_main`` to a dictionary
passed to :func:`ocd.run_shot.run`.
* Convert the exposure times from minutes to seconds.
* Add the metadata stored in the :class:`~ocd.shots_db.ShotMetadata` table
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration. Passed to :func:`ocd.shots_db.create_shot_file`,
:func:`run_autoschedule`
autoschedule_shot : class:`astropy.table.Row`
row of to convert to a dictionary
Returns
-------
shot_dict : dictionary
dictionary of options
'''
shot_dict = {}
# some keys are not the same
_key_mapping = {'track': 'ew'}
for k in run_shot.shot_dict_keys + ['jd']:
try: # first try to copy verbatim
value = autoschedule_shot[k]
except KeyError: # if the key is not found
if k in _key_mapping: # use a different name
value = autoschedule_shot[_key_mapping[k]]
elif k == 'obs_number': # or get the observation number
value = shots_db.get_obsnumber(conf)
else:
raise
shot_dict[k] = value
exp_keys = ['exp01', 'exp02', 'exp03']
for k in exp_keys:
shot_dict[k] *= 60
# save the metadata in the shot_dict
try:
shot_dict['metadata'] = shots_db.get_metadata(shot_dict['shotid'])
except errors.ShotDoesNotExist:
tcs_proxy.tcs_log.log_warn('There are no metadata for shotid {}',
shot_dict['shotid'])
return shot_dict
[docs]class ShotRunner(object):
'''Prepare and run a shot.
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration
state_machine : :class:`ocd.states.BaseState`
state machine used to trigger the shot run
trigger_state : sting
name of the state of ``state_machine`` to use as trigger for running
the shot
metrology : :class:`ocd.storage.MetrologyVault`
vault containing the metrology information
azimuth : :class:`ocd.storage.AzimuthVault`
azimuth values from the telescope
Attributes
----------
topics : list
name of topics that trigger :meth:`handle_event`. It's taken from
:attr:`state_machine.trigger_topics`
'''
def __init__(self, conf, state_machine, trigger_state, metrology, azimuth):
self._conf = conf
self._state_machine = state_machine
self._trigger_state = trigger_state
self._metrology = metrology
self._azimuth = azimuth
# copy the topics
self.topics = state_machine.trigger_topics[:]
# log messages header
self._log_header = '{}:'.format(self.__class__.__name__)
# save the processes running a shot
self._processes = {}
# get the configuration options
self._store_config_options()
[docs] def _store_config_options(self):
'''Save the ``skip_shot_submission``, ``skip_shot_delta_sec`` and
``wait_shot_delta_sec`` options of the ``[autoschedule]`` section into
:attr:`_skip_shot`, :attr:`_skip_shot_delta_sec` and
:attr:`_wait_shot_delta_sec`'''
# save configuration options
autoschedule_sec = self._conf['autoschedule']
self._skip_shot = autoschedule_sec.getboolean('skip_shot_submission',
fallback=False)
self._skip_shot_delta_sec = autoschedule_sec['skip_shot_delta_sec']
self._skip_shot_delta_sec = TimeDelta(float(self._skip_shot_delta_sec),
format='sec')
self._wait_shot_delta_sec = autoschedule_sec['wait_shot_delta_sec']
self._wait_shot_delta_sec = TimeDelta(float(self._wait_shot_delta_sec),
format='sec')
[docs] def handle_event(self, tcs_topic, tcs_event):
'''When an event with the given topic arrives, decide whether it's time
to run a shot.
Parameters
----------
tcs_topic : string
topic of the event
tcs_event : dict
event to handle, ignored
Returns
-------
bool
whether the event triggers some computation
'''
if tcs_topic not in self.topics:
return False
else:
if self._state_machine.state == self._trigger_state:
try:
self._shot_runner()
except errors.AutoscheduleError:
msg = '{0} Autoschedule could not run because of:\n{1}'
tcs_proxy.tcs_log.log_error(msg, self._log_header,
traceback.format_exc())
except errors.ObsnumIntegrityError:
msg = '{0} The shot cannot be run because of:\n{1}'
tcs_proxy.tcs_log.log_error(msg, self._log_header,
traceback.format_exc())
else:
msg = '{0} It is not yet time to start a shot.'
tcs_proxy.tcs_log.log_debug(msg, self._log_header)
return True
[docs] def _pending_processes(self):
'''Check if there are running processes and clear the them if they are
done.
Returns
-------
pending : bool
whether there are still running processes.
'''
if any(p.poll() is None for (p, _) in self._processes.values()):
msg = ('{0} A shot is already running (probably doing the'
' initial setup)')
tcs_proxy.tcs_log.log_info(msg, self._log_header)
return True
for (k, (p, cmd)) in self._processes.items():
if p.poll() != 0:
stdout, stderr = p.communicate()
msg = ('{0} Running {1}.\n'
'Return code: {2}\n'
'stdout: {3}\nstderr: {4}')
tcs_proxy.tcs_log.log_error(msg, self._log_header, cmd,
p.returncode, stdout, stderr)
# clear the processes dictionary
self._processes.clear()
return False
[docs] def _shot_runner(self):
'''Collect all the information necessary and run a shot.
In details:
* check that a ``ocd run_shot`` process is not running
* cleanup finished shots
* call :func:`get_next_shots` to get the next shots
* call :func:`prepare_shot_params` to prepare the values to pass to
``ocd run_shot``
* log the number of scheduled shots and the first available one;
* run ``ocd run_shot`` in a :class:`subprocess.Popen` and store the
process
'''
if self._pending_processes():
return
msg = ('{0} Running autoschedule_main for JD {1}, azimuth'
' {2}, seeing {3}, transparency {4} and sky magnitude'
' {5}')
tcs_proxy.tcs_log.log_info(msg, self._log_header, utils.get_time_jd(),
self._az, self._fwhm, self._transparency,
self._skymag)
next_shots = get_next_shots(self._conf, self._az, self._fwhm,
self._transparency, self._skymag)
if not len(next_shots):
msg = '{0} Autoschedule did not schedule any shot'
tcs_proxy.tcs_log.log_warn(msg, self._log_header)
return
first_shot = next_shots[0] # get the first shot of the list
# get the time to the first shot in seconds
time_delta = TimeDelta(first_shot['jd'] - utils.get_time_jd(),
format='jd')
# skip if the shot is too much in the future (issue #2228)
if time_delta >= self._skip_shot_delta_sec:
msg = ('{0} The next shot is planned to happen in {1:.1f} seconds.'
' Since this is too far in the future, the shot execution '
' is skipped. The maximum number of seconds to wait for the'
' shot execution is {2:.0f} seconds')
tcs_proxy.tcs_log.log_warn(msg, self._log_header, time_delta.sec,
self._skip_shot_delta_sec.sec)
return
elif time_delta >= self._wait_shot_delta_sec:
sleep_for = (time_delta - self._wait_shot_delta_sec).sec
else:
sleep_for = 0
# Create the dictionary to pass to :func:`ocd.run_shot.run` from the
# first autoschedule shot
shot_dict = prepare_shot_params(self._conf, first_shot)
shot_dict['sleep_for'] = sleep_for
msg = ('{0} Autoschedule planned the next {1} shots. The next shot,'
' planned to start in about {2} seconds, is:\n{3}')
if sleep_for > 1e-2:
msg += '\nThe telescope setup will start in about {4} seconds'
if self._skip_shot:
msg += ('\nThe shot execution is skipped, according to the'
' "skip_shot_submission" configuration variable')
tcs_proxy.tcs_log.log_info(msg, self._log_header, len(next_shots),
time_delta.sec, shot_dict, sleep_for)
if not self._skip_shot:
process_cmd = run_shot.run_shot_subprocess(self._conf, shot_dict)
self._processes[shot_dict['shotid']] = process_cmd
@property
def _fwhm(self):
'''Get the fwhm to use to run the shot.
Returns
-------
float
mean FWHM from the guiders
'''
gp1 = self._metrology.fwhm_g1.median_masked
gp2 = self._metrology.fwhm_g2.median_masked
return np.nanmean([gp1, gp2])
@property
def _skymag(self):
'''Get the sky magnitude to use to run the shot.
Returns
-------
float
mean sky magnitude from the guiders
'''
gp1 = self._metrology.skymag_g1.median_masked
gp2 = self._metrology.skymag_g2.median_masked
return np.nanmean([gp1, gp2])
@property
def _transparency(self):
'''Get the sky magnitude to use to run the shot.
Returns
-------
float
mean sky magnitude from the guiders
'''
gp1 = self._metrology.transparency_g1.median_masked
gp2 = self._metrology.transparency_g2.median_masked
return np.nanmean([gp1, gp2])
@property
def _az(self):
'''Get and return the azimuth from the :attr:`AzimuthVault.azimuth`. If
this is not found, try to get :attr:`AzimuthVault.azimuth_tmp` or
return 180
Returns
-------
float
current azimuth
'''
try:
return self._azimuth.azimuth.latest
except AttributeError:
try:
return self._azimuth.azimuth_tmp.latest
except AttributeError:
return 180
[docs] def update_config(self, conf):
'''Update the local copy of the configuration file and
:attr:`ref_values`.
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
new configuration object
Returns
-------
string
message to report back
'''
self._conf = conf
self._store_config_options()
return ('ShotRunner: the configuration object and the local'
' variable storing "[autoschedule]skip_shot_submission",'
' "[autoschedule]skip_shot_delta_sec" and'
' "[autoschedule]wait_shot_delta_sec" options have been'
' updated')