Source code for pymepix.channel.data_channel

import threading
import zmq
import queue

from pymepix.channel.channel_types import ChannelDataType
from pymepix.processing.datatypes import MessageType

import os
import platform
import subprocess
import signal
import errno
import time

class Data_Channel(threading.Thread):
    """Daemon thread that publishes queued messages on a ZeroMQ PUB socket.

    Producers enqueue messages with :meth:`send` or
    :meth:`send_data_by_message_type`. The :meth:`run` loop dequeues them and
    forwards each one with ``send_pyobj`` while a socket is registered;
    messages dequeued while no socket is registered are discarded.

    Attributes
    ----------
    q : queue.Queue
        Pending outgoing messages; ``None`` is the shutdown sentinel.
    socket
        The bound PUB socket, or ``None`` while unregistered.
    address : str or None
        The address passed to :meth:`register`, or ``None`` while unregistered.
    port : int or None
        Port parsed from the last registered address (``None`` before the
        first successful :meth:`register`).
    bound : bool
        ``True`` while a socket is bound.
    lock : threading.Lock
        Guards every access to ``socket`` across threads.
    """

    # Ports probed for foreign processes before binding (Linux/macOS only).
    _REQUIRED_PORTS = (25000, 25001)
    # Upper bound on SIGTERM attempts (one per second) so a process that never
    # goes away cannot hang register() forever.
    _MAX_TERMINATE_ATTEMPTS = 10

    def __init__(self):
        super().__init__(daemon=True)
        self.q = queue.Queue()
        self.socket = None
        self.address = None
        self.port = None
        self.bound = False
        self.lock = threading.Lock()

    def public_address(self):
        """Return ``tcp://<this machine's hostname>:<registered port>``.

        Raises
        ------
        RuntimeError
            If no address has been registered yet, so no port is known.
        """
        # Local import: ZeroMQ sockets have no gethostname(); the host name
        # comes from the standard-library socket module.
        import socket

        if self.port is None:
            raise RuntimeError(
                "public_address() requires a successful register() call first"
            )
        return f"tcp://{socket.gethostname()}:{self.port}"

    def register(self, api_address):
        """Create a PUB socket and bind it to the host/port of *api_address*.

        Does nothing if a socket is already registered. On Linux and macOS the
        required ports are first checked for other processes, and the user is
        asked interactively whether each one should be terminated.

        Parameters
        ----------
        api_address : str
            Address of the form ``<scheme>//<host>:<port>``, e.g.
            ``tcp://127.0.0.1:5056``. The socket is always bound over TCP.

        Raises
        ------
        ValueError
            If *api_address* does not contain exactly one ``host:port`` pair
            or the port is not an integer.
        """
        with self.lock:
            if self.socket is not None:
                return

            host, s_port = api_address.split("//")[-1].split(":")
            port = int(s_port)

            self._free_required_ports()

            context = zmq.Context()
            sock = context.socket(zmq.PUB)
            try:
                sock.bind(f"tcp://{host}:{s_port}")
            except Exception:
                # Do not keep a half-initialised socket around: it would make
                # every later register() call a silent no-op.
                sock.close()
                raise

            # Publish the new state only once the bind has succeeded.
            self.socket = sock
            self.address = api_address
            self.port = port
            self.bound = True

    def _free_required_ports(self):
        """Offer to terminate other processes that occupy the required ports."""
        system = platform.system()
        if system not in ("Linux", "Darwin"):
            # Find a way to check for windows (already found, but untested)
            print(f"Cannot close open ports for operating system: {system}. If required ports are in use there may be difficulties. Rebooting the system may be required.")
            return

        own_pid = os.getpid()
        for port in self._REQUIRED_PORTS:
            for pid in self._pids_using_port(port):
                if pid == own_pid:
                    # Never offer to kill the process we are running in.
                    continue
                try:
                    response = input(f"Found process with PID {pid} already using {port}, but it is needed by Pymepix. Pymepix may not function properly if this port isn't available. Terminate that process? (y/N): ").strip().lower()
                except EOFError:
                    # No interactive stdin: fall back to the default answer "N".
                    response = ""
                if response == "y":
                    print(f"Terminating process {pid} on port {port}")
                    self._terminate_process(pid)

    @staticmethod
    def _pids_using_port(port):
        """Return the distinct PIDs that ``lsof -i :<port>`` reports, in order."""
        try:
            result = subprocess.run(
                ["lsof", "-i", f":{port}"], capture_output=True, text=True
            )
        except OSError:
            # lsof is missing or not executable: nothing can be detected.
            return []

        pids = []
        # The first output line is the column header; the PID is column two.
        for line in result.stdout.splitlines()[1:]:
            parts = line.split()
            if len(parts) > 1 and parts[1].isdigit():
                pid = int(parts[1])
                # lsof prints one row per open connection, so de-duplicate.
                if pid not in pids:
                    pids.append(pid)
        return pids

    @classmethod
    def _terminate_process(cls, pid):
        """Send SIGTERM to *pid* once per second until it is gone or we give up."""
        for _ in range(cls._MAX_TERMINATE_ATTEMPTS):
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError as e:
                if e.errno == errno.ESRCH:  # No such process
                    print(f"Process with PID {pid} has exited.")
                elif e.errno == errno.EPERM:  # Permission denied
                    print(f"No permission to check process with PID {pid}.")
                else:
                    raise
                return
            time.sleep(1)
        print(
            f"Process with PID {pid} is still present after "
            f"{cls._MAX_TERMINATE_ATTEMPTS} SIGTERM attempts; continuing."
        )

    def unregister(self):
        """Unbind and close the registered socket, if there is one."""
        with self.lock:
            if self.socket is None:
                return

            sock, address = self.socket, self.address
            self.socket = None
            self.address = None
            self.bound = False
            try:
                sock.unbind(address)
            finally:
                # Release the socket even if unbind() fails.
                sock.close()

    def stop(self):
        """Ask the :meth:`run` loop to exit by enqueueing the ``None`` sentinel."""
        self.q.put(None)

    def run(self):
        """Forward queued messages to the socket until the sentinel arrives."""
        while True:
            new_data = self.q.get()
            if new_data is None:
                break
            with self.lock:
                if self.socket is not None:
                    self.socket.send_pyobj(new_data)

    def send(self, data_type, data):
        """Enqueue *data* tagged with *data_type* for publication.

        Parameters
        ----------
        data_type : ChannelDataType
            Channel the message belongs to; its ``value`` is sent as ``"type"``.
        data
            Payload. For ``ChannelDataType.COMMAND`` the payload's ``value``
            attribute is sent instead of the object itself.
        """
        payload = data.value if data_type == ChannelDataType.COMMAND else data
        self.q.put_nowait({"type": data_type.value, "data": payload})

    def send_data_by_message_type(self, message_type, data):
        """Enqueue *data* on the channel that corresponds to *message_type*.

        ``PixelData``, ``EventData`` and ``CentroidData`` are routed to the
        ``PIXEL``, ``TOF`` and ``CENTROID`` channels respectively; any other
        message type is ignored.
        """
        if message_type == MessageType.PixelData:
            channel_type = ChannelDataType.PIXEL
        elif message_type == MessageType.EventData:
            channel_type = ChannelDataType.TOF
        elif message_type == MessageType.CentroidData:
            channel_type = ChannelDataType.CENTROID
        else:
            return
        self.q.put_nowait({"type": channel_type.value, "data": data})