# This file is part of Pymepix
#
# In all scientific work using Pymepix, please reference it as
#
# A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:
# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)
# https://doi.org/10.1088/1748-0221/14/10/P10003
# https://arxiv.org/abs/1905.07999
#
# Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not,
# see <https://www.gnu.org/licenses/>.
"""Module deals with managing processing objects to form a data pipeline"""
import os
import platform
import subprocess
import signal
import errno
import time
from multiprocessing import Queue
import zmq
import pymepix.config.load_config as cfg
from pymepix.core.log import Logger
from pymepix.processing.usbtrainid import USBTrainID
from .logic.processing_step import ProcessingStep
from pathlib import Path, PurePath
import yaml
from .pipeline_centroid_calculator import PipelineCentroidCalculator
from .pipeline_packet_processor import PipelinePacketProcessor
from .logic.centroid_calculator import CentroidCalculator
from .logic.packet_processor_factory import packet_processor_factory
camera_generation = cfg.default_cfg["timepix"]["camera_generation"]
if camera_generation == 3:
from .udpsampler3 import UdpSampler
elif camera_generation == 4:
from .udpsampler4 import UdpSampler
else:
raise ValueError(f"Unsupported camera generation: {camera_generation}")
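# Configuration keys read from pymepix.config.load_config by this module (for
# reference; the values shown are illustrative assumptions, not verified defaults):
#
#   timepix:
#     camera_generation: 3    # selects udpsampler3 vs udpsampler4 above
#     zmq_port: 5555          # TCP port for the UdpSampler control socket
#   trainID:
#     connected: false        # whether a USBTrainID process is managed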
class AcquisitionStage:
"""Defines a single acquisition stage
Usually not created directly. Instead, it is created by :class:`AcquisitionPipeline`
Represent a single pipeline stage and handles management of queues and message passing
as well as creation and destruction of processing objects.
Processes are not created until build() is called and do not run until start() is called
Parameters
------------
stage: int
Initial position in the pipeline, lower stages are executed first
"""
def __init__(self, stage, num_processes=1):
# Logger.__init__(self, "AcqStage-{}".format(stage))
self.log = Logger("AcqStage-{}".format(stage))
self._stage_number = stage
self._pipeline_objects = []
self._pipeline_klass = None
self._num_processes = num_processes
self._running = False
self._input_queue = None
self._output_queue = None
self._args = []
self._kwargs = {}
@property
def stage(self):
"""Current position in the pipeline"""
return self._stage_number
@stage.setter
def stage(self, value):
self._stage_number = value
@property
def numProcess(self):
"""Number of processes to spawn when built
Parameters
----------
value: int
Number of processes to spawn when acquisition starts
Returns
----------
int:
Number of processes
"""
return self._num_processes
@numProcess.setter
def numProcess(self, value):
self._num_processes = max(1, value)
    def configureStage(self, pipeline_klass, *args, **kwargs):
        """Configures the stage with a particular processing class

        Parameters
        ----------
        pipeline_klass:
            The pipeline class object to spawn (e.g. :class:`PipelinePacketProcessor`)
        *args:
            positional arguments passed to the class on creation
        **kwargs:
            keyword arguments passed to the class on creation
        """
        self.log.debug("Assigning stage {} to klass {}".format(self.stage, pipeline_klass))
        self._pipeline_klass = pipeline_klass
        self.setArgs(*args, **kwargs)

    def setArgs(self, *args, **kwargs):
        self._args = args
        self._kwargs = kwargs
@property
def processes(self):
return self._pipeline_objects
def build(self, input_queue=None, output_queue=None):
self._input_queue = input_queue
self._output_queue = output_queue
self.log.debug("Building stage with arguments {} {}".format(self._args, self._kwargs))
if self._output_queue is None:
self.log.debug("I am creating the queue")
self._output_queue = Queue()
else:
self.log.debug("Recieved the queue {}".format(output_queue))
self.log.debug("Building stage {} ".format(self._stage_number))
self.log.info("Creating {} processes".format(self._num_processes))
for n in range(self._num_processes):
p = self._pipeline_klass(
*self._args,
**self._kwargs,
input_queue=self._input_queue,
shared_output=self._output_queue,
)
p.daemon = True
self._pipeline_objects.append(p)
# if self._output_queue is None: do we really need it?
# self._output_queue = p.outputQueues()[-1]
@property
def outputQueue(self):
return self._output_queue
    def kill_process(self, pid: int, port):
        """Sends SIGTERM to the given process until it has exited"""
        self.log.info(f"Terminating process {pid} on port {port}")
        while True:
            try:
                os.kill(int(pid), signal.SIGTERM)
                time.sleep(1)
            except OSError as e:
                if e.errno == errno.ESRCH:  # No such process
                    self.log.info(f"Process with PID {pid} has exited.")
                    break
                elif e.errno == errno.EPERM:  # Permission denied
                    self.log.warning(f"No permission to terminate process with PID {pid}.")
                    break
                else:
                    raise
    def start(self):
        for p in self._pipeline_objects:
            zmq_port = cfg.default_cfg["timepix"]["zmq_port"]
            if "UdpSampler-" in p.name:
                self.ctx = zmq.Context.instance()
                self.udp_sock = self.ctx.socket(zmq.PAIR)
# Check if we're on linux or mac
system = platform.system()
if system == "Linux" or system == "Darwin":
for port in [zmq_port, zmq_port+1]:
# Use lsof to find the process ID (PID) using the specified port
command = f"lsof -i :{port}"
result = subprocess.run(command, shell=True, capture_output=True, text=True)
# Extract the PID from the lsof output
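                        # Typical `lsof -i :<port>` output, for reference; the header
                        # line is skipped and the PID is taken from the second
                        # whitespace-separated column (values illustrative):
                        #   COMMAND   PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
                        #   python  12345 user    3u  IPv4 0x0        0t0  UDP *:50000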
for line in result.stdout.splitlines()[1:]:
parts = line.split()
if len(parts) > 1:
pid = parts[1]
response = input(f"Found process with PID {pid} already using port {port}, but it is needed by Pymepix. Pymepix may not function properly if this port isn't available. Terminate that process? (y/N): ").strip().lower()
if response == 'y':
self.kill_process(pid, port)
                else:
                    # TODO: find a way to check for open ports on Windows (a candidate exists, but is untested)
                    print(f"Cannot close open ports on operating system: {system}. If the required ports are in use, Pymepix may not function properly; rebooting the system may help.")
self.udp_sock.bind(f"tcp://127.0.0.1:{zmq_port}")
self.log.info(f'IPC ZMQ bind on "tcp://127.0.0.1:{zmq_port}"')
if cfg.default_cfg["trainID"]["connected"]:
self.train_sock = self.ctx.socket(zmq.PAIR)
self.train_sock.bind(f"ipc:///tmp/train_sock{zmq_port}")
self.log.info(f'trainID bind on "ipc:///tmp/train_sock{zmq_port}"')
self.startTrainID()
p.start()
def stop(self, force=False):
self.log.info("Stopping stage {}".format(self.stage))
if self._input_queue is not None:
            # Put a None in and join all processes
self._input_queue.put(None)
for idx, p in enumerate(self._pipeline_objects):
p.enable = False
self.log.info("Waiting for process {}".format(idx))
p.join(1.0)
p.terminate()
p.join()
self.log.info("Process stop complete")
if self._input_queue.get() is not None:
self.log.error("Queue should only contain None!!")
raise Exception("Queue contains more data")
self._input_queue.close()
else:
            for p in self._pipeline_objects:
                if "UdpSampler-" in p.name:
                    self.log.debug(f"closing zmq socket for tcp://127.0.0.1:{cfg.default_cfg['timepix']['zmq_port']}")
                    self.udp_sock.close()
                    if cfg.default_cfg["trainID"]["connected"]:
                        self.stopTrainID()
                        self.log.debug(f"closing zmq socket for ipc:///tmp/train_sock{cfg.default_cfg['timepix']['zmq_port']}")
                        self.train_sock.close()
p.enable = False
self.log.info("Joining thread {}".format(p))
p.join(1.0)
p.terminate()
p.join()
self.log.info("Join complete")
self.log.info("Stop complete")
self._pipeline_objects = []
def startTrainID(self):
self.log.info("start USBTrainID process")
# generate worker to save the data directly to disk
self._trainIDRec = USBTrainID()
self._trainIDRec.start()
def stopTrainID(self):
self.log.info("stopping USBTrainID process")
self.train_sock.send_string("STOP RECORDING")
self.train_sock.send_string("SHUTDOWN")
self._trainIDRec.join(2.0) # file still needs to be saved
self._trainIDRec.terminate()
self._trainIDRec.join()
class AcquisitionPipeline:
"""Class that manages various stages"""
def __init__(self, name, data_queue):
self.log = Logger(name + " AcqPipeline")
self.log.info("Initializing pipeline")
self._stages = []
self._data_queue = data_queue
self._running = False
def addStage(self, stage_number, pipeline_klass, *args, num_processes=1, **kwargs):
"""Adds a stage to the pipeline"""
stage = AcquisitionStage(stage_number, num_processes)
self.log.info("Adding stage {} with klass {}".format(stage_number, pipeline_klass))
stage.configureStage(pipeline_klass, *args, **kwargs)
self._stages.append(stage)
self._stages = sorted(self._stages, key=lambda x: x.stage)
def getStage(self, stage_number) -> AcquisitionStage:
for x in self._stages:
if x.stage == stage_number:
return x
return None
@property
def stages(self):
return self._stages
def start(self):
"""Starts all stages"""
        # Stages are already sorted by stage number (see addStage)
        self.log.info("Starting acquisition")
        # Build them, chaining each stage's output queue into the next stage's input
previous_stage = None
last_index = len(self._stages) - 1
self.log.debug("Last index is {}".format(last_index))
for idx, s in enumerate(self._stages):
self.log.debug("Building stage {} {}".format(idx, s.stage))
if previous_stage is not None:
queues = previous_stage.outputQueue
self.log.debug("Queues: {}".format(queues))
if idx != last_index:
s.build(input_queue=queues)
else:
self.log.debug("This is the last queue so output is the last one")
s.build(input_queue=queues, output_queue=self._data_queue)
else:
if idx != last_index:
s.build()
else:
self.log.info("First stage shares output")
s.build(output_queue=self._data_queue)
previous_stage = s
self.log.debug("Last stage is {}".format(s))
for s in self._stages:
s.enable = True
s.start()
self._running = True
@property
def isRunning(self):
return self._running
def stop(self):
"""Stops all stages"""
self.log.info("Stopping acquisition")
self.log.debug(self._stages)
if self._running is True:
for s in self._stages:
s.stop()
self._running = False
    @staticmethod
    def getConfigFromYaml(config: str, config_file: str = "processing/pipelines.yaml"):
        """Loads the named pipeline configuration from a yaml file"""
pipelines_file = Path(PurePath(__file__).parent.parent, config_file)
if Path(pipelines_file).is_file():
with open(pipelines_file, "r") as f:
pipeline_cfg = yaml.safe_load(f)
return pipeline_cfg[config]
else:
            raise FileNotFoundError(f"The given file {Path(pipelines_file)} could not be found")
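    # Illustrative sketch of a pipelines.yaml entry as consumed by pipelineFactory
    # and confStages below; the key names follow the code, while the values and the
    # entry name "my_pipeline" are assumptions:
    #
    #   my_pipeline:
    #     name: "MyPipeline"
    #     use_event: true
    #     event_window: [0.0, 1.0e-5]
    #     stages:
    #       - class: udp_sampler
    #         num_processes: 1
    #       - class: pixel
    #         num_processes: 4
    #       - class: centroid
    #         num_processes: 4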
    @staticmethod
    def pipelineFactory(config, data_queue, longtime, name=None, use_event=None, event_window: tuple[float, float] = None):
        """Creates an AcquisitionPipeline, configures it and returns it.

        Any parameter left as 'None' is taken from the given configuration.

        Parameters:
            use_event (bool): Whether packets are forwarded to centroiding. If True, centroids are calculated."""
        if name is None:
            name = config['name']
        if use_event is None:
            use_event = config['use_event']
        if event_window is None:
            event_window = (config['event_window'][0], config['event_window'][1])
        pipeline = AcquisitionPipeline(name=name, data_queue=data_queue)
        pipeline.confStages(config['stages'], longtime, use_event, event_window)
        return pipeline
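    # Illustrative use of the factory, assuming a "my_pipeline" entry exists in
    # pipelines.yaml (data_queue and longtime as in main() below):
    #
    #   config = AcquisitionPipeline.getConfigFromYaml("my_pipeline")
    #   pipeline = AcquisitionPipeline.pipelineFactory(config, data_queue, longtime)
    #   pipeline.start()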
    def confStages(self, stages, longtime, use_event, event_window: tuple[float, float]):
        """Adds and configures multiple stages, one per entry in the 'stages' parameter.

        'stages' shall be a list of dictionaries, each with a key 'class' whose value is
        'udp_sampler', 'pixel', or 'centroid', and a key 'num_processes' holding an integer."""
        index = 0
        for stage in stages:
            match stage['class']:
                case "udp_sampler":
                    self.addStage(0, UdpSampler, longtime)
                case "pixel":
                    PacketProcessorClass = packet_processor_factory()
                    processing_step: ProcessingStep = PacketProcessorClass(handle_events=use_event, event_window=event_window)
                    pipeline_klass = PipelinePacketProcessor
                case "centroid":
                    processing_step: ProcessingStep = CentroidCalculator()
                    pipeline_klass = PipelineCentroidCalculator
                case _:
                    raise ValueError(f"Class must be either 'udp_sampler', 'pixel', or 'centroid'. Got {stage['class']}")
            if stage['class'] != "udp_sampler":
                self.addStage(index, pipeline_klass, processing_step=processing_step, num_processes=stage['num_processes'])
            index += 2
    def getStageByPipelineObject(self, pipeline_klass) -> AcquisitionStage:
        """Returns the AcquisitionStage configured with the given pipeline class from this pipeline, if available. Otherwise returns 'None'"""
        for stage in self._stages:
            if stage._pipeline_klass == pipeline_klass:
                return stage
        return None
@property
def numBlobProcesses(self):
"""Number of python processes to spawn for centroiding if Centroid Calculator is used
Setting this will spawn the appropriate number of processes to perform centroiding.
Changes take effect on next acquisition.
"""
centroidCalculator = self.getStageByPipelineObject(PipelineCentroidCalculator)
if centroidCalculator is None:
return 0
else:
return centroidCalculator.numProcess
@numBlobProcesses.setter
def numBlobProcesses(self, value):
centroidCalculator = self.getStageByPipelineObject(PipelineCentroidCalculator)
if centroidCalculator is not None:
centroidCalculator.numProcess = max(1, value)
def main():
import logging
import threading
import time
from multiprocessing.sharedctypes import Value
from pymepix.processing.pipeline_packet_processor import PipelinePacketProcessor
camera_generation = cfg.default_cfg["timepix"]["camera_generation"]
if camera_generation == 3:
from .udpsampler3 import UdpSampler
elif camera_generation == 4:
from .udpsampler4 import UdpSampler
else:
# log.critical(f"Unsupported camera generation: {camera_generation}") # TODO: find correct logger
raise ValueError(f"Unsupported camera generation: {camera_generation}")
# Create the logger
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
end_queue = Queue()
    acqpipeline = AcquisitionPipeline("Test", end_queue)
    longtime = Value("I", 0)
    acqpipeline.addStage(0, UdpSampler, longtime)
    acqpipeline.addStage(2, PipelinePacketProcessor, num_processes=4)
def get_queue_thread(queue):
while True:
value = queue.get()
if value is None:
break
t = threading.Thread(target=get_queue_thread, args=(end_queue,))
t.daemon = True
t.start()
    acqpipeline.start()
    time.sleep(120.0)
    acqpipeline.stop()
end_queue.put(None)
t.join()
print("Done")
if __name__ == "__main__":
main()