Source code for ocd.run_shot


# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''Module to run a shot.

This module emits the following events:

+------------------------------+----------------------------------------------+
| Topic                        | Event                                        |
+==============================+==============================================+
| ocd.run_shot.run             | exec_status (:attr:`EXEC_STATUS`): whether   |
|                              | it is the start or the end of the run        |
+                              +----------------------------------------------+
|                              | error (bool): if ``True`` and error occurred |
|                              | when running the shot                        |
+                              +----------------------------------------------+
|                              | exc_type (string): type of the exception     |
+                              +----------------------------------------------+
|                              | exc_value (string): representation of the    |
|                              | exception                                    |
+                              +----------------------------------------------+
|                              | traceback (string): full error traceback     |
+                              +----------------------------------------------+
|                              | the content of the ``shot_dict`` as passed   |
|                              | to :func:`run`                               |
+------------------------------+----------------------------------------------+
| ocd.run_shot.setup_telescope | exec_status (:attr:`EXEC_STATUS`): whether   |
|                              | it is the start or the end of the telescope  |
|                              | setup                                        |
+                              +----------------------------------------------+
|                              | the content of the ``shot_dict`` as passed   |
|                              | to :func:`run`                               |
+------------------------------+----------------------------------------------+
| ocd.run_shot.exp_hetdex      | exec_status (:attr:`EXEC_STATUS`): whether   |
|                              | it is the start or the end of a HETDEX       |
|                              | exposure                                     |
+                              +----------------------------------------------+
|                              | exposure (int): exposure number              |
+                              +----------------------------------------------+
|                              | seconds (float): exposure time               |
+                              +----------------------------------------------+
|                              | observation (int): observation number, same  |
|                              | as obs_number                                |
+                              +----------------------------------------------+
|                              | object, shotid (str): id of the shot         |
+                              +----------------------------------------------+
|                              | recipe (str): script name and version        |
+                              +----------------------------------------------+
|                              | x_binning, y_binning (int): binning of the   |
|                              | output fits files                            |
+                              +----------------------------------------------+
|                              | type (str): type of observation              |
+                              +----------------------------------------------+
|                              | recipe (str): script name and version        |
+                              +----------------------------------------------+
|                              | the content of the ``shot_dict`` as passed   |
|                              | to :func:`run`                               |
+------------------------------+----------------------------------------------+
'''
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from collections import namedtuple, OrderedDict
import configparser
import contextlib
import copy
import json
import os
import shutil
import shlex
import subprocess as sp
import time
import traceback

import numpy as np

from . import config as ocd_config
from . import errors
from . import heart_beat
from . import utils
from . import tcs_proxy
from . import __version__

track_mapping = {0: 'EAST', 1: 'WEST'}
'''Map 0/1 to EAST/WEST'''
track_mapping_initial = {k: v[0] for k, v in track_mapping.items()}
'''Map the keys of :attr:`track_mapping` to the first letter of the values'''
inverse_track_mapping = {v: k for k, v in track_mapping.items()}
'''Invert key <-> value of :attr:`track_mapping`'''

RUN_TOPIC = 'ocd.run_shot.run'
'''Topic of the events emitted by :func:`run`'''
SETUP_TOPIC = 'ocd.run_shot.setup_telescope'
'''Topic of the events emitted by :func:`setup_telescope`'''
EXP_TOPIC = 'ocd.run_shot.exp_hetdex'
'''Topic of the events emitted by :func:`do_exposure`'''

ALL_TOPICS = [RUN_TOPIC, SETUP_TOPIC, EXP_TOPIC]
'''List of all the topics emitted in this module'''

ExecStatus = namedtuple('ExecStatus', 'START, FINISH')
EXEC_STATUS = ExecStatus(START=0, FINISH=1)
'''Enumerator that tracks the execution status of each of the tasks that emit
events'''

# name of the configuration option of the ``run_shot`` configuration section
# that is used to know whether to send out events.
# NOTE: this option should be used only internally to switch the sending of
# events off when running a shot for the command line. It is set in
# ``_run_shot_cmd`` and not documented anywhere else on purpose. If it is set
# to ``false`` when running OCD proper will very likely break it.
_emit_events = 'emit_events'

# run the shot from the configuration file

_arguments = OrderedDict()
_arguments['shotid'] = {'help': 'ID of the shot to run'}
_arguments['obs_number'] = dict(help='Counter for the current observation.'
                                ' The number is used to set the directory'
                                ' name where the output fits files are'
                                ' saved.', type=int)
_arguments['ra'] = dict(help='Right ascension of the shot', type=float)
_arguments['dec'] = dict(help='Declination of the shot', type=float)
_arguments['azimuth'] = dict(help='Azimuth at which the observation'
                             'is performed', type=float)
_arguments['track'] = dict(help='Whether it is EAST or WEST track',
                           choices=[0, 1], type=int)
_arguments['exp01'] = dict(help='Exposure time of the first exposure in'
                           ' seconds', type=float)
_arguments['exp02'] = dict(help='Exposure time of the second exposure in'
                           ' seconds', type=float)
_arguments['exp03'] = dict(help='Exposure time of the third exposure in'
                           ' seconds', type=float)

shot_dict_keys = list(_arguments.keys())
'''List of the mandatory keys of the ``shot_dict`` used as input in
:func:`run`'''


[docs]def run(conf, shot_dict): '''Retrieve the information precomputed by shuffle, build the commands to setup the instrument and do the exposure. Send out an event when starting and when finishing. Any error is captured and logged. Some of the file and directory names can be build dynamically using `format syntax <https://docs.python.org/3/library/string.html#formatstrings>`_. The following fields are used: ``{shotid}``, ``{ra}``, ``{dec}``, ``{track_name}``, ``{track_number}``, ``{track_initial}``. Parameters ---------- conf : :class:`configparser.ConfigParser` configuration shot_dict : dictionary dictionary with the relevant information needed to run the next shot: * shotid (string): id of the shot * obs_number (int): observation number in the night (used for the name of the directory where the files are saved) * ra, dec (float): ra and dec of the shot * track (int): 0: East, 1: West * azimuth (float): azimuth at which to execute the shot * exp01, exp02, exp03 (floats): exposure times for the three exposures * sleep_for (float), optional: sleep for ``sleep_for`` seconds between sending the starting event and beginning to setup the telescope * metadata (dict), optional: set of metadata to send to TCS ''' send_event = conf['run_shot'].getboolean(_emit_events, fallback=True) if send_event: internal_server = utils.get_zmq_server('ocd_run_shot') # create the start event and send it event = dict(exec_status=EXEC_STATUS.START, error=False, exc_type=None, exc_value=None, traceback=None) event.update(shot_dict) if send_event: internal_server.send_tcs_event(RUN_TOPIC, event) time.sleep(shot_dict.get('sleep_for', 0)) event = copy.deepcopy(event) try: # run the shot _run(conf, shot_dict) except Exception as e: # if any exception is raised, report it play_sound(conf, 'failure_sound') tb = traceback.format_exc() event.update(error=True, exc_type=type(e).__name__, exc_value=repr(e), traceback=tb) msg = 'HETDEX shot {0} failed with error {1} and traceback:\n{2}' tcs_proxy.tcs_log.log_error(msg, shot_dict['shotid'], e, tb) # If required re-raise the error. if conf['run_shot'].getboolean('raise_exception'): raise finally: # just mark the event as finished # make sure that the metadata are reset whatever it happens tcs_proxy.tcs.set_metadata() event['exec_status'] = EXEC_STATUS.FINISH if send_event: internal_server.send_tcs_event(RUN_TOPIC, event)
[docs]def _run(conf, shot_dict): '''Actual implementation of :func:`run`. Parameters ---------- conf : :class:`configparser.ConfigParser` configuration shot_dict : dictionary dictionary with the relevant information needed to run the next shot ''' shotid = shot_dict['shotid'] track = shot_dict['track'] # keywords used to format the file names format_map = {k: shot_dict[k] for k in ['shotid', 'ra', 'dec']} format_map.update(track_number=track, track_name=track_mapping[track], track_initial=track_mapping_initial[track]) format_map.update(shot_dict.get('metadata', {})) # load the shuffle configuration file fname_template = conf['run_shot']['shuffle_conf_template'] shuffle_conf = retrieve_shuffle_info(fname_template.format(**format_map)) validate_shuffle_conf(conf, shuffle_conf, shot_dict) tcs_proxy.tcs_log.log_info('Shot {0}: shuffle configuration file loaded', shotid) # copy the acam image acam_dest = copy_acam(conf, shuffle_conf, format_map) tcs_proxy.tcs_log.log_info('Shot {0}: ACAM image copied to {1}', shotid, acam_dest) # setup the telescope tcs_proxy.tcs_log.log_info('Communicating the new coordinates to the' ' telescope') setup_telescope(conf, shuffle_conf, shot_dict) tcs_proxy.tcs_log.log_info('Telescope ready to execute HETDEX shot {0}', shotid) # run a HETDEX shot with reset_probes(conf): run_hetdex(conf, shot_dict) tcs_proxy.tcs_log.log_info('HETDEX shot {0} done', shotid)
[docs]def retrieve_shuffle_info(fname): '''Search and load the configuration file created by shuffle. Parameters ---------- fname: string name of the configuration file Returns ------- conf : :class:`configparser.ConfigParser` loaded configuration file from shuffle Raises ------ IOError if the configuration file does not exist configparser.ParsingError Exception raised when errors occur attempting to parse a file ''' conf = configparser.ConfigParser() with open(fname) as f: conf.read_file(f) return conf
[docs]def validate_shuffle_conf(conf, shuffle_conf, shot_dict): '''Make sure that the configuration is correct. Compare the expected ``ra``, ``dec``, ``azimuth`` and ``track`` with the corresponding entries in the ``[trajectory]`` section. Parameters ---------- conf : :class:`configparser.ConfigParser` application configuration file (for now not used, future proofed) shuffle_conf : :class:`configparser.ConfigParser` configuration file from shuffle shot_dict : dictionary dictionary with the relevant information needed to run the next shot. Same as :func:`run` Raises ------ ocd.errors.ShuffleConfigError if the ra, dec and track from shuffle do not agree with the ones from OCD ''' shuffle_dict = utils.cast_dictionary(shuffle_conf['trajectory'], strings=['dir', ], floats=['ra', 'dec', 'az']) shuffle_dict['dir'] = inverse_track_mapping[shuffle_dict['dir']] tol_dict = utils.cast_dictionary(conf['run_shot'], floats=['abs_tol_ra', 'abs_tol_dec', 'abs_tol_azimuth', 'abs_tol_track']) for k_shot, k_shuffle in zip(['ra', 'dec', 'azimuth', 'track'], ['ra', 'dec', 'az', 'dir']): shot_value = shot_dict[k_shot] shuffle_value = shuffle_dict[k_shuffle] abs_tol = tol_dict['abs_tol_{}'.format(k_shot)] same_value = utils.isclose(shot_value, shuffle_value, abs_tol=abs_tol) if not same_value: msg = ('The values of {k} from OCD {v} and from shuffle {v_s}' ' differ more than the allowed absolute tolerance {at}') raise errors.ShuffleConfigError(msg.format(k=k_shot, v=shot_value, v_s=shuffle_value, at=abs_tol))
[docs]def copy_acam(conf, shuffle_conf, format_map): '''Copy the acam image from the shuffle directory to to a target file. Parameters ---------- conf : :class:`configparser.ConfigParser` application configuration file shuffle_conf : :class:`configparser.ConfigParser` configuration file from shuffle format_map : dict dictionary used to format the file names from the configuration files Returns ------- acam_dest destination file name ''' # construct the ACAM file name shuffle_shot_dir = conf['run_shot']['shuffle_shot'].format(**format_map) acam_file_name = shuffle_conf['image']['acam_output'] acam_image = os.path.join(shuffle_shot_dir, acam_file_name) # destination file name for the acam image acam_dest = conf['run_shot']['acam_dest_file'] # copy the file shutil.copy(acam_image, acam_dest) return acam_dest
[docs]def setup_telescope(conf, shuffle_conf, shot_dict): '''Set up the telescope and wait for the TO/RA to confirm that the setup is complete. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` application configuration file shuffle_conf : :class:`configparser.ConfigParser` configuration file from shuffle shot_dict : dictionary dictionary with the relevant information needed to run the next shot. Same as :func:`run` ''' run_sec = conf['run_shot'] send_event = run_sec.getboolean(_emit_events, fallback=True) if send_event: internal_server = utils.get_zmq_server('ocd_run_shot') # create and event and send it event = dict(exec_status=EXEC_STATUS.START) event.update(shot_dict) if send_event: internal_server.send_tcs_event(SETUP_TOPIC, event) event = copy.deepcopy(event) # try to cancel the existing trajectory _cancel_trajectory() # load the next trajectory _load_tracjectory(shuffle_conf, shot_dict) # for each probe give the name (as used in tcssubsystem method names) and # the name of the corresponding section in the shuffle config file probe_sections = [['Guider1', 'guider1'], ['Guider2', 'guider2'], ['WFS1', 'wfs1'], ['WFS2', 'wfs2']] # set the position of the probes for probe, section in probe_sections: _set_probe_position(conf, shuffle_conf, shot_dict, probe, section) # go to the next position go_next_sec = shuffle_conf['go_next'] go_next_kwargs = utils.cast_dictionary(go_next_sec, strings=['move_structure', 'move_dome', 'move_probes'], binary_strings=True) tcs_proxy.tcs.go_next(**go_next_kwargs) # set the object id and the magnitudes of the probe stars for probe, section in probe_sections: _set_probe_object(conf, shuffle_conf, shot_dict, probe, section) # then do some more setups _set_probes_fiductial(conf, shuffle_conf, shot_dict) # set the metadata _set_metadata(conf, shot_dict) # and then play a sound play_sound(conf, 'setup_sound') # wait for the TO/RA to mark the setup as complete _wait_for_setup(conf, shot_dict) event['exec_status'] = EXEC_STATUS.FINISH if send_event: internal_server.send_tcs_event(SETUP_TOPIC, event)
[docs]def _cancel_trajectory(): '''Try to cancel an existing trajectory. If the command fails because no active trajectory is found, send a warning log. Any other failure is re-raised. ''' try: tcs_proxy.tcs.cancel_trajectory() except tcs_proxy.errors.error as e: if 'No currently active trajectory' in str(e): tcs_proxy.tcs_log.log_warn(str(e)) else: raise
[docs]def _load_tracjectory(shuffle_conf, shot_dict): '''Load the trajectory Parameters ---------- shuffle_conf : :class:`configparser.ConfigParser` configuration file from shuffle shot_dict : dictionary dictionary with the relevant information needed to run the next shot. Same as :func:`run` ''' # load the next trajectory # save the parameters of interest traj_kwargs = utils.cast_dictionary(shuffle_conf['trajectory'], floats=['equinox', ]) traj_kwargs.update(utils.cast_dictionary(shot_dict, ints=['track', ], floats=['ra', 'dec', 'azimuth'])) traj_kwargs['dir'] = track_mapping[traj_kwargs.pop('track')] traj_kwargs['az'] = traj_kwargs.pop('azimuth') tcs_proxy.tcs.load_trajectory(**utils.binary_dict(traj_kwargs))
[docs]def _set_probe_position(conf, shuffle_conf, shot_dict, probe_name, section_name): '''Execute the commands necessary to setup the probes ra and dec Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` application configuration file shuffle_conf : :class:`configparser.ConfigParser` configuration file from shuffle shot_dict : dictionary dictionary with the relevant information needed to run the next shot. Same as :func:`run` probe_name, section_name : string name of the probe and of the section of the ``shuffle_conf`` containing the information from the probe ''' conf_sec = shuffle_conf[section_name] # set the position set_position = getattr(tcs_proxy.tcs, '{p}_set_position'.format(p=probe_name)) set_pos_kwargs = utils.cast_dictionary(conf_sec, floats=['ra', 'dec', 'equinox'], strings=['trajectory', ], binary_strings=True) set_position(**set_pos_kwargs)
[docs]def _set_probe_object(conf, shuffle_conf, shot_dict, probe_name, section_name): '''Execute the commands necessary to setup the probes object id and magnitudes Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` application configuration file shuffle_conf : :class:`configparser.ConfigParser` configuration file from shuffle shot_dict : dictionary dictionary with the relevant information needed to run the next shot. Same as :func:`run` probe_name, section_name : string name of the probe and of the section of the ``shuffle_conf`` containing the information from the probe ''' conf_sec = shuffle_conf[section_name] # set the object name and the magnitude probe_type = section_name.strip('12') shuffle_filters_opt = '{}_shuffle_filters'.format(probe_type) pas_filters_opt = '{}_pas_filters'.format(probe_type) if probe_type == 'guider': shuffle_filters = conf.get_list('run_shot', shuffle_filters_opt) pas_filters = conf.get_list('run_shot', pas_filters_opt) else: # no filters passed to wfs shuffle_filters, pas_filters = [], [] set_mag = getattr(tcs_proxy.pas, '{p}_SetObjectAndMagnitudes'.format(p=probe_name)) set_mag_kwargs = utils.cast_dictionary(conf_sec, strings=['id', ], floats=shuffle_filters) # reassign "id" to "object" and the filter names for from_key, to_key in zip(['id', ] + shuffle_filters, ['object', ] + pas_filters): set_mag_kwargs[to_key] = set_mag_kwargs.pop(from_key) set_mag(**utils.binary_dict(set_mag_kwargs))
[docs]def _set_probes_fiductial(conf, shuffle_conf, shot_dict): '''Setup the analysis region and the fiducial position and initialize the WFS exposure times. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` application configuration file shuffle_conf : :class:`configparser.ConfigParser` configuration file from shuffle shot_dict : dictionary dictionary with the relevant information needed to run the next shot. Same as :func:`run` ''' tcs_proxy.tcs.Guider1_set_analysis_region(xmin=55, ymin=25, xmax=132, ymax=111) tcs_proxy.tcs.Guider1_set_fiducial(xpixel=93.5, ypixel=67.5) tcs_proxy.tcs.Guider2_set_analysis_region(xmin=46, ymin=29, xmax=129, ymax=113) tcs_proxy.tcs.Guider2_set_fiducial(xpixel=87.5, ypixel=70.5) tcs_proxy.pas.WFS1_SetExposureTime(time=5) tcs_proxy.pas.WFS2_SetExposureTime(time=5)
[docs]def _set_metadata(conf, shot_dict): '''Set the metadata stored in ``shot_dict['metadata']``. If the key is not present, skip the step. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` application configuration file shot_dict : dictionary dictionary with the relevant information needed to run the next shot. Same as :func:`run` ''' try: _metadata_dict = copy.copy(shot_dict['metadata']) except KeyError: tcs_proxy.tcs_log.log_warn('No metadata set') return metadata_comments = dict(QPMRA_COMMENT='RA proper motion', QPMDEC_COMMENT='DEC proper motion', QIFU_COMMENT='Requested IFU', QPROG_COMMENT='HET program number', QIDX_COMMENT='HET trimester index number') metadata_dict = {} for k in _metadata_dict.keys(): comm_key = '{}_COMMENT'.format(k) try: metadata_dict[comm_key] = metadata_comments[comm_key] except KeyError: # Ignore missing comments pass metadata_dict.update(_metadata_dict) tcs_proxy.tcs.set_metadata(**utils.binary_dict(metadata_dict))
[docs]def message_expstart(conf, exptime, inst='VIRUS'): ''' Trigger the exposure time clock The name of the binary to execute and the options come from the ``message_board`` option of the ``[run_shot]`` configuration section. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` application configuration file exptime : float Exposure time to display inst : string (optional) Instrument name to send to the message boad command ''' sec = conf['run_shot'] cmd = shlex.split(sec['message_board'].format(exptime=exptime)) try: p = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE) except Exception as e: tcs_proxy.tcs_log.log_warn('There was a problem displaying the ' 'exposure time on the message board with ' 'the command "{}". The exception was', ' '.join(cmd), str(e)) return stdout, stderr = p.communicate() if p.returncode != 0: tcs_proxy.tcs_log.log_warn('There was a problem displaying the ' 'exposure time on the message board with' ' the command "{}". The error message is' ' "{}"', ' '.join(cmd), stderr)
[docs]def play_sound(conf, file_option): '''Play a sound. The name of the binary to execute the sounds and the options come from the ``play_exe`` option of the ``[run_shot]`` configuration section. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` application configuration file file_option : string name of the configuration option of the ``[run_shot]`` section containing the name of the file to play ''' sec = conf['run_shot'] play_exe = shlex.split(sec['play_exe']) sound_file = sec[file_option] cmd = play_exe + [sound_file, ] try: p = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE) except Exception as e: tcs_proxy.tcs_log.log_warn('There was a problem playing the sound with' 'the command "{}". The exception was: ', ' '.join(cmd), str(e)) return stdout, stderr = p.communicate() if p.returncode != 0: tcs_proxy.tcs_log.log_warn('There was a problem playing the sound with' ' the command "{}". The error message is' ' "{}"', ' '.join(cmd), stderr)
[docs]def _wait_for_setup(conf, shot_dict): '''Wait for the confirmation that the setup is done. If the ``wait_for_setup_timeout`` option of the ``[run_shot]`` configuration section is positive, add a timeout to the waiting. If the ``continue_on_timeout`` option of the ``[run_shot]`` configuration section is ``false``, abort the shot if the timeout is hit; if it is ``true`` run the shot even if the timeout is hit. Any other exception is re-raised. Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` application configuration file shot_dict : dictionary dictionary with the relevant information needed to run the next shot. Same as :func:`run` ''' run_sec = conf['run_shot'] tcs_proxy.tcs_log.log_info('Wait for confirmation that the setup for the' ' shot {} is ready', shot_dict['shotid']) timeout = run_sec.getfloat('wait_for_setup_timeout') try: tcs_proxy.tcs.wait_for_setup(timeout=timeout) except tcs_proxy.errors.error as e: continue_on_timeout = run_sec.getboolean('continue_on_timeout') if 'Timeout waiting for setup' in str(e) and continue_on_timeout: msg = ('The setup has not be confirmed, but the shot execution' ' continues, as instructed by the ``continue_on_timeout``' ' configuration variable.') tcs_proxy.tcs_log.log_warn(msg) else: raise
[docs]@contextlib.contextmanager def reset_probes(conf): '''Before yielding stop the ACAM and set the storage of guider images. On return from yield, deactivate the probe images storage and deploy the ACAM mirror. Parameters ---------- conf : :class:`configparser.ConfigParser` configuration ''' try: tcs_proxy.tcs.ACQ_stop() store_kwargs = utils.binary_dict({'store': 'true'}) tcs_proxy.pas.Guider1_SetStoreImages(**store_kwargs) tcs_proxy.pas.Guider2_SetStoreImages(**store_kwargs) yield finally: no_store_kwargs = utils.binary_dict({'store': 'false'}) tcs_proxy.pas.Guider1_SetStoreImages(**no_store_kwargs) tcs_proxy.pas.Guider2_SetStoreImages(**no_store_kwargs) tcs_proxy.pas.WFS1_SetStoreImages(**no_store_kwargs) tcs_proxy.pas.WFS2_SetStoreImages(**no_store_kwargs) state_kwargs = utils.binary_dict({'state': 'false'}) tcs_proxy.tcs.update_setup(**state_kwargs) tcs_proxy.pfip.DeployACQCameraMirror()
[docs]def run_hetdex(conf, shot_dict): '''Run a hetdex shot Parameters ---------- conf : :class:`configparser.ConfigParser` configuration shot_dict : dictionary dictionary with the relevant information needed to run the next shot. Same as :func:`run` ''' send_event = conf['run_shot'].getboolean(_emit_events, fallback=True) shotid = shot_dict['shotid'] if send_event: internal_server = utils.get_zmq_server('ocd_run_shot') event = dict(exec_status=EXEC_STATUS.START) event.update(shot_dict) retract_mirrors() tcs_proxy.tcs_log.log_info("Enable the shutter.") tcs_proxy.virus.enable_shutter() dithers, delta_shifts = get_dithers(conf) # common options to pass to the ``virus.expose`` command expose_kwargs = dict(observation=shot_dict['obs_number'], object=shotid, recipe=__name__ + " v" + __version__, x_binning=2, y_binning=1, type='sci') # make sure that whatever happens the dither mechanism start with 1 and is # reset to 1 with ensure_first_dither(): exposure_times = [shot_dict[k] for k in ['exp01', 'exp02', 'exp03']] zipped = zip(dithers, delta_shifts, exposure_times) # loop over the exposures for n_exposure, (dither_pos, shift, exp_time) in enumerate(zipped, 1): # update the exposure arguments with the things for the current # shot expose_kwargs.update(exposure=n_exposure, seconds=exp_time) exp_event = copy.deepcopy(event) exp_event.update(expose_kwargs) if send_event: internal_server.send_tcs_event(EXP_TOPIC, exp_event) exp_event = copy.deepcopy(exp_event) message_expstart(conf, exp_time) do_exposure(conf, shot_dict, expose_kwargs, dither_pos, shift, len(dithers)) play_sound(conf, 'finish_exp_{}_sound'.format(n_exposure)) exp_event['exec_status'] = EXEC_STATUS.FINISH if send_event: internal_server.send_tcs_event(EXP_TOPIC, exp_event)
[docs]def retract_mirrors(): '''Retract the ACAM and CWFS mirrors ''' tcs_proxy.tcs_log.log_info("Retract ACAM and the CWFS/PV pickoff mirror.") tcs_proxy.pfip.RetractACQCameraMirror() tcs_proxy.pfip.RetractCWFSCameraMirror()
[docs]@contextlib.contextmanager def ensure_first_dither(): '''Make sure that the dither position is set to 1 at the beginning and at the end, no matter what ''' try: tcs_proxy.pfip.AdjustDither(pos=1) yield finally: tcs_proxy.pfip.AdjustDither(pos=1)
[docs]def get_dithers(conf): '''Get the dither positions and dither shifts Examples -------- >>> import configparser >>> conf = configparser.ConfigParser() >>> conf.read_dict({'run_shot': {'dither_with_probes': 'false'}}) >>> get_dithers(conf) ([1, 2, 3], [None, None, None]) >>> conf['run_shot']['dither_with_probes'] = 'true' >>> dithers, delta_shifts = get_dithers(conf) >>> dithers [1, 1, 1] >>> delta_shifts array([[ 1.27, -0.73], [ 0. , 1.46], [-1.27, -0.73]]) Parameters ---------- conf : :class:`configparser.ConfigParser` configuration. Uses the ``dither_with_probes`` option of the ``[general]`` section Returns ------- dither_pos : list dither positions, either ``[1, 2, 3]`` or ``[1, 1, 1]`` delta_shifts : ndarray shifts to apply to the fiducial position of the guide probes. If the dither positions are different, a list of three ``None`` is returned, otherwise the following:: [[x_shift_1, y_shift_1], [x_shift_2, y_shift_2], [x_shift_3, y_shift_3]] ''' dither_with_probes = conf.getboolean('run_shot', 'dither_with_probes') if dither_with_probes: dither_pos = [1, 1, 1] # get the offset to apply to the guider for each dither dithers = np.array([[0, 0], [-1.27, 0.73], [-1.27, -0.73]]) delta_shifts = dithers - np.roll(dithers, -1, axis=0) else: dither_pos = [1, 2, 3] delta_shifts = [None, ]*3 return dither_pos, delta_shifts
[docs]def do_exposure(conf, shot_dict, expose_kwargs, dither_pos, guider_shift, tot_exposures): '''Run a single exposure Parameters ---------- conf : :class:`configparser.ConfigParser` configuration shot_dict : dictionary dictionary with the relevant information needed to run the next shot. Same as :func:`run` expose_kwargs : dictionary keyword arguments for the ``virus.expose`` call dither_pos : int dither position guider_shift : None or list of two elements if not ``None``, shift the fiducial position of the guide probes by ``dx = guider_shift[1]`` and ``dy = guider_shift[0]`` tot_exposures : int total number of exposures ''' shotid = shot_dict['shotid'] n_exposure = expose_kwargs['exposure'] # set the dither mechanism to the correct position tcs_proxy.pfip.AdjustDither(pos=dither_pos) tcs_proxy.tcs_log.log_info("Starting exposure number {1} of shot" " {0}. Expose for {2} seconds.", shotid, n_exposure, expose_kwargs['seconds']) # do the exposure and wait for it to finish exp_response = tcs_proxy.virus.expose(**utils.binary_dict(expose_kwargs)) exp_response_kwargs = utils.cast_dictionary(exp_response, ints=['observation', 'exposure'], binary_strings=True) tcs_proxy.virus.wait_for_shutter_close(**exp_response_kwargs) tcs_proxy.tcs_log.log_info("Completed exposure {1} of shot {0}", shotid, n_exposure) is_last_exp = (n_exposure == tot_exposures) # if the guide probe offset is required do it if (guider_shift is not None) and (not is_last_exp): offset_kwargs = dict(dx_asec=guider_shift[1], dy_asec=guider_shift[0], compensate='true') msg = ('Offsetting the guide probes fiducial position by dx={0}' ' and dy={1}') tcs_proxy.tcs_log.log_info(msg, offset_kwargs['dx_asec'], offset_kwargs['dy_asec']) offset_kwargs = utils.binary_dict(offset_kwargs) tcs_proxy.tcs.Guider1_offset_fiducial(**offset_kwargs) tcs_proxy.tcs.Guider2_offset_fiducial(**offset_kwargs) wait_last_readout = conf.getboolean('run_shot', 'wait_last_readout') wait_for_readout = (not is_last_exp) or wait_last_readout if wait_for_readout: msg = 'Wait for exposure {0} to be read out' tcs_proxy.tcs_log.log_info(msg, n_exposure) try: tcs_proxy.virus.wait_for_readout(**exp_response_kwargs) except tcs_proxy.errors.error as e: if "No active exposure" in str(e): tcs_proxy.tcs_log.log_warn('The data has been read out' ' so fast that exposure {} of' ' observation {} is not active' ' anymore', exp_response_kwargs['exposure'], exp_response_kwargs['observation']) else: raise else: msg = '*Do not* wait for exposure {0} to be read out' tcs_proxy.tcs_log.log_info(msg, n_exposure)
[docs]def check_fiter_map(conf): '''Make sure that the shuffle and the pas filters from the configuration file have the same number of names Parameters ---------- conf : :class:`pyhetdex.tools.configuration.ConfigParser` configuration entries Raises ------ OCDValueError if there is missmatch between the two lists ''' for probe in ['guider', ]: # do a loop so that if we end up adding also WFS the work is minimal shuffle_filters = conf.get_list('run_shot', '{}_shuffle_filters'.format(probe)) pas_filters = conf.get_list('run_shot', '{}_pas_filters'.format(probe)) if len(shuffle_filters) != len(pas_filters): msg = ('The number of filters for the probe {} to read from the' ' shuffle configuration file ({}) must be the same as the' ' number of filter names used by pas ({})') msg = msg.format(probe, len(shuffle_filters), len(pas_filters)) raise errors.OCDValueError(msg)
[docs]def run_shot_parser(subparsers): '''Add a subcommand "run_shot" to run a hetdex shot. Parameters ---------- subparsers : argparse subparsers object subparser to use to generate new parsers Returns ------- parser : :class:`argparse.ArgumentParser` modified parser ''' import argparse as ap parser = subparsers.add_parser('run_shot', help='Run a hetdex shot', description='''Subcommand to run a single hetdex shot''', formatter_class=ap.ArgumentDefaultsHelpFormatter) parser.set_defaults(func=_run_shot_cmd) parser = utils.override_epilog_msg(parser) for name, options in _arguments.items(): parser.add_argument(name, **options) parser = ocd_config.config_file_argument(parser) parser.add_argument('-e', '--emit-events', help='''Emit events via ZeroMQ. If used, emits a number of events using the internal OCD ZeroMQ servers. Use with caution, in particular if OCD is running.''', action='store_true') parser.add_argument('-c', '--wait-for-connection', action='store_true', help='''Try to establish a connection with the OCD main loop before starting to run the shot. This option is used only if ``-e/--emit-events`` is activated. If the connection is not established within 10 seconds, abort the execution.''') parser.add_argument('-t', '--timeout', type=int, default=10, help='''If ``-c/--wait-for-connection`` is given, wait at most '%(dest)s' seconds before aborting''') parser.add_argument('-m', '--metadata', type=json.loads, help='''Dictionary of metadata to send to TCS. It must be a JSON string encoding a dictionary. ``json.loads`` is used to deserialize the string to a python dictionary. Example: -m '{"test": 42, "key": "value"}'.''') title = 'Override options in the [run_shot] section' overrides_run_shot = parser.add_argument_group(title=title) overrides_run_shot.add_argument('-d', '--dither-with-probes', choices=['yes', 'no'], dest='setting__run_shot__dither_with_probes', help='''Use the guide probes for dithering''') overrides_run_shot.add_argument('-w', '--wait-last-readout', choices=['yes', 'no'], dest='setting__run_shot__wait_last_readout', help='''Wait for the last readout to happen before exiting''') overrides_run_shot.add_argument('-N', '--n-ocd-run-shot', dest='setting__run_shot__n_ocd_run_shot', metavar='N_OCD_RUN_SHOT', help='''Which of the urls for the ``ocd_run_shot`` option of the ``[urls]`` section use to setup a ZMQ server''') overrides_run_shot.add_argument('--shuffle-shot', dest='setting__run_shot__shuffle_shot', metavar='SHUFFLE_SHOT', help='''Directory containing the shuffle output directories''') overrides_run_shot.add_argument('--shuffle-conf-template', dest='setting__run_shot__shuffle_conf_template', metavar='SHUFFLE_CONF_TEMPLATE', help='''Template for the name of the configuration files generated by shuffle''') overrides_run_shot.add_argument('--acam-dest-file', dest='setting__run_shot__acam_dest_file', metavar='ACAM_DEST_FILE', help='''File where to copy the ACAM image''') title = 'Override options in the [urls] section' overrides_urls = parser.add_argument_group(title=title) overrides_urls.add_argument('--ocd-run-shot', dest='setting__urls__ocd_run_shot', metavar='OCD_RUN_SHOT', help='''Urls/paths used to send the signal when running a shot. Used only if ``-e/--emit-events`` is enabled.''', nargs='+') return subparsers
[docs]def _run_shot_cmd(args): '''Entry point for running a shot from the command line Parameters ---------- args : :class:`~argparse.Namespace` parsed command line arguments ''' config = ocd_config.load_config(config_file=args.config, args=args) # Check that the filter names make sense check_fiter_map(config) # inject into the configuration file, under the ``[run_shot]`` section, the # ``emit_event`` option. # NOTE: this option is not in the OCD configuration file on purpose. config['run_shot'][_emit_events] = 'yes' if args.emit_events else 'no' # inject into the configuration file, under the ``[run_shot]`` section, the # ``raise_exception`` option. # NOTE: this option is not in the OCD configuration file on purpose. config['run_shot']['raise_exception'] = 'yes' # initialize various pieces and bits necessary to run OCD # the proxy must always be the first one to initialize after loading the # configuration file as it provides the logging functionality to the rest # of the code tcs_proxy.init(config) # If required, initialized the ocd servers and add their urls to the list # to pass to the listener if args.emit_events: setup_for_events(args.subcommand, config, args.wait_for_connection, timeout=args.timeout) shot_dict = {k: getattr(args, k) for k in shot_dict_keys} if args.metadata: shot_dict['metadata'] = args.metadata # run the shot run(config, shot_dict)
[docs]def setup_for_events(subcommand, config, wait_for_connection, timeout=10): '''Initialise the zmq servers and, if necessary wait for the connection to be established. Parameters ---------- subcommand : string name of the subcommand config : :class:`pyhetdex.tools.configuration.ConfigParser` configuration entries wait_for_connection : bool whether it has to wait for the connection to be established timeout : int, option if ``wait_for_connection`` is ``True`` wait at most ``timeout`` seconds before aborting Raises ------ ocd.errors.TimeOutError if the timeout is hit ''' utils.init_zmq_servers(subcommand, config) tcs_proxy.tcs_log.log_info('OCD ZeroMQ servers initialized') if wait_for_connection: hb = heart_beat.HeartBeatQuery.from_names('ocd_run_shot', 'ocd_main_loop', n_attempts=None, interval=0.2, timeout=timeout) _, _, timeout_hit = hb.communicate() if timeout_hit: msg = ('No connection with the OCD main loop could be established.' '\nIf ``ocd run`` is nor running, do not use the ' ' ``-c/--wait-for-connection``.\n' 'If ``ocd run`` is running, make sure that the' ' urls/addresses of the servers and listeners match' ' and that there are no network (if using TCP) or file' ' permission (if using IPC) problems') raise errors.TimeOutError(msg)
[docs]def run_shot_subprocess(conf, shot_dict): '''Run the shot in a subprocess. Parameters ---------- conf : :class:`configparser.ConfigParser` configuration. Passed to :func:`ocd.shots_db.create_shot_file`, :func:`run_autoschedule` shot_dict : dictionary dictionary of options; see :func:`run` for more info Returns ------- p : :class:`subprocess.Popen` process running the shot cmd : string command executed ''' conf_file = ocd_config.save_config(conf) exe = ['ocd', 'run_shot'] arguments = [shot_dict[k] for k in shot_dict_keys] options = ['-e', '-c', '--config', conf_file] metadata = shot_dict.get('metadata', None) if metadata is not None: options += ['--metadata', json.dumps(metadata)] cmd = exe + options + arguments cmd = [str(i) for i in cmd] p = sp.Popen(cmd) # , stdout=sp.PIPE, stderr=sp.PIPE) return p, ' '.join(cmd)