Source code for pymepix.processing.pipeline_packet_processor
# This file is part of Pymepix## In all scientific work using Pymepix, please reference it as## A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)# https://doi.org/10.1088/1748-0221/14/10/P10003# https://arxiv.org/abs/1905.07999## Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU# General Public License as published by the Free Software Foundation, either version 3 of the# License, or (at your option) any later version.## This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU# General Public License for more details.## You should have received a copy of the GNU General Public License along with this program. If not,# see <https://www.gnu.org/licenses/>.importzmqimportpymepix.config.load_configascfgfrompymepix.processing.datatypesimportMessageTypefrompymepix.processing.logic.shared_processing_parameterimport(SharedProcessingParameter,)from.basepipelineimportBasePipelineObjectfrom.logic.packet_processorimportPacketProcessor
class PipelinePacketProcessor(BasePipelineObject):
    """Pipeline stage that turns raw Timepix packets into pixel and event data.

    Every call to :meth:`process` receives one message from
    ``self._packet_sock``, hands it to the wrapped ``PacketProcessor`` and
    forwards the result: pixel data is pushed to the stage output, event data
    is returned to the caller.

    NOTE(review): ``self._packet_sock`` is not created anywhere in the code
    visible here; it is presumably set up elsewhere (base class or a
    per-process initialisation hook) before :meth:`process` runs -- confirm.
    The ``recv(copy=False)`` call and the comment in ``__init__`` suggest it
    is a zmq socket rather than a raw UDP socket.
    """

    def __init__(
        self,
        packet_processor: PacketProcessor = None,
        input_queue=None,
        create_output=True,
        num_outputs=1,
        shared_output=None,
    ):
        """Set up the pipeline stage.

        Parameters
        ----------
        packet_processor:
            Object whose ``process(packet)`` method decodes one raw packet.
            When ``None`` (the default) a new
            ``PacketProcessor(parameter_wrapper_class=SharedProcessingParameter)``
            is created for this instance.
        input_queue:
            Accepted for signature compatibility but deliberately ignored,
            see the comment in the body.
        create_output, num_outputs, shared_output:
            Forwarded unchanged to ``BasePipelineObject.__init__``.
        """
        # input_queue is forced to None for now, otherwise baseacquisition.build
        # would have to be modified; the input queue is replaced by zmq.
        super().__init__(
            PipelinePacketProcessor.__name__,
            input_queue=None,
            create_output=create_output,
            num_outputs=num_outputs,
            shared_output=shared_output,
        )

        # Build the default lazily. A call expression used as the parameter
        # default would be evaluated once, when the ``def`` is executed, and
        # the single resulting object would then be shared by every instance
        # created without an explicit ``packet_processor``.
        if packet_processor is None:
            packet_processor = PacketProcessor(
                parameter_wrapper_class=SharedProcessingParameter
            )
        self.packet_processor = packet_processor

    def process(self, data_type=None, data=None):
        """Receive one packet, decode it and dispatch the results.

        ``data_type`` and ``data`` are not used: the input is read from
        ``self._packet_sock`` instead of being passed in.

        Returns
        -------
        tuple
            ``(MessageType.EventData, event_data)`` when the packet processor
            produced event data, otherwise ``(None, None)``.
        """
        raw_packet = self._packet_sock.recv(copy=False)

        # The processor returns (event_data, pixel_data, timestamps, triggers);
        # timestamps and triggers are not required for online processing.
        event_data, pixel_data, _timestamps, _ = self.packet_processor.process(
            raw_packet
        )

        # Pixel data goes out through the stage output, independently of
        # whether event data is returned below.
        if pixel_data is not None:
            self.pushOutput(MessageType.PixelData, pixel_data)

        if event_data is not None:
            return MessageType.EventData, event_data

        return None, None