Source code for ocd.db_replay

# Observing condition decision tool: monitor conditions and plan HETDEX
# observations
# Copyright (C) 2017  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''Replay a database to allow testing OCD also during the day or from far away.

This implementation is based on `tcs_replay
<http://www.mpe.mpg.de/~montefra/documentation/tcs_lib/latest/zmq_server.html>`_.
'''
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from tcs_lib.server import TCSDBReplay

from . import config as ocd_config
from . import errors
from . import heart_beat
from . import utils
from . import tcs_proxy


[docs]def db_replay_parser(subparsers): '''Add a subcommand "db_replay" to run a hetdex shot. Parameters ---------- subparsers : argparse subparsers object subparser to use to generate new parsers Returns ------- parser : :class:`argparse.ArgumentParser` modified parser ''' import argparse as ap parser = subparsers.add_parser('db_replay', help='Replay a night database', description='''Subcommand to replay a sqlite3 database, typically from night operations.''', formatter_class=ap.ArgumentDefaultsHelpFormatter) parser.set_defaults(func=db_replay_main) parser = utils.override_epilog_msg(parser) parser.add_argument('db_name', help='''Name of the database to replay''') parser = ocd_config.config_file_argument(parser) parser.add_argument('--serve-forever', action='store_true', help='''After the database finishes, restart serving it.''') parser.add_argument('-t', '--timeout', type=int, default=10, help='''When trying to connect with the OCD main loop, wait at most '%(dest)s' seconds before aborting''') parser.add_argument('-s', '--speedup', default=1., type=float, help='''Speedup to apply when replaying the database. For example, %(dest)s=1 plays back at the original speed; %(dest)s=2 plays it twice as fast''') parser.add_argument('-T', '--topics', nargs='+', help='''List of topics to publish. If no topic is specified, all the topics will be published.''') parser.add_argument('-S', '--sort-by', default='__wire_time', choices=['none', '__data_time', '__wire_time'], help='''Method to time-sort the events before serving them. If 'none', no sorting is done and the order of the database is respected.''') title = 'Override options in the [urls] section' overrides_urls = parser.add_argument_group(title=title) overrides_urls.add_argument('--ocd-db-replay', dest='setting__urls__ocd_db_replay', metavar='OCD_DB_REPLAY', help='''Urls/paths used to send the signal when replaying a database.''', nargs='+') return subparsers
[docs]def db_replay_main(args): '''Entry point for replaying the database Parameters ---------- args : :class:`~argparse.Namespace` parsed command line arguments ''' # get the configuration config = ocd_config.load_config(config_file=args.config, args=args) # initialize various pieces and bits necessary to run OCD # the proxy must always be the first one to initialize after loading the # configuration file as it provides the logging functionality to the rest # of the code tcs_proxy.init(config) # initialise the servers utils.init_zmq_servers(args.subcommand, config) tcs_proxy.tcs_log.log_info('OCD ZeroMQ servers initialized') # test that it can connect to the OCD mail loop establish_connection(args.timeout) # create the event generator event_generator = TCSDBReplay(args.db_name, sort_by=args.sort_by, speedup=args.speedup, topics=args.topics, convert_number=True, convert_bool=False) zmq_server = utils.get_zmq_server('ocd_db_replay') while True: print('Start serving database', args.db_name) zmq_server.start(event_generator) print('Finished serving database', args.db_name) if args.serve_forever: print('Restart serving') else: print('Goodbye') break
[docs]def establish_connection(timeout): '''Try to connect to the ocd main loop before starting to replay the database. Parameters ---------- timeout : int Maximum time to wait before giving up Raises ------ ocd.errors.TimeOutError if no connection can be established ''' hb = heart_beat.HeartBeatQuery.from_names('ocd_db_replay', 'ocd_main_loop', n_attempts=None, interval=0.2, timeout=timeout) _, _, timeout_hit = hb.communicate() if timeout_hit: msg = ('No connection with the OCD main loop could be established.' '\nIf ``ocd run`` is nor running, start it.\n' 'If ``ocd run`` is running, make sure that the' ' urls/addresses of the servers and listeners match' ' and that there are no network (if using TCP) or file' ' permission (if using IPC) problems') raise errors.TimeOutError(msg)