Source code for pymepix.processing.baseacquisition

# This file is part of Pymepix
#
# In all scientific work using Pymepix, please reference it as
#
# A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:
# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)
# https://doi.org/10.1088/1748-0221/14/10/P10003
# https://arxiv.org/abs/1905.07999
#
# Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not,
# see <https://www.gnu.org/licenses/>.

"""Module deals with managing processing objects to form a data pipeline"""

from multiprocessing import Queue

import zmq

import pymepix.config.load_config as cfg
from pymepix.core.log import Logger
from pymepix.processing.usbtrainid import USBTrainID


[docs] class AcquisitionStage: """Defines a single acquisition stage Usually not created directly. Instead, it is created by :class:`AcquisitionPipeline` Represent a single pipeline stage and handles management of queues and message passing as well as creation and destruction of processing objects. Processes are not created until build() is called and do not run until start() is called Parameters ------------ stage: int Initial position in the pipeline, lower stages are executed first """ def __init__(self, stage, num_processes=1): # Logger.__init__(self, "AcqStage-{}".format(stage)) self.log = Logger("AcqStage-{}".format(stage)) self._stage_number = stage self._pipeline_objects = [] self._pipeline_klass = None self._num_processes = num_processes self._running = False self._input_queue = None self._output_queue = None self._args = [] self._kwargs = {} @property def stage(self): """Current position in the pipeline""" return self._stage_number @stage.setter def stage(self, value): self._stage_number = value @property def numProcess(self): """Number of processes to spawn when built Parameters ---------- value: int Number of processes to spawn when acquisition starts Returns ---------- int: Number of processes """ return self._num_processes @numProcess.setter def numProcess(self, value): self._num_processes = max(1, value)
[docs] def configureStage(self, pipeline_klass, *args, **kwargs): """Configures the stage with a particular processing class Parameters ----------- pipeline_klass: :class:`BasePipeline` A pipeline class object *args: positional arguments to pass into the class init **kwargs: keyward arguments to pass into the class init """ self.log.debug("Assigning stage {} to klass {}".format(self.stage, pipeline_klass)) self._pipeline_klass = pipeline_klass # zmq socket for communication with write2disk thread # only initialize this in udpsampler camera_generation = cfg.default_cfg["timepix"]["camera_generation"] if camera_generation == 3: from .udpsampler3 import UdpSampler elif camera_generation == 4: from .udpsampler4 import UdpSampler else: self.log.critical(f"Unsupported camera generation: {camera_generation}") raise ValueError(f"Unsupported camera generation: {camera_generation}") if pipeline_klass == UdpSampler: self.ctx = zmq.Context.instance() # self.udp_sock = self.ctx.socket(zmq.PAIR) # self.udp_sock.bind('tcp://127.0.0.1:40000') # self.log.info("zmq bind on 'tcp://127.0.0.1:40000'") self.setArgs(*args, **kwargs)
[docs] def setArgs(self, *args, **kwargs): self._args = args self._kwargs = kwargs
@property def processes(self): return self._pipeline_objects
[docs] def build(self, input_queue=None, output_queue=None): self._input_queue = input_queue self._output_queue = output_queue self.log.debug("Building stage with arguments {} {}".format(self._args, self._kwargs)) if self._output_queue is None: self.log.debug("I am creating the queue") self._output_queue = Queue() else: self.log.debug("Recieved the queue {}".format(output_queue)) self.log.debug("Building stage {} ".format(self._stage_number)) self.log.info("Creating {} processes".format(self._num_processes)) for n in range(self._num_processes): p = self._pipeline_klass( *self._args, **self._kwargs, input_queue=self._input_queue, shared_output=self._output_queue, ) p.daemon = True self._pipeline_objects.append(p)
# if self._output_queue is None: do we really need it? # self._output_queue = p.outputQueues()[-1] @property def outputQueue(self): return self._output_queue
[docs] def start(self): for p in self._pipeline_objects: zmq_port = cfg.default_cfg["zmq_port"] if p.name.find("UdpSampler-") > -1: self.udp_sock = self.ctx.socket(zmq.PAIR) self.udp_sock.bind(f"tcp://127.0.0.1:{zmq_port}") self.log.info(f'udp_sock bind on "tcp://127.0.0.1:{zmq_port}"') if cfg.default_cfg["trainID"]["connected"]: self.train_sock = self.ctx.socket(zmq.PAIR) self.train_sock.bind(f"ipc:///tmp/train_sock{zmq_port}") self.log.info(f'trainID bind on "ipc:///tmp/train_sock{zmq_port}"') self.startTrainID() p.start()
[docs] def stop(self, force=False): self.log.info("Stopping stage {}".format(self.stage)) if self._input_queue is not None: # Put a none in and join all threads self._input_queue.put(None) for idx, p in enumerate(self._pipeline_objects): p.enable = False self.log.info("Waiting for process {}".format(idx)) p.join(1.0) p.terminate() p.join() self.log.info("Process stop complete") if self._input_queue.get() is not None: self.log.error("Queue should only contain None!!") raise Exception("Queue contains more data") self._input_queue.close() else: for p in self._pipeline_objects: if p.name.find("UdpSampler-") > -1: self.log.debug(f"closing zmq socket for tcp://127.0.0.1:{cfg.default_cfg['zmq_port']}") self.udp_sock.close() if cfg.default_cfg["trainID"]["connected"]: self.stopTrainID() self.log.debug(f"closing zmq socket for icp:///tmp/train_sock{cfg.default_cfg['zmq_port']}") self.train_sock.close() p.enable = False self.log.info("Joining thread {}".format(p)) p.join(1.0) p.terminate() p.join() self.log.info("Join complete") self.log.info("Stop complete") self._pipeline_objects = []
[docs] def startTrainID(self): self.log.info("start USBTrainID process") # generate worker to save the data directly to disk self._trainIDRec = USBTrainID() self._trainIDRec.start()
[docs] def stopTrainID(self): self.log.info("stopping USBTrainID process") self.train_sock.send_string("STOP RECORDING") self.train_sock.send_string("SHUTDOWN") self._trainIDRec.join(2.0) # file still needs to be saved self._trainIDRec.terminate() self._trainIDRec.join()
[docs] class AcquisitionPipeline: """Class that manages various stages""" def __init__(self, name, data_queue): self.log = Logger(name + " AcqPipeline") self.log.info("Initializing pipeline") self._stages = [] self._data_queue = data_queue self._running = False
[docs] def addStage(self, stage_number, pipeline_klass, *args, num_processes=1, **kwargs): """Adds a stage to the pipeline""" stage = AcquisitionStage(stage_number, num_processes) self.log.info("Adding stage {} with klass {}".format(stage_number, pipeline_klass)) stage.configureStage(pipeline_klass, *args, **kwargs) self._stages.append(stage) self._stages = sorted(self._stages, key=lambda x: x.stage)
[docs] def getStage(self, stage_number): for x in self._stages: if x.stage == stage_number: return x return None
@property def stages(self): return self._stages
[docs] def start(self): """Starts all stages""" # Sort them by stage number self.log.info("Starting acquisition") # Build them previous_stage = None last_index = len(self._stages) - 1 self.log.debug("Last index is {}".format(last_index)) for idx, s in enumerate(self._stages): self.log.debug("Building stage {} {}".format(idx, s.stage)) if previous_stage is not None: queues = previous_stage.outputQueue self.log.debug("Queues: {}".format(queues)) if idx != last_index: s.build(input_queue=queues) else: self.log.debug("This is the last queue so output is the last one") s.build(input_queue=queues, output_queue=self._data_queue) else: if idx != last_index: s.build() else: self.log.info("First stage shares output") s.build(output_queue=self._data_queue) previous_stage = s self.log.debug("Last stage is {}".format(s)) for s in self._stages: s.enable = True s.start() self._running = True
@property def isRunning(self): return self._running
[docs] def stop(self): """Stops all stages""" self.log.info("Stopping acquisition") self.log.debug(self._stages) if self._running is True: for s in self._stages: s.stop() self._running = False
[docs] def main(): import logging import threading import time from multiprocessing.sharedctypes import Value from pymepix.processing.pipeline_packet_processor import PipelinePacketProcessor camera_generation = cfg.default_cfg["timepix"]["camera_generation"] if camera_generation == 3: from .udpsampler3 import UdpSampler elif camera_generation == 4: from .udpsampler4 import UdpSampler else: # log.critical(f"Unsupported camera generation: {camera_generation}") # TODO: find correct logger raise ValueError(f"Unsupported camera generation: {camera_generation}") # Create the logger logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) end_queue = Queue() acqpipline = AcquisitionPipeline("Test", end_queue) longtime = Value("I", 0) acqpipline.addStage(0, UdpSampler, longtime) acqpipline.addStage(2, PipelinePacketProcessor, num_processes=4) def get_queue_thread(queue): while True: value = queue.get() # messType, data = value # recieved.append(value[1]) # print(value) if value is None: break t = threading.Thread(target=get_queue_thread, args=(end_queue,)) t.daemon = True t.start() acqpipline.start() time.sleep(120.0) acqpipline.stop() end_queue.put(None) t.join() print("Done")
if __name__ == "__main__": main()