Source code for pymepix.processing.basepipeline

# This file is part of Pymepix
#
# In all scientific work using Pymepix, please reference it as
#
# A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:
# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)
# https://doi.org/10.1088/1748-0221/14/10/P10003
# https://arxiv.org/abs/1905.07999
#
# Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not,
# see <https://www.gnu.org/licenses/>.

"""Base implementation of objects relating to the processing pipeline"""

import multiprocessing
import time
import traceback
from multiprocessing import Queue
from multiprocessing.sharedctypes import Value

from pymepix.core.log import ProcessLogger


[docs] class BasePipelineObject(multiprocessing.Process): """Base class for integration in a processing pipeline Parameters ------------ name: str Name used for logging input_queue: :obj:`multiprocessing.Queue`, optional Data queue to perform work on (usually) from previous step in processing pipeline create_output: bool, optional Whether this creates its own output queue to pass data, ignored if (Default: True) num_outputs: int,optional Used with create_output, number of output queues to create (Default: 1) shared_output: :obj:`multiprocessing.Queue`, optional Data queue to pass results into, useful when multiple processes can put data into the same queue (such as results from centroiding). Ignored if create_output is True (Default: None) propogate_input: bool Whether the input data should be propgated further down the chain """
[docs] @classmethod def hasOutput(cls): """Defines whether this class can output results or not, e.g. Centroiding can output results but file writing classes do not Returns --------- bool Whether results are generated """ return True
def __init__( self, name, input_queue=None, create_output=True, num_outputs=1, shared_output=None, propogate_input=True, ): self.log = ProcessLogger(name) multiprocessing.Process.__init__(self) self.input_queue = input_queue self.output_queue = [] self._propgate_input = propogate_input if shared_output is not None: self.log.debug("Queue is shared") if type(shared_output) is list: self.log.debug("Queue {} is a list") self.output_queue.extend(shared_output) else: self.output_queue.append(shared_output) elif create_output: self.log.debug("Creating Queue") for x in range(num_outputs): self.output_queue.append(Queue()) self._enable = Value("I", 1) @property def outputQueues(self): """Exposes the outputs so they may be connected to the next step Returns --------- :obj:`list` of :obj:`multiprocessing.Queue` All of the outputs """ return self.output_queue @property def enable(self): """Enables processing Determines whether the class will perform processing, this has the result of signalling the process to terminate. If there are objects ahead of it then they will stop receiving data if an input queue is required then it will get from the queue before checking processing This is done to prevent the queue from growing when a process behind it is still working Parameters ----------- value : bool Enable value Returns ----------- bool: Whether the process is enabled or not """ return bool(self._enable.value) @enable.setter def enable(self, value): self.log.debug("Setting enabled flag to {}".format(value)) self._enable.value = int(value)
[docs] def pushOutput(self, data_type, data): """Pushes results to output queue (if available) Parameters ----------- data_type : int Identifier for data type (see :obj:`MeesageType` for types) data : any Results from processing (must be picklable) """ # self.log.debug('Pushing output {} {} to {}'.format(data_type,data,self.output_queue)) for x in self.output_queue: if x is not None: x.put((data_type, data))
[docs] def process(self, data_type=None, data=None): """Main processing function, override this do perform work To perform work within the pipeline, a class must override this function. General guidelines include, check for correct data type, and must return None for both if no output is given. """ self.log.debug("I AM PROCESSING") time.sleep(0.1) return None, None
[docs] def pre_run(self): """Function called before main processing loop, override to""" pass
[docs] def post_run(self): """Function called after main processing loop, override to""" return None, None
[docs] def run(self): self.pre_run() while True: enabled = self.enable try: if self.input_queue is not None: self.log.debug("Getting value from input queue") value = self.input_queue.get() if value is None: self.log.debug("Value is None") # Put it back in the queue and leave self.input_queue.put(None) break data_type, data = value if enabled: output_type, result = self.process(data_type, data) if self._propgate_input: self.pushOutput(*value) else: if enabled: output_type, result = self.process() else: self.log.debug("I AM LEAVING") break if output_type is not None and result is not None and enabled: self.pushOutput(output_type, result) except Exception as e: self.log.error("Exception occured!!!") self.log.error(e, exc_info=True) self.log.error(traceback.format_exc()) break output_type, result = self.post_run() if output_type is not None and result is not None: self.pushOutput(output_type, result) self.log.info("Job complete")
[docs] def main(): import logging import time # Create the logger logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) proc = BasePipelineObject("Base") proc.start() time.sleep(2.0) proc.enable = False logging.info("DISABLED") time.sleep(2.0) proc.terminate() proc.join()
if __name__ == "__main__": main()