Source code for pymepix.timepix4device

# This file is part of Pymepix
#
# In all scientific work using Pymepix, please reference it as
#
# A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:
# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)
# https://doi.org/10.1088/1748-0221/14/10/P10003
# https://arxiv.org/abs/1905.07999
#
# Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not,
# see <https://www.gnu.org/licenses/>.

from multiprocessing.sharedctypes import Value

from pymepix.processing.acquisition import PixelPipeline

from .config import TimepixConfig
from .core.log import Logger
from .timepix4def import *


[docs] class ConfigClassException(Exception): pass
[docs] class BadPixelFormat(Exception): pass
[docs] class Timepix4Device: """Provides a control of a timepix4 object""" def __init__(self, tpx4_device, data_queue, pipeline_class=PixelPipeline): self._device = tpx4_device self.log = Logger("Timepix " + self.devIdToString()) self._data_queue = data_queue self._udp_address = (self._device.ipAddrDest, self._device.serverPort) self.log.info("UDP Address is {}:{}".format(*self._udp_address)) self.camera_generation = 4 # self._device.reset() # self._device.reinitDevice() self._longtime = Value("L", 0) self.setupAcquisition(pipeline_class) self._run_timer = True self._pause_timer = False self.setEthernetFilter(0xFFFF) # Start the timer thread self._acq_running = False @property def config(self): return self.__config
[docs] def setupAcquisition(self, acquisition_klass, *args, **kwargs): self.log.info("Setting up acquisition class") self._acquisition_pipeline = acquisition_klass( self._data_queue, self._udp_address, self._longtime, camera_generation=self.camera_generation, *args, **kwargs, )
def _initDACS(self): pass # self.setConfigClass(DefaultConfig) # self.loadConfig() # self.setConfigClass(SophyConfig)
[docs] def setConfigClass(self, klass: TimepixConfig): pass if issubclass(klass, TimepixConfig): self._config_class = klass else: raise ConfigClassException
[docs] def loadConfig(self, *args, **kwargs): """Loads dac settings from the Config class""" pass self.__config = self._config_class(*args, **kwargs) for code, value in self.__config.dacCodes(): self.log.info("Setting DAC {},{}".format(code, value)) self.setDac(code, value) if self.__config.thresholdPixels is not None: self.pixelThreshold = self.__config.thresholdPixels if self.__config.maskPixels is not None: self.pixelMask = self.__config.maskPixels if self.__config.testPixels is not None: self.pixelTest = self.__config.testPixels self.uploadPixels() self.refreshPixels()
[docs] def setupDevice(self): """ Sets up valid parameters for acquisition This will be manual when other acquisition parameters are working """ pass self.log.debug("Setting up acqusition")
# self.polarity = Polarity.Positive # self.log.debug("Polarity set to {}".format(Polarity(self.polarity))) # self.operationMode = OperationMode.ToAandToT # self.log.debug("OperationMode set to {}".format(OperationMode(self.operationMode))) # self.grayCounter = GrayCounter.Enable # self.log.debug("GrayCounter set to {}".format(GrayCounter(self.grayCounter))) # self.superPixel = SuperPixel.Enable # self.log.debug("SuperPixel set to {}".format(SuperPixel(self.superPixel))) # pll_cfg = 0x01E | 0x100 # self._device.pllConfig = pll_cfg # self._device.setTpPeriodPhase(10,0) # self._device.tpNumber = 1 # self._device.columnTestPulseRegister @property def acquisition(self): """Returns the acquisition object Can be used to set parameters in the acqusition directly for example, to setup TOF calculation when using a :class:`PixelPipeline` >>> tpx.acqusition.enableEvents False >>> tpx.acquistion.enableEvents = True """ return self._acquisition_pipeline
[docs] def pauseHeartbeat(self): pass
[docs] def resumeHeartbeat(self): pass
[docs] def devIdToString(self): """Converts device ID into readable string Returns -------- str Device string identifier """ # NEEDS IMPLEMENTATION return ""
@property def deviceName(self): return self.devIdToString()
[docs] def setEthernetFilter(self, eth_filter): """Sets the packet filter, usually set to 0xFFFF to all all packets""" eth_mask, cpu_mask = self._device.headerFilter eth_mask = eth_filter self._device.setHeaderFilter(eth_mask, cpu_mask) eth_mask, cpu_mask = self._device.headerFilter self.log.info("Dev: {} eth_mask :{:8X} cpu_mask: {:8X}".format(self._device.deviceId, eth_mask, cpu_mask))
[docs] def resetPixels(self): """Clears pixel configuration""" # NEEDS IMPLEMENTATION pass
# self._device.clearPixelConfig() # self._device.resetPixels() @property def pixelThreshold(self): """Threshold set for timepix device Parameters ---------- value : :obj:`numpy.array` of :obj:`int` 256x256 uint8 threshold to set locally Returns ----------- :obj:`numpy.array` of :obj:`int` or :obj:`None`: Locally stored threshold matrix """ # NEEDS IMPLEMENTATION pass # self._device.getPixelConfig() # return self._device._pixel_threshold @pixelThreshold.setter def pixelThreshold(self, value): # NEEDS IMPLEMENTATION pass # self._device._pixel_threshold = value @property def pixelMask(self): """Pixel mask set for timepix device Parameters ---------- value : :obj:`numpy.array` of :obj:`int` 256x256 uint8 threshold mask to set locally Returns ----------- :obj:`numpy.array` of :obj:`int` or :obj:`None`: Locally stored pixel mask matrix """ # NEEDS IMPLEMENTATION pass # self._device.getPixelConfig() # return self._device._pixel_mask @pixelMask.setter def pixelMask(self, value): # NEEDS IMPLEMENTATION pass # self._device._pixel_mask = value @property def pixelTest(self): """Pixel test set for timepix device Parameters ---------- value : :obj:`numpy.array` of :obj:`int` 256x256 uint8 pixel test to set locally Returns ----------- :obj:`numpy.array` of :obj:`int` or :obj:`None`: Locally stored pixel test matrix """ # NEEDS IMPLEMENTATION pass # self._device.getPixelConfig() # return self._device._pixel_test @pixelTest.setter def pixelTest(self, value): pass # self._device._pixel_test = value
[docs] def uploadPixels(self): """Uploads local pixel configuration to timepix""" pass
# self._device.uploadPixelConfig()
[docs] def refreshPixels(self): """Loads timepix pixel configuration to local array""" pass
# self._device.getPixelConfig()
[docs] def start(self): self.stop() self.log.info("Beginning acquisition") self.resumeHeartbeat() if self._acquisition_pipeline is not None: self._acquisition_pipeline.start() self._acq_running = True
[docs] def stop(self): if self._acq_running: self.log.info("Stopping acquisition") if self._acquisition_pipeline is not None: self._acquisition_pipeline.stop() self.pauseHeartbeat() self._acq_running = False
[docs] def start_recording(self, path): udp_sampler = self._acquisition_pipeline._stages[0] udp_sampler._pipeline_objects[0].record = True udp_sampler.udp_sock.send_string(path) res = udp_sampler.udp_sock.recv_string() if res == "OPENED": path = udp_sampler.udp_sock.recv_string() self.log.debug(f"Started recording to {path}") else: self.log.warning(f"Error while starting recording: {res}")
[docs] def stop_recording(self): pipeline = self._acquisition_pipeline._stages[0] pipeline._pipeline_objects[0].record = False pipeline._pipeline_objects[0].close_file = True res = pipeline.udp_sock.recv_string() if res == "CLOSED": self.log.info("Finished recording") else: self.log.warning(f"Error during recording: {res}")
@property def Vthreshold_fine(self): # NEEDS IMPLEMENTATION return 0 @Vthreshold_fine.setter def Vthreshold_fine(self, value): # NEEDS IMPLEMENTATION pass @property def Vthreshold_coarse(self): # NEEDS IMPLEMENTATION return 0 @Vthreshold_coarse.setter def Vthreshold_coarse(self, value): # NEEDS IMPLEMENTATION pass
[docs] def setDac(self, code, value): # NEEDS IMPLEMENTATION pass
''' # TODO: update this to Timepix4 def main(): import logging from multiprocessing import Queue from .SPIDR.spidrcontroller import SPIDRController logging.basicConfig(level=logging.INFO) end_queue = Queue() def get_queue_thread(queue): while True: value = queue.get() print(value) if value is None: break t = threading.Thread(target=get_queue_thread, args=(end_queue,)) t.daemon = True t.start() spidr = SPIDRController() timepix = Timepix4Device(spidr[0], end_queue) timepix.loadSophyConfig("/Users/alrefaie/Documents/repos/libtimepix/config/eq-norm-50V.spx") # spidr.shutterTriggerMode = SpidrShutterMode.Auto spidr.disableExternalRefClock() TdcEnable = 0x0000 spidr.setSpidrReg(0x2B8, TdcEnable) spidr.enableDecoders(True) spidr.resetTimers() spidr.restartTimers() spidr.datadrivenReadout() timepix.setupDevice() spidr.openShutter() timepix.start() time.sleep(4.0) timepix.stop() logging.info("DONE") spidr.closeShutter() if __name__ == "__main__": main() '''