Source code for pymepix.channel.data_channel

import queue
import socket
import threading

import zmq

from pymepix.channel.channel_types import ChannelDataType
from pymepix.processing.datatypes import MessageType


class Data_Channel(threading.Thread):
    """Daemon thread that publishes queued messages on a ZeroMQ PUB socket.

    Producers enqueue payloads with :meth:`send` or
    :meth:`send_data_by_message_type`.  The :meth:`run` loop pops them one by
    one and forwards each with ``send_pyobj`` while a socket is registered;
    messages dequeued while no socket is registered are discarded.
    """

    def __init__(self):
        super().__init__(daemon=True)
        self.q = queue.Queue()
        self.socket = None
        self.address = None
        # Defined up front so readers never hit a missing attribute before
        # register() has been called.
        self.port = None
        self.bound = False
        # Exact endpoint string handed to bind(); reused verbatim by unbind().
        self._endpoint = None
        self.lock = threading.Lock()

    def public_address(self):
        """Return ``tcp://<hostname>:<port>`` for the registered channel.

        The hostname is the local machine's name as reported by
        ``socket.gethostname()``; the port is the one parsed in
        :meth:`register`.

        Raises:
            RuntimeError: if no socket is currently registered.
        """
        if self.socket is None:
            raise RuntimeError(
                "Data_Channel is not registered; no public address available"
            )
        # The hostname comes from the stdlib ``socket`` module, not from the
        # zmq socket stored in ``self.socket`` (which has no such method).
        return f"tcp://{socket.gethostname()}:{self.port}"

    def register(self, api_address):
        """Create a PUB socket and bind it to ``api_address``.

        ``api_address`` is parsed as ``[scheme://]host:port``.  The call is a
        no-op when a socket is already registered.  Instance state is only
        updated after the bind succeeded, so a failed attempt can be retried.

        Raises:
            ValueError: if ``api_address`` does not split into exactly one
                host and one integer port.
        """
        with self.lock:
            if self.socket is not None:
                return

            host, s_port = api_address.split("//")[-1].split(":")
            port = int(s_port)
            endpoint = f"tcp://{host}:{s_port}"

            context = zmq.Context()
            sock = context.socket(zmq.PUB)
            try:
                sock.bind(endpoint)
            except Exception:
                # Release the half-created resources before propagating so
                # the channel does not look registered with an unbound socket.
                sock.close(linger=0)
                context.term()
                raise

            self.address = api_address
            self.port = port
            self._endpoint = endpoint
            self.socket = sock
            self.bound = True

    def unregister(self):
        """Unbind and close the registered socket, if there is one."""
        with self.lock:
            sock = self.socket
            if sock is None:
                return
            # Prefer the endpoint that was actually bound; fall back to the
            # stored address when the socket was assigned without register().
            endpoint = self._endpoint or self.address
            try:
                sock.unbind(endpoint)
            finally:
                # Close explicitly instead of waiting for garbage collection,
                # and always leave the channel in the unregistered state.
                sock.close()
                self.socket = None
                self.address = None
                self._endpoint = None
                self.bound = False

    def stop(self):
        """Ask the :meth:`run` loop to exit by enqueueing the ``None`` sentinel."""
        self.q.put(None)

    def run(self):
        """Forward queued messages to the socket until ``None`` is dequeued."""
        while True:
            item = self.q.get()
            if item is None:
                break
            with self.lock:
                # Without a registered socket the message is dropped.
                if self.socket is not None:
                    self.socket.send_pyobj(item)

    def send(self, data_type, data):
        """Queue ``data`` tagged with ``data_type.value``.

        For ``ChannelDataType.COMMAND`` the payload placed on the queue is
        ``data.value``; for every other type ``data`` is queued unchanged.
        """
        payload = data.value if data_type == ChannelDataType.COMMAND else data
        self.q.put_nowait({"type": data_type.value, "data": payload})

    def send_data_by_message_type(self, message_type, data):
        """Queue ``data`` under the channel type matching ``message_type``.

        ``PixelData`` maps to ``PIXEL``, ``EventData`` to ``TOF`` and
        ``CentroidData`` to ``CENTROID``.  Any other message type is ignored.
        """
        if message_type == MessageType.PixelData:
            channel_type = ChannelDataType.PIXEL
        elif message_type == MessageType.EventData:
            channel_type = ChannelDataType.TOF
        elif message_type == MessageType.CentroidData:
            channel_type = ChannelDataType.CENTROID
        else:
            return
        self.q.put_nowait({"type": channel_type.value, "data": data})