# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''Module to run a shot.
This module emits the following events:
+------------------------------+----------------------------------------------+
| Topic | Event |
+==============================+==============================================+
| ocd.run_shot.run | exec_status (:attr:`EXEC_STATUS`): whether |
| | it is the start or the end of the run |
+ +----------------------------------------------+
| | error (bool): if ``True`` and error occurred |
| | when running the shot |
+ +----------------------------------------------+
| | exc_type (string): type of the exception |
+ +----------------------------------------------+
| | exc_value (string): representation of the |
| | exception |
+ +----------------------------------------------+
| | traceback (string): full error traceback |
+ +----------------------------------------------+
| | the content of the ``shot_dict`` as passed |
| | to :func:`run` |
+------------------------------+----------------------------------------------+
| ocd.run_shot.setup_telescope | exec_status (:attr:`EXEC_STATUS`): whether |
| | it is the start or the end of the telescope |
| | setup |
+ +----------------------------------------------+
| | the content of the ``shot_dict`` as passed |
| | to :func:`run` |
+------------------------------+----------------------------------------------+
| ocd.run_shot.exp_hetdex | exec_status (:attr:`EXEC_STATUS`): whether |
| | it is the start or the end of a HETDEX |
| | exposure |
+ +----------------------------------------------+
| | exposure (int): exposure number |
+ +----------------------------------------------+
| | seconds (float): exposure time |
+ +----------------------------------------------+
| | observation (int): observation number, same |
| | as obs_number |
+ +----------------------------------------------+
| | object, shotid (str): id of the shot |
+ +----------------------------------------------+
| | recipe (str): script name and version |
+ +----------------------------------------------+
| | x_binning, y_binning (int): binning of the |
| | output fits files |
+ +----------------------------------------------+
| | type (str): type of observation |
+ +----------------------------------------------+
| | recipe (str): script name and version |
+ +----------------------------------------------+
| | the content of the ``shot_dict`` as passed |
| | to :func:`run` |
+------------------------------+----------------------------------------------+
'''
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from collections import namedtuple, OrderedDict
import configparser
import contextlib
import copy
import json
import os
import shutil
import shlex
import subprocess as sp
import time
import traceback
import numpy as np
from . import config as ocd_config
from . import errors
from . import heart_beat
from . import utils
from . import tcs_proxy
from . import __version__
track_mapping = {0: 'EAST', 1: 'WEST'}
'''Map 0/1 to EAST/WEST'''
track_mapping_initial = {k: v[0] for k, v in track_mapping.items()}
'''Map the keys of :attr:`track_mapping` to the first letter of the values'''
inverse_track_mapping = {v: k for k, v in track_mapping.items()}
'''Invert key <-> value of :attr:`track_mapping`'''
RUN_TOPIC = 'ocd.run_shot.run'
'''Topic of the events emitted by :func:`run`'''
SETUP_TOPIC = 'ocd.run_shot.setup_telescope'
'''Topic of the events emitted by :func:`setup_telescope`'''
EXP_TOPIC = 'ocd.run_shot.exp_hetdex'
'''Topic of the events emitted by :func:`do_exposure`'''
ALL_TOPICS = [RUN_TOPIC, SETUP_TOPIC, EXP_TOPIC]
'''List of all the topics emitted in this module'''
ExecStatus = namedtuple('ExecStatus', 'START, FINISH')
EXEC_STATUS = ExecStatus(START=0, FINISH=1)
'''Enumerator that tracks the execution status of each of the tasks that emit
events'''
# name of the configuration option of the ``run_shot`` configuration section
# that is used to know whether to send out events.
# NOTE: this option should be used only internally to switch the sending of
# events off when running a shot for the command line. It is set in
# ``_run_shot_cmd`` and not documented anywhere else on purpose. If it is set
# to ``false`` when running OCD proper will very likely break it.
_emit_events = 'emit_events'
# run the shot from the configuration file
_arguments = OrderedDict()
_arguments['shotid'] = {'help': 'ID of the shot to run'}
_arguments['obs_number'] = dict(help='Counter for the current observation.'
' The number is used to set the directory'
' name where the output fits files are'
' saved.', type=int)
_arguments['ra'] = dict(help='Right ascension of the shot', type=float)
_arguments['dec'] = dict(help='Declination of the shot', type=float)
_arguments['azimuth'] = dict(help='Azimuth at which the observation'
'is performed', type=float)
_arguments['track'] = dict(help='Whether it is EAST or WEST track',
choices=[0, 1], type=int)
_arguments['exp01'] = dict(help='Exposure time of the first exposure in'
' seconds', type=float)
_arguments['exp02'] = dict(help='Exposure time of the second exposure in'
' seconds', type=float)
_arguments['exp03'] = dict(help='Exposure time of the third exposure in'
' seconds', type=float)
shot_dict_keys = list(_arguments.keys())
'''List of the mandatory keys of the ``shot_dict`` used as input in
:func:`run`'''
[docs]def run(conf, shot_dict):
'''Retrieve the information precomputed by shuffle, build the commands to
setup the instrument and do the exposure.
Send out an event when starting and when finishing. Any error is captured
and logged.
Some of the file and directory names can be build dynamically using `format
syntax <https://docs.python.org/3/library/string.html#formatstrings>`_. The
following fields are used: ``{shotid}``, ``{ra}``, ``{dec}``,
``{track_name}``, ``{track_number}``, ``{track_initial}``.
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot:
* shotid (string): id of the shot
* obs_number (int): observation number in the night (used for the
name of the directory where the files are saved)
* ra, dec (float): ra and dec of the shot
* track (int): 0: East, 1: West
* azimuth (float): azimuth at which to execute the shot
* exp01, exp02, exp03 (floats): exposure times for the three exposures
* sleep_for (float), optional: sleep for ``sleep_for`` seconds between
sending the starting event and beginning to setup the telescope
* metadata (dict), optional: set of metadata to send to TCS
'''
send_event = conf['run_shot'].getboolean(_emit_events, fallback=True)
if send_event:
internal_server = utils.get_zmq_server('ocd_run_shot')
# create the start event and send it
event = dict(exec_status=EXEC_STATUS.START, error=False, exc_type=None,
exc_value=None, traceback=None)
event.update(shot_dict)
if send_event:
internal_server.send_tcs_event(RUN_TOPIC, event)
time.sleep(shot_dict.get('sleep_for', 0))
event = copy.deepcopy(event)
try: # run the shot
_run(conf, shot_dict)
except Exception as e: # if any exception is raised, report it
play_sound(conf, 'failure_sound')
tb = traceback.format_exc()
event.update(error=True, exc_type=type(e).__name__,
exc_value=repr(e), traceback=tb)
msg = 'HETDEX shot {0} failed with error {1} and traceback:\n{2}'
tcs_proxy.tcs_log.log_error(msg, shot_dict['shotid'], e, tb)
# If required re-raise the error.
if conf['run_shot'].getboolean('raise_exception'):
raise
finally: # just mark the event as finished
# make sure that the metadata are reset whatever it happens
tcs_proxy.tcs.set_metadata()
event['exec_status'] = EXEC_STATUS.FINISH
if send_event:
internal_server.send_tcs_event(RUN_TOPIC, event)
[docs]def _run(conf, shot_dict):
'''Actual implementation of :func:`run`.
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot
'''
shotid = shot_dict['shotid']
track = shot_dict['track']
# keywords used to format the file names
format_map = {k: shot_dict[k] for k in ['shotid', 'ra', 'dec']}
format_map.update(track_number=track,
track_name=track_mapping[track],
track_initial=track_mapping_initial[track])
format_map.update(shot_dict.get('metadata', {}))
# load the shuffle configuration file
fname_template = conf['run_shot']['shuffle_conf_template']
shuffle_conf = retrieve_shuffle_info(fname_template.format(**format_map))
validate_shuffle_conf(conf, shuffle_conf, shot_dict)
tcs_proxy.tcs_log.log_info('Shot {0}: shuffle configuration file loaded',
shotid)
# copy the acam image
acam_dest = copy_acam(conf, shuffle_conf, format_map)
tcs_proxy.tcs_log.log_info('Shot {0}: ACAM image copied to {1}',
shotid, acam_dest)
# setup the telescope
tcs_proxy.tcs_log.log_info('Communicating the new coordinates to the'
' telescope')
setup_telescope(conf, shuffle_conf, shot_dict)
tcs_proxy.tcs_log.log_info('Telescope ready to execute HETDEX shot {0}',
shotid)
# run a HETDEX shot
with reset_probes(conf):
run_hetdex(conf, shot_dict)
tcs_proxy.tcs_log.log_info('HETDEX shot {0} done', shotid)
[docs]def retrieve_shuffle_info(fname):
'''Search and load the configuration file created by shuffle.
Parameters
----------
fname: string
name of the configuration file
Returns
-------
conf : :class:`configparser.ConfigParser`
loaded configuration file from shuffle
Raises
------
IOError
if the configuration file does not exist
configparser.ParsingError
Exception raised when errors occur attempting to parse a file
'''
conf = configparser.ConfigParser()
with open(fname) as f:
conf.read_file(f)
return conf
[docs]def validate_shuffle_conf(conf, shuffle_conf, shot_dict):
'''Make sure that the configuration is correct.
Compare the expected ``ra``, ``dec``, ``azimuth`` and ``track`` with the
corresponding entries in the ``[trajectory]`` section.
Parameters
----------
conf : :class:`configparser.ConfigParser`
application configuration file (for now not used, future proofed)
shuffle_conf : :class:`configparser.ConfigParser`
configuration file from shuffle
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot.
Same as :func:`run`
Raises
------
ocd.errors.ShuffleConfigError
if the ra, dec and track from shuffle do not agree with the ones from
OCD
'''
shuffle_dict = utils.cast_dictionary(shuffle_conf['trajectory'],
strings=['dir', ],
floats=['ra', 'dec', 'az'])
shuffle_dict['dir'] = inverse_track_mapping[shuffle_dict['dir']]
tol_dict = utils.cast_dictionary(conf['run_shot'],
floats=['abs_tol_ra', 'abs_tol_dec',
'abs_tol_azimuth',
'abs_tol_track'])
for k_shot, k_shuffle in zip(['ra', 'dec', 'azimuth', 'track'],
['ra', 'dec', 'az', 'dir']):
shot_value = shot_dict[k_shot]
shuffle_value = shuffle_dict[k_shuffle]
abs_tol = tol_dict['abs_tol_{}'.format(k_shot)]
same_value = utils.isclose(shot_value, shuffle_value, abs_tol=abs_tol)
if not same_value:
msg = ('The values of {k} from OCD {v} and from shuffle {v_s}'
' differ more than the allowed absolute tolerance {at}')
raise errors.ShuffleConfigError(msg.format(k=k_shot,
v=shot_value,
v_s=shuffle_value,
at=abs_tol))
[docs]def copy_acam(conf, shuffle_conf, format_map):
'''Copy the acam image from the shuffle directory to to a target file.
Parameters
----------
conf : :class:`configparser.ConfigParser`
application configuration file
shuffle_conf : :class:`configparser.ConfigParser`
configuration file from shuffle
format_map : dict
dictionary used to format the file names from the configuration files
Returns
-------
acam_dest
destination file name
'''
# construct the ACAM file name
shuffle_shot_dir = conf['run_shot']['shuffle_shot'].format(**format_map)
acam_file_name = shuffle_conf['image']['acam_output']
acam_image = os.path.join(shuffle_shot_dir, acam_file_name)
# destination file name for the acam image
acam_dest = conf['run_shot']['acam_dest_file']
# copy the file
shutil.copy(acam_image, acam_dest)
return acam_dest
[docs]def setup_telescope(conf, shuffle_conf, shot_dict):
'''Set up the telescope and wait for the TO/RA to confirm that the setup is
complete.
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
application configuration file
shuffle_conf : :class:`configparser.ConfigParser`
configuration file from shuffle
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot.
Same as :func:`run`
'''
run_sec = conf['run_shot']
send_event = run_sec.getboolean(_emit_events, fallback=True)
if send_event:
internal_server = utils.get_zmq_server('ocd_run_shot')
# create and event and send it
event = dict(exec_status=EXEC_STATUS.START)
event.update(shot_dict)
if send_event:
internal_server.send_tcs_event(SETUP_TOPIC, event)
event = copy.deepcopy(event)
# try to cancel the existing trajectory
_cancel_trajectory()
# load the next trajectory
_load_tracjectory(shuffle_conf, shot_dict)
# for each probe give the name (as used in tcssubsystem method names) and
# the name of the corresponding section in the shuffle config file
probe_sections = [['Guider1', 'guider1'], ['Guider2', 'guider2'],
['WFS1', 'wfs1'], ['WFS2', 'wfs2']]
# set the position of the probes
for probe, section in probe_sections:
_set_probe_position(conf, shuffle_conf, shot_dict, probe, section)
# go to the next position
go_next_sec = shuffle_conf['go_next']
go_next_kwargs = utils.cast_dictionary(go_next_sec,
strings=['move_structure',
'move_dome',
'move_probes'],
binary_strings=True)
tcs_proxy.tcs.go_next(**go_next_kwargs)
# set the object id and the magnitudes of the probe stars
for probe, section in probe_sections:
_set_probe_object(conf, shuffle_conf, shot_dict, probe, section)
# then do some more setups
_set_probes_fiductial(conf, shuffle_conf, shot_dict)
# set the metadata
_set_metadata(conf, shot_dict)
# and then play a sound
play_sound(conf, 'setup_sound')
# wait for the TO/RA to mark the setup as complete
_wait_for_setup(conf, shot_dict)
event['exec_status'] = EXEC_STATUS.FINISH
if send_event:
internal_server.send_tcs_event(SETUP_TOPIC, event)
[docs]def _cancel_trajectory():
'''Try to cancel an existing trajectory. If the command fails because no
active trajectory is found, send a warning log. Any other failure is
re-raised.
'''
try:
tcs_proxy.tcs.cancel_trajectory()
except tcs_proxy.errors.error as e:
if 'No currently active trajectory' in str(e):
tcs_proxy.tcs_log.log_warn(str(e))
else:
raise
[docs]def _load_tracjectory(shuffle_conf, shot_dict):
'''Load the trajectory
Parameters
----------
shuffle_conf : :class:`configparser.ConfigParser`
configuration file from shuffle
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot.
Same as :func:`run`
'''
# load the next trajectory
# save the parameters of interest
traj_kwargs = utils.cast_dictionary(shuffle_conf['trajectory'],
floats=['equinox', ])
traj_kwargs.update(utils.cast_dictionary(shot_dict, ints=['track', ],
floats=['ra', 'dec', 'azimuth']))
traj_kwargs['dir'] = track_mapping[traj_kwargs.pop('track')]
traj_kwargs['az'] = traj_kwargs.pop('azimuth')
tcs_proxy.tcs.load_trajectory(**utils.binary_dict(traj_kwargs))
[docs]def _set_probe_position(conf, shuffle_conf, shot_dict, probe_name,
section_name):
'''Execute the commands necessary to setup the probes ra and dec
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
application configuration file
shuffle_conf : :class:`configparser.ConfigParser`
configuration file from shuffle
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot.
Same as :func:`run`
probe_name, section_name : string
name of the probe and of the section of the ``shuffle_conf`` containing
the information from the probe
'''
conf_sec = shuffle_conf[section_name]
# set the position
set_position = getattr(tcs_proxy.tcs,
'{p}_set_position'.format(p=probe_name))
set_pos_kwargs = utils.cast_dictionary(conf_sec, floats=['ra', 'dec',
'equinox'],
strings=['trajectory', ],
binary_strings=True)
set_position(**set_pos_kwargs)
[docs]def _set_probe_object(conf, shuffle_conf, shot_dict, probe_name, section_name):
'''Execute the commands necessary to setup the probes object id and
magnitudes
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
application configuration file
shuffle_conf : :class:`configparser.ConfigParser`
configuration file from shuffle
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot.
Same as :func:`run`
probe_name, section_name : string
name of the probe and of the section of the ``shuffle_conf`` containing
the information from the probe
'''
conf_sec = shuffle_conf[section_name]
# set the object name and the magnitude
probe_type = section_name.strip('12')
shuffle_filters_opt = '{}_shuffle_filters'.format(probe_type)
pas_filters_opt = '{}_pas_filters'.format(probe_type)
if probe_type == 'guider':
shuffle_filters = conf.get_list('run_shot', shuffle_filters_opt)
pas_filters = conf.get_list('run_shot', pas_filters_opt)
else: # no filters passed to wfs
shuffle_filters, pas_filters = [], []
set_mag = getattr(tcs_proxy.pas,
'{p}_SetObjectAndMagnitudes'.format(p=probe_name))
set_mag_kwargs = utils.cast_dictionary(conf_sec, strings=['id', ],
floats=shuffle_filters)
# reassign "id" to "object" and the filter names
for from_key, to_key in zip(['id', ] + shuffle_filters,
['object', ] + pas_filters):
set_mag_kwargs[to_key] = set_mag_kwargs.pop(from_key)
set_mag(**utils.binary_dict(set_mag_kwargs))
[docs]def _set_probes_fiductial(conf, shuffle_conf, shot_dict):
'''Setup the analysis region and the fiducial position and initialize the
WFS exposure times.
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
application configuration file
shuffle_conf : :class:`configparser.ConfigParser`
configuration file from shuffle
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot.
Same as :func:`run`
'''
tcs_proxy.tcs.Guider1_set_analysis_region(xmin=55, ymin=25,
xmax=132, ymax=111)
tcs_proxy.tcs.Guider1_set_fiducial(xpixel=93.5, ypixel=67.5)
tcs_proxy.tcs.Guider2_set_analysis_region(xmin=46, ymin=29,
xmax=129, ymax=113)
tcs_proxy.tcs.Guider2_set_fiducial(xpixel=87.5, ypixel=70.5)
tcs_proxy.pas.WFS1_SetExposureTime(time=5)
tcs_proxy.pas.WFS2_SetExposureTime(time=5)
[docs]def message_expstart(conf, exptime, inst='VIRUS'):
'''
Trigger the exposure time clock
The name of the binary to execute and the options come from the
``message_board`` option of the ``[run_shot]`` configuration section.
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
application configuration file
exptime : float
Exposure time to display
inst : string (optional)
Instrument name to send to the message boad command
'''
sec = conf['run_shot']
cmd = shlex.split(sec['message_board'].format(exptime=exptime))
try:
p = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE)
except Exception as e:
tcs_proxy.tcs_log.log_warn('There was a problem displaying the '
'exposure time on the message board with '
'the command "{}". The exception was',
' '.join(cmd), str(e))
return
stdout, stderr = p.communicate()
if p.returncode != 0:
tcs_proxy.tcs_log.log_warn('There was a problem displaying the '
'exposure time on the message board with'
' the command "{}". The error message is'
' "{}"', ' '.join(cmd), stderr)
[docs]def play_sound(conf, file_option):
'''Play a sound.
The name of the binary to execute the sounds and the options come from the
``play_exe`` option of the ``[run_shot]`` configuration section.
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
application configuration file
file_option : string
name of the configuration option of the ``[run_shot]`` section
containing the name of the file to play
'''
sec = conf['run_shot']
play_exe = shlex.split(sec['play_exe'])
sound_file = sec[file_option]
cmd = play_exe + [sound_file, ]
try:
p = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE)
except Exception as e:
tcs_proxy.tcs_log.log_warn('There was a problem playing the sound with'
'the command "{}". The exception was: ',
' '.join(cmd), str(e))
return
stdout, stderr = p.communicate()
if p.returncode != 0:
tcs_proxy.tcs_log.log_warn('There was a problem playing the sound with'
' the command "{}". The error message is'
' "{}"', ' '.join(cmd), stderr)
[docs]def _wait_for_setup(conf, shot_dict):
'''Wait for the confirmation that the setup is done. If the
``wait_for_setup_timeout`` option of the ``[run_shot]`` configuration
section is positive, add a timeout to the waiting. If the
``continue_on_timeout`` option of the ``[run_shot]`` configuration section
is ``false``, abort the shot if the timeout is hit; if it is ``true`` run
the shot even if the timeout is hit. Any other exception is re-raised.
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
application configuration file
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot.
Same as :func:`run`
'''
run_sec = conf['run_shot']
tcs_proxy.tcs_log.log_info('Wait for confirmation that the setup for the'
' shot {} is ready', shot_dict['shotid'])
timeout = run_sec.getfloat('wait_for_setup_timeout')
try:
tcs_proxy.tcs.wait_for_setup(timeout=timeout)
except tcs_proxy.errors.error as e:
continue_on_timeout = run_sec.getboolean('continue_on_timeout')
if 'Timeout waiting for setup' in str(e) and continue_on_timeout:
msg = ('The setup has not be confirmed, but the shot execution'
' continues, as instructed by the ``continue_on_timeout``'
' configuration variable.')
tcs_proxy.tcs_log.log_warn(msg)
else:
raise
[docs]@contextlib.contextmanager
def reset_probes(conf):
'''Before yielding stop the ACAM and set the storage of guider images.
On return from yield, deactivate the probe images storage and deploy the
ACAM mirror.
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration
'''
try:
tcs_proxy.tcs.ACQ_stop()
store_kwargs = utils.binary_dict({'store': 'true'})
tcs_proxy.pas.Guider1_SetStoreImages(**store_kwargs)
tcs_proxy.pas.Guider2_SetStoreImages(**store_kwargs)
yield
finally:
no_store_kwargs = utils.binary_dict({'store': 'false'})
tcs_proxy.pas.Guider1_SetStoreImages(**no_store_kwargs)
tcs_proxy.pas.Guider2_SetStoreImages(**no_store_kwargs)
tcs_proxy.pas.WFS1_SetStoreImages(**no_store_kwargs)
tcs_proxy.pas.WFS2_SetStoreImages(**no_store_kwargs)
state_kwargs = utils.binary_dict({'state': 'false'})
tcs_proxy.tcs.update_setup(**state_kwargs)
tcs_proxy.pfip.DeployACQCameraMirror()
[docs]def run_hetdex(conf, shot_dict):
'''Run a hetdex shot
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot.
Same as :func:`run`
'''
send_event = conf['run_shot'].getboolean(_emit_events, fallback=True)
shotid = shot_dict['shotid']
if send_event:
internal_server = utils.get_zmq_server('ocd_run_shot')
event = dict(exec_status=EXEC_STATUS.START)
event.update(shot_dict)
retract_mirrors()
tcs_proxy.tcs_log.log_info("Enable the shutter.")
tcs_proxy.virus.enable_shutter()
dithers, delta_shifts = get_dithers(conf)
# common options to pass to the ``virus.expose`` command
expose_kwargs = dict(observation=shot_dict['obs_number'], object=shotid,
recipe=__name__ + " v" + __version__,
x_binning=2, y_binning=1, type='sci')
# make sure that whatever happens the dither mechanism start with 1 and is
# reset to 1
with ensure_first_dither():
exposure_times = [shot_dict[k] for k in ['exp01', 'exp02', 'exp03']]
zipped = zip(dithers, delta_shifts, exposure_times)
# loop over the exposures
for n_exposure, (dither_pos, shift, exp_time) in enumerate(zipped, 1):
# update the exposure arguments with the things for the current
# shot
expose_kwargs.update(exposure=n_exposure, seconds=exp_time)
exp_event = copy.deepcopy(event)
exp_event.update(expose_kwargs)
if send_event:
internal_server.send_tcs_event(EXP_TOPIC, exp_event)
exp_event = copy.deepcopy(exp_event)
message_expstart(conf, exp_time)
do_exposure(conf, shot_dict, expose_kwargs, dither_pos,
shift, len(dithers))
play_sound(conf, 'finish_exp_{}_sound'.format(n_exposure))
exp_event['exec_status'] = EXEC_STATUS.FINISH
if send_event:
internal_server.send_tcs_event(EXP_TOPIC, exp_event)
[docs]def retract_mirrors():
'''Retract the ACAM and CWFS mirrors
'''
tcs_proxy.tcs_log.log_info("Retract ACAM and the CWFS/PV pickoff mirror.")
tcs_proxy.pfip.RetractACQCameraMirror()
tcs_proxy.pfip.RetractCWFSCameraMirror()
[docs]@contextlib.contextmanager
def ensure_first_dither():
'''Make sure that the dither position is set to 1 at the beginning and at
the end, no matter what '''
try:
tcs_proxy.pfip.AdjustDither(pos=1)
yield
finally:
tcs_proxy.pfip.AdjustDither(pos=1)
[docs]def get_dithers(conf):
'''Get the dither positions and dither shifts
Examples
--------
>>> import configparser
>>> conf = configparser.ConfigParser()
>>> conf.read_dict({'run_shot': {'dither_with_probes': 'false'}})
>>> get_dithers(conf)
([1, 2, 3], [None, None, None])
>>> conf['run_shot']['dither_with_probes'] = 'true'
>>> dithers, delta_shifts = get_dithers(conf)
>>> dithers
[1, 1, 1]
>>> delta_shifts
array([[ 1.27, -0.73],
[ 0. , 1.46],
[-1.27, -0.73]])
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration. Uses the ``dither_with_probes`` option of the
``[general]`` section
Returns
-------
dither_pos : list
dither positions, either ``[1, 2, 3]`` or ``[1, 1, 1]``
delta_shifts : ndarray
shifts to apply to the fiducial position of the guide probes. If the
dither positions are different, a list of three ``None`` is returned,
otherwise the following::
[[x_shift_1, y_shift_1],
[x_shift_2, y_shift_2],
[x_shift_3, y_shift_3]]
'''
dither_with_probes = conf.getboolean('run_shot', 'dither_with_probes')
if dither_with_probes:
dither_pos = [1, 1, 1]
# get the offset to apply to the guider for each dither
dithers = np.array([[0, 0], [-1.27, 0.73], [-1.27, -0.73]])
delta_shifts = dithers - np.roll(dithers, -1, axis=0)
else:
dither_pos = [1, 2, 3]
delta_shifts = [None, ]*3
return dither_pos, delta_shifts
[docs]def do_exposure(conf, shot_dict, expose_kwargs, dither_pos, guider_shift,
tot_exposures):
'''Run a single exposure
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration
shot_dict : dictionary
dictionary with the relevant information needed to run the next shot.
Same as :func:`run`
expose_kwargs : dictionary
keyword arguments for the ``virus.expose`` call
dither_pos : int
dither position
guider_shift : None or list of two elements
if not ``None``, shift the fiducial position of the guide probes by
``dx = guider_shift[1]`` and ``dy = guider_shift[0]``
tot_exposures : int
total number of exposures
'''
shotid = shot_dict['shotid']
n_exposure = expose_kwargs['exposure']
# set the dither mechanism to the correct position
tcs_proxy.pfip.AdjustDither(pos=dither_pos)
tcs_proxy.tcs_log.log_info("Starting exposure number {1} of shot"
" {0}. Expose for {2} seconds.", shotid,
n_exposure, expose_kwargs['seconds'])
# do the exposure and wait for it to finish
exp_response = tcs_proxy.virus.expose(**utils.binary_dict(expose_kwargs))
exp_response_kwargs = utils.cast_dictionary(exp_response,
ints=['observation',
'exposure'],
binary_strings=True)
tcs_proxy.virus.wait_for_shutter_close(**exp_response_kwargs)
tcs_proxy.tcs_log.log_info("Completed exposure {1} of shot {0}",
shotid, n_exposure)
is_last_exp = (n_exposure == tot_exposures)
# if the guide probe offset is required do it
if (guider_shift is not None) and (not is_last_exp):
offset_kwargs = dict(dx_asec=guider_shift[1], dy_asec=guider_shift[0],
compensate='true')
msg = ('Offsetting the guide probes fiducial position by dx={0}'
' and dy={1}')
tcs_proxy.tcs_log.log_info(msg, offset_kwargs['dx_asec'],
offset_kwargs['dy_asec'])
offset_kwargs = utils.binary_dict(offset_kwargs)
tcs_proxy.tcs.Guider1_offset_fiducial(**offset_kwargs)
tcs_proxy.tcs.Guider2_offset_fiducial(**offset_kwargs)
wait_last_readout = conf.getboolean('run_shot', 'wait_last_readout')
wait_for_readout = (not is_last_exp) or wait_last_readout
if wait_for_readout:
msg = 'Wait for exposure {0} to be read out'
tcs_proxy.tcs_log.log_info(msg, n_exposure)
try:
tcs_proxy.virus.wait_for_readout(**exp_response_kwargs)
except tcs_proxy.errors.error as e:
if "No active exposure" in str(e):
tcs_proxy.tcs_log.log_warn('The data has been read out'
' so fast that exposure {} of'
' observation {} is not active'
' anymore',
exp_response_kwargs['exposure'],
exp_response_kwargs['observation'])
else:
raise
else:
msg = '*Do not* wait for exposure {0} to be read out'
tcs_proxy.tcs_log.log_info(msg, n_exposure)
[docs]def check_fiter_map(conf):
'''Make sure that the shuffle and the pas filters from the configuration
file have the same number of names
Parameters
----------
conf : :class:`pyhetdex.tools.configuration.ConfigParser`
configuration entries
Raises
------
OCDValueError
if there is missmatch between the two lists
'''
for probe in ['guider', ]:
# do a loop so that if we end up adding also WFS the work is minimal
shuffle_filters = conf.get_list('run_shot',
'{}_shuffle_filters'.format(probe))
pas_filters = conf.get_list('run_shot', '{}_pas_filters'.format(probe))
if len(shuffle_filters) != len(pas_filters):
msg = ('The number of filters for the probe {} to read from the'
' shuffle configuration file ({}) must be the same as the'
' number of filter names used by pas ({})')
msg = msg.format(probe, len(shuffle_filters), len(pas_filters))
raise errors.OCDValueError(msg)
[docs]def run_shot_parser(subparsers):
'''Add a subcommand "run_shot" to run a hetdex shot.
Parameters
----------
subparsers : argparse subparsers object
subparser to use to generate new parsers
Returns
-------
parser : :class:`argparse.ArgumentParser`
modified parser
'''
import argparse as ap
parser = subparsers.add_parser('run_shot', help='Run a hetdex shot',
description='''Subcommand to run a single
hetdex shot''',
formatter_class=ap.ArgumentDefaultsHelpFormatter)
parser.set_defaults(func=_run_shot_cmd)
parser = utils.override_epilog_msg(parser)
for name, options in _arguments.items():
parser.add_argument(name, **options)
parser = ocd_config.config_file_argument(parser)
parser.add_argument('-e', '--emit-events',
help='''Emit events via ZeroMQ. If used, emits a number
of events using the internal OCD ZeroMQ servers. Use
with caution, in particular if OCD is running.''',
action='store_true')
parser.add_argument('-c', '--wait-for-connection', action='store_true',
help='''Try to establish a connection with the OCD main
loop before starting to run the shot. This option is
used only if ``-e/--emit-events`` is activated. If the
connection is not established within 10 seconds, abort
the execution.''')
parser.add_argument('-t', '--timeout', type=int, default=10,
help='''If ``-c/--wait-for-connection`` is given, wait
at most '%(dest)s' seconds before aborting''')
parser.add_argument('-m', '--metadata', type=json.loads,
help='''Dictionary of metadata to send to TCS. It must
be a JSON string encoding a dictionary. ``json.loads``
is used to deserialize the string to a python
dictionary. Example: -m '{"test": 42, "key":
"value"}'.''')
title = 'Override options in the [run_shot] section'
overrides_run_shot = parser.add_argument_group(title=title)
overrides_run_shot.add_argument('-d', '--dither-with-probes',
choices=['yes', 'no'],
dest='setting__run_shot__dither_with_probes',
help='''Use the guide probes for
dithering''')
overrides_run_shot.add_argument('-w', '--wait-last-readout',
choices=['yes', 'no'],
dest='setting__run_shot__wait_last_readout',
help='''Wait for the last readout to happen
before exiting''')
overrides_run_shot.add_argument('-N', '--n-ocd-run-shot',
dest='setting__run_shot__n_ocd_run_shot',
metavar='N_OCD_RUN_SHOT', help='''Which of
the urls for the ``ocd_run_shot`` option of
the ``[urls]`` section use to setup a ZMQ
server''')
overrides_run_shot.add_argument('--shuffle-shot',
dest='setting__run_shot__shuffle_shot',
metavar='SHUFFLE_SHOT', help='''Directory
containing the shuffle output
directories''')
overrides_run_shot.add_argument('--shuffle-conf-template',
dest='setting__run_shot__shuffle_conf_template',
metavar='SHUFFLE_CONF_TEMPLATE',
help='''Template for the name of the
configuration files generated by
shuffle''')
overrides_run_shot.add_argument('--acam-dest-file',
dest='setting__run_shot__acam_dest_file',
metavar='ACAM_DEST_FILE', help='''File
where to copy the ACAM image''')
title = 'Override options in the [urls] section'
overrides_urls = parser.add_argument_group(title=title)
overrides_urls.add_argument('--ocd-run-shot',
dest='setting__urls__ocd_run_shot',
metavar='OCD_RUN_SHOT', help='''Urls/paths used
to send the signal when running a shot. Used
only if ``-e/--emit-events`` is enabled.''',
nargs='+')
return subparsers
[docs]def _run_shot_cmd(args):
'''Entry point for running a shot from the command line
Parameters
----------
args : :class:`~argparse.Namespace`
parsed command line arguments
'''
config = ocd_config.load_config(config_file=args.config, args=args)
# Check that the filter names make sense
check_fiter_map(config)
# inject into the configuration file, under the ``[run_shot]`` section, the
# ``emit_event`` option.
# NOTE: this option is not in the OCD configuration file on purpose.
config['run_shot'][_emit_events] = 'yes' if args.emit_events else 'no'
# inject into the configuration file, under the ``[run_shot]`` section, the
# ``raise_exception`` option.
# NOTE: this option is not in the OCD configuration file on purpose.
config['run_shot']['raise_exception'] = 'yes'
# initialize various pieces and bits necessary to run OCD
# the proxy must always be the first one to initialize after loading the
# configuration file as it provides the logging functionality to the rest
# of the code
tcs_proxy.init(config)
# If required, initialized the ocd servers and add their urls to the list
# to pass to the listener
if args.emit_events:
setup_for_events(args.subcommand, config, args.wait_for_connection,
timeout=args.timeout)
shot_dict = {k: getattr(args, k) for k in shot_dict_keys}
if args.metadata:
shot_dict['metadata'] = args.metadata
# run the shot
run(config, shot_dict)
[docs]def setup_for_events(subcommand, config, wait_for_connection, timeout=10):
'''Initialise the zmq servers and, if necessary wait for the connection to
be established.
Parameters
----------
subcommand : string
name of the subcommand
config : :class:`pyhetdex.tools.configuration.ConfigParser`
configuration entries
wait_for_connection : bool
whether it has to wait for the connection to be established
timeout : int, option
if ``wait_for_connection`` is ``True`` wait at most ``timeout`` seconds
before aborting
Raises
------
ocd.errors.TimeOutError
if the timeout is hit
'''
utils.init_zmq_servers(subcommand, config)
tcs_proxy.tcs_log.log_info('OCD ZeroMQ servers initialized')
if wait_for_connection:
hb = heart_beat.HeartBeatQuery.from_names('ocd_run_shot',
'ocd_main_loop',
n_attempts=None,
interval=0.2,
timeout=timeout)
_, _, timeout_hit = hb.communicate()
if timeout_hit:
msg = ('No connection with the OCD main loop could be established.'
'\nIf ``ocd run`` is nor running, do not use the '
' ``-c/--wait-for-connection``.\n'
'If ``ocd run`` is running, make sure that the'
' urls/addresses of the servers and listeners match'
' and that there are no network (if using TCP) or file'
' permission (if using IPC) problems')
raise errors.TimeOutError(msg)
[docs]def run_shot_subprocess(conf, shot_dict):
'''Run the shot in a subprocess.
Parameters
----------
conf : :class:`configparser.ConfigParser`
configuration. Passed to :func:`ocd.shots_db.create_shot_file`,
:func:`run_autoschedule`
shot_dict : dictionary
dictionary of options; see :func:`run` for more info
Returns
-------
p : :class:`subprocess.Popen`
process running the shot
cmd : string
command executed
'''
conf_file = ocd_config.save_config(conf)
exe = ['ocd', 'run_shot']
arguments = [shot_dict[k] for k in shot_dict_keys]
options = ['-e', '-c', '--config', conf_file]
metadata = shot_dict.get('metadata', None)
if metadata is not None:
options += ['--metadata', json.dumps(metadata)]
cmd = exe + options + arguments
cmd = [str(i) for i in cmd]
p = sp.Popen(cmd) # , stdout=sp.PIPE, stderr=sp.PIPE)
return p, ' '.join(cmd)