# Source code for pymepix.pymepix_connection

# This file is part of Pymepix
#
# In all scientific work using Pymepix, please reference it as
#
# A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:
# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)
# https://doi.org/10.1088/1748-0221/14/10/P10003
# https://arxiv.org/abs/1905.07999
#
# Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not,
# see <https://www.gnu.org/licenses/>.
import threading
import time
from collections import deque
from multiprocessing import Queue

import pymepix.config.load_config as cfg
from pymepix.channel.channel_types import ChannelDataType, Commands
from pymepix.channel.data_channel import Data_Channel
from pymepix.core.log import Logger
from pymepix.processing.acquisition import PixelPipeline

from .SPIDR.spidrcontroller import SPIDRController
from .timepix4device import Timepix4Device
from .timepixdevice import TimepixDevice
from .TPX4.tpx4controller import Timepix4Controller


class PollBufferEmpty(Exception):
    """Raised by a non-blocking poll when the polling buffer holds no data."""
class PymepixConnection:
    """High level class to work with timepix and perform acquisition

    This class creates the camera controller, initializes the timepix devices
    and handles acquisition. Each individual timepix device can be accessed
    using the square bracket operator.

    The controller/device classes and the API channel address are selected
    from ``cfg.default_cfg`` (``timepix.camera_generation`` and
    ``api_channel.ip`` / ``api_channel.port``); they are not constructor
    arguments.

    Parameters
    ----------
    pipeline_class : optional
        Pipeline class handed to every timepix device that is created.
        Defaults to :class:`PixelPipeline`.

    Examples
    --------

    Startup device

    >>> timepix = PymepixConnection()

    Find how many Timepix are connected

    >>> len(timepix)
    1

    Set the Bias voltage

    >>> timepix.biasVoltage = 50

    Access a specific Timepix device:

    >>> timepix[0].deviceName
    W0026_K06

    Load a config file into timepix

    >>> timepix[0].loadSophyConfig('W0026_K06_50V.spx')

    """

    def data_thread(self):
        """Forward items from the data queue until a ``None`` sentinel arrives.

        Every item is expected to be a ``(data_type, data)`` pair. It is handed
        to the current data callback first and then sent on the data channel.
        """
        self.log.info("Starting data thread")
        while True:
            value = self._data_queue.get()
            self.log.debug("Popped value {}".format(value))
            if value is None:
                # None is the shutdown sentinel for this thread.
                break

            data_type, data = value
            self._event_callback(data_type, data)
            self._channel.send_data_by_message_type(data_type, data)

    def __init__(self, pipeline_class=PixelPipeline):
        self.log = Logger("Pymepix")

        self._channel = Data_Channel()
        api_channel_address = (
            cfg.default_cfg['api_channel']['ip'],
            cfg.default_cfg['api_channel']['port'],
        )
        # Remember the address so that the ``chanAddress`` getter works before
        # the setter has ever been called.
        self._channel_address = api_channel_address
        self._channel.start()
        self._channel.register(f"tcp://{api_channel_address[0]}:{api_channel_address[1]}")

        controllerClass = self._timepix_controller_class_factory()
        self._controller = controllerClass()

        # Filled by _createTimepix with instances of the device class that
        # matches the configured camera generation.
        self._timepix_devices: list = []
        self._data_queue = Queue()
        self._createTimepix(pipeline_class)

        self._controller.setBiasSupplyEnable(True)
        self.biasVoltage = 50

        # Polling must be enabled before the data thread starts, because the
        # thread calls self._event_callback for every item it receives.
        self.enablePolling()
        self._data_thread = threading.Thread(target=self.data_thread)
        self._data_thread.daemon = True
        self._data_thread.start()

        self._running = False
        self.cfg = cfg

    @property
    def biasVoltage(self):
        """Bias voltage in volts"""
        return self._bias_voltage

    @biasVoltage.setter
    def biasVoltage(self, value):
        self._bias_voltage = value
        self._controller.biasVoltage = value

    def poll(self, block=False):
        """If polling is used, returns data stored in data buffer.

        The buffer is in the form of a ring and will overwrite older values
        if it becomes full.

        Parameters
        ----------
        block : bool, optional
            When True, wait (re-checking every 0.5 s) until data is available
            instead of raising.

        Returns
        --------
        :obj:`MessageType` , data

        Raises
        ------
        PollBufferEmpty
            If ``block`` is False and the buffer is empty.

        """
        while True:
            try:
                return self._poll_buffer.popleft()
            except IndexError:
                if not block:
                    # The IndexError is an implementation detail of the deque.
                    raise PollBufferEmpty from None
                time.sleep(0.5)

    @property
    def pollBufferLength(self):
        """Get/Set polling buffer length

        Clears buffer on set

        """
        return self._poll_buffer.maxlen

    @pollBufferLength.setter
    def pollBufferLength(self, value):
        self.log.warning("Clearing polling buffer")
        self._poll_buffer = deque(maxlen=value)

    @property
    def dataCallback(self):
        """Function to call when data is received from a timepix device

        This has the effect of disabling polling.

        """
        return self._event_callback

    @dataCallback.setter
    def dataCallback(self, value):
        self._event_callback = value
        self.log.warning("Clearing polling buffer")
        self._poll_buffer.clear()

    def enablePolling(self, maxlen=100):
        """Enables polling mode

        This clears any user defined callbacks and the polling buffer

        Parameters
        ----------
        maxlen : int, optional
            Maximum number of entries kept in the polling buffer.

        """
        self.log.info("Enabling polling")
        # The buffer has to exist before the callback is installed: the
        # dataCallback setter clears it.
        self.pollBufferLength = maxlen
        self.dataCallback = self._pollCallback

    def _pollCallback(self, data_type, data):
        """Default data callback: store the item in the polling buffer."""
        self._poll_buffer.append((data_type, data))

    def _createTimepix(self, pipeline_class=PixelPipeline):
        """Create a device object for every controller entry with a good link.

        An entry is used when its link is enabled and its ``locked`` value
        equals its ``enabled`` value.
        """
        TimepixDeviceClass = self._timepix_device_class_factory()

        for x in self._controller:
            status, enabled, locked = x.linkStatus
            if enabled != 0 and locked == enabled:
                self._timepix_devices.append(
                    TimepixDeviceClass(x, self._data_queue, pipeline_class)
                )

        self._num_timepix = len(self._timepix_devices)
        self.log.info("Found {} Timepix/Medipix devices".format(len(self._timepix_devices)))
        for idx, tpx in enumerate(self._timepix_devices):
            self.log.info("Device {} - {}".format(idx, tpx.devIdToString()))

    def start_recording(self, path):
        """Reset the timers, start recording on the first device and announce it.

        Parameters
        ----------
        path
            Passed unchanged to the first device's ``start_recording``.
        """
        self._controller.resetTimers()
        self._controller.restartTimers()
        time.sleep(1)  # give camera time to reset timers

        self._timepix_devices[0].start_recording(path)
        self._channel.send(ChannelDataType.COMMAND, Commands.START_RECORD)

    def stop_recording(self):
        """Stop recording on the first device and announce it on the channel."""
        self._timepix_devices[0].stop_recording()
        self._channel.send(ChannelDataType.COMMAND, Commands.STOP_RECORD)

    def start(self):
        """Starts acquisition"""

        if self._running:
            # Restart cleanly when an acquisition is already in progress.
            self.stop()

        self.log.info("Starting acquisition")
        self._controller.prepare()
        self._controller.resetTimers()
        self._controller.restartTimers()

        for t in self._timepix_devices:
            self.log.info("Setting up {}".format(t.deviceName))
            t.setupDevice()

        self._controller.restartTimers()
        self._controller.openShutter()

        for t in self._timepix_devices:
            self.log.info("Starting {}".format(t.deviceName))
            t.start()

        self._running = True

    def stop(self):
        """Stops acquisition"""

        if not self._running:
            return

        self.log.info("Stopping acquisition")
        trig_mode = 0
        trig_length_us = 10000
        trig_freq_hz = 5
        nr_of_trigs = 1

        self._controller.setShutterTriggerConfig(
            trig_mode, trig_length_us, trig_freq_hz, nr_of_trigs, 0
        )
        self._controller.closeShutter()
        self.log.debug("Closing shutter")

        for t in self._timepix_devices:
            self.log.debug("Stopping {}".format(t.deviceName))
            t.stop()

        self._running = False

    @property
    def isAcquiring(self):
        """True while an acquisition started with :meth:`start` is running."""
        return self._running

    @property
    def numDevices(self):
        """Number of timepix devices found when the connection was created."""
        return self._num_timepix

    @property
    def chanAddress(self):
        """(ip, port) pair the data channel is registered on.

        Setting it unregisters the channel and registers it on the new address.
        """
        return self._channel_address

    @chanAddress.setter
    def chanAddress(self, value):
        self._channel_address = value
        self._channel.unregister()
        self._channel.register(f"tcp://{value[0]}:{value[1]}")

    def __getitem__(self, key) -> TimepixDevice:
        return self._timepix_devices[key]

    def __len__(self):
        return len(self._timepix_devices)

    def getDevice(self, num) -> TimepixDevice:
        """Return the timepix device at index ``num``."""
        return self._timepix_devices[num]

    @staticmethod
    def _class_for_camera_generation(classes_by_generation, kind):
        """Look up the class registered for the configured camera generation.

        ``kind`` is only used to build the error message.
        """
        camera_generation = cfg.default_cfg["timepix"]["camera_generation"]
        try:
            return classes_by_generation[camera_generation]
        except KeyError:
            raise ValueError(
                f"No timepix {kind} for camera generation {camera_generation}"
            ) from None

    def _timepix_device_class_factory(self):
        """Return the device class for the configured camera generation.

        Raises
        ------
        ValueError
            If no device class exists for the configured generation.
        """
        return self._class_for_camera_generation(
            {3: TimepixDevice, 4: Timepix4Device}, "device"
        )

    def _timepix_controller_class_factory(self):
        """Return the controller class for the configured camera generation.

        Raises
        ------
        ValueError
            If no controller class exists for the configured generation.
        """
        return self._class_for_camera_generation(
            {3: SPIDRController, 4: Timepix4Controller}, "controller"
        )