# Source code for pymepix.timepixdevice

# This file is part of Pymepix
#
# In all scientific work using Pymepix, please reference it as
#
# A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:
# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)
# https://doi.org/10.1088/1748-0221/14/10/P10003
# https://arxiv.org/abs/1905.07999
#
# Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not,
# see <https://www.gnu.org/licenses/>.

import threading
import time
from multiprocessing.sharedctypes import Value

from pymepix.processing.acquisition import PixelPipeline

from .config import DefaultConfig, SophyConfig, TimepixConfig

# from .config.sophyconfig import SophyConfig
from .core.log import Logger
from .timepixdef import *
import numpy as np


class ConfigClassException(Exception):
    """Raised by ``TimepixDevice.setConfigClass`` when the requested class is
    not a ``TimepixConfig`` subclass."""
class BadPixelFormat(Exception):
    """Error type for malformed pixel data.

    NOTE(review): nothing in this module raises it; the meaning is taken from
    the name only -- confirm against callers elsewhere in the package.
    """
class TimepixDevice:
    """Provides high level control of a timepix/medipix object.

    Wraps one SPIDR device handle and exposes:

    * the acquisition pipeline (``acquisition``, ``start``, ``stop``),
    * loading of DAC / pixel settings from a ``TimepixConfig`` subclass,
    * the fields of the general configuration register as properties,
    * the individual DACs as properties,
    * a background "heartbeat" thread that periodically copies the device
      timer into a shared value handed to the acquisition pipeline.
    """

    # ------------------------------------------------------------------
    # Heartbeat
    # ------------------------------------------------------------------
    def update_timer(self):
        """Heartbeat thread body.

        Once per second, while the heartbeat is not paused, reads the device
        timer (LSB/MSB words), combines both words into one integer and
        publishes it through the shared ``self._longtime`` value.  The loop
        ends when ``self._run_timer`` becomes false.
        """
        self.log.info("Heartbeat thread starting")

        while self._run_timer:
            if self._pause_timer:
                # Paused: idle and re-check both flags.  Unlike a nested
                # wait loop, this never touches the device again once
                # ``_run_timer`` has been cleared.
                time.sleep(1.0)
                continue

            self._timer_lsb, self._timer_msb = self._device.timer
            # MSB word goes into the upper 32 bits, LSB word into the lower.
            self._timer = (self._timer_msb & 0xFFFFFFFF) << 32 | (self._timer_lsb & 0xFFFFFFFF)
            self._longtime.value = self._timer
            self.log.debug(
                "Reading heartbeat LSB: {} MSB: {} TIMER: {} ".format(self._timer_lsb, self._timer_msb, self._timer)
            )
            time.sleep(1.0)

    # ------------------------------------------------------------------
    # Construction / configuration
    # ------------------------------------------------------------------
    def __init__(self, spidr_device, data_queue, pipeline_class=PixelPipeline):
        """Reset the device, build the acquisition pipeline, load the default
        DAC settings and start the (initially paused) heartbeat thread.

        Parameters
        ----------
        spidr_device
            Device handle; all hardware access goes through it.
        data_queue
            Queue passed on to the acquisition pipeline.
        pipeline_class
            Class instantiated by :meth:`setupAcquisition`.
        """
        self._device = spidr_device
        self.log = Logger(TimepixDevice.__name__)
        self._data_queue = data_queue

        udp_address = (self._device.ipAddrDest, self._device.serverPort)
        self.log.info("UDP Address is {}:{}".format(*udp_address))
        self._pixel_offset_coords = (0, 0)

        self._device.reset()
        self._device.reinitDevice()

        # Shared with the acquisition pipeline; written by the heartbeat.
        # NOTE(review): "L" is a C unsigned long, whose width is platform
        # dependent, while update_timer stores a value built from two 32-bit
        # words -- confirm the width on all target platforms.
        self._longtime = Value("L", 0)
        # Defined before anything could call start()/stop().
        self._acq_running = False

        self.setupAcquisition(pipeline_class)
        self._initDACS()

        self._event_callback = None
        self._run_timer = True
        # Pause *before* the thread exists so it cannot sneak in a device
        # read between thread start and the pause request.
        self.pauseHeartbeat()

        self.setEthernetFilter(0xFFFF)

        # Start the timer thread
        self._timer_thread = threading.Thread(target=self.update_timer)
        self._timer_thread.daemon = True
        self._timer_thread.start()

    @property
    def config(self):
        """Config object created by the most recent :meth:`loadConfig`."""
        return self.__config

    @property
    def maskPixelsJsonable(self):
        """Pixel mask of the current config as nested lists of strings.

        The setter accepts a nested sequence and stores it in the config as a
        ``numpy.uint8`` array.
        """
        return [list(row) for row in self.__config.maskPixels.astype(str)]

    @maskPixelsJsonable.setter
    def maskPixelsJsonable(self, value):
        self.__config.maskPixels = np.asarray(value, dtype=np.uint8)

    def setupAcquisition(self, acquisition_klass, *args, **kwargs):
        """Instantiate the acquisition pipeline.

        ``acquisition_klass`` is called with the data queue, the shared timer
        value and any extra positional / keyword arguments.
        """
        self.log.info("Setting up acquisition class")
        self._acquisition_pipeline = acquisition_klass(self._data_queue, self._longtime, *args, **kwargs)

    def _initDACS(self):
        """Load the ``DefaultConfig`` settings, then select ``SophyConfig`` as
        the class used by subsequent :meth:`loadConfig` calls."""
        self.setConfigClass(DefaultConfig)
        self.loadConfig()
        self.setConfigClass(SophyConfig)

    def setConfigClass(self, klass):
        """Select the class that :meth:`loadConfig` will instantiate.

        Parameters
        ----------
        klass : type or str
            A ``TimepixConfig`` subclass, or the name of one that is visible
            in this module's namespace.

        Raises
        ------
        ConfigClassException
            If the name is unknown, or the object is not a ``TimepixConfig``
            subclass.
        """
        if isinstance(klass, str):
            try:
                klass = globals()[klass]
            except KeyError:
                raise ConfigClassException("Unknown config class name: {!r}".format(klass)) from None

        # issubclass() raises TypeError for non-classes, so test that first.
        if not (isinstance(klass, type) and issubclass(klass, TimepixConfig)):
            raise ConfigClassException("{!r} is not a TimepixConfig subclass".format(klass))

        self._config_class = klass

    def loadConfig(self, *args, **kwargs):
        """Loads dac settings from the Config class.

        Instantiates the selected config class with the given arguments,
        writes every DAC code/value pair it yields, copies any threshold,
        mask and test pixel matrices that are not ``None`` to the device
        object, uploads the pixel configuration and reads it back.
        """
        self.__config = self._config_class(*args, **kwargs)

        for code, value in self.__config.dacCodes():
            self.log.info("Setting DAC {},{}".format(code, value))
            self.setDac(code, value)

        if self.__config.thresholdPixels is not None:
            self.pixelThreshold = self.__config.thresholdPixels

        if self.__config.maskPixels is not None:
            self.pixelMask = self.__config.maskPixels

        if self.__config.testPixels is not None:
            self.pixelTest = self.__config.testPixels

        self.uploadPixels()
        self.refreshPixels()

    def setupDevice(self):
        """Sets up valid parameters for acquisition.

        This will be manual when other acquisition parameters are working.
        """
        self.log.debug("Setting up acqusition")

        self.polarity = Polarity.Positive
        self.log.debug("Polarity set to {}".format(Polarity(self.polarity)))

        self.operationMode = OperationMode.ToAandToT
        self.log.debug("OperationMode set to {}".format(OperationMode(self.operationMode)))

        self.grayCounter = GrayCounter.Enable
        self.log.debug("GrayCounter set to {}".format(GrayCounter(self.grayCounter)))

        self.superPixel = SuperPixel.Enable
        self.log.debug("SuperPixel set to {}".format(SuperPixel(self.superPixel)))

        pll_cfg = 0x01E | 0x100
        self._device.pllConfig = pll_cfg

        # self._device.setTpPeriodPhase(10,0)
        # self._device.tpNumber = 1
        # self._device.columnTestPulseRegister

    @property
    def acquisition(self):
        """Returns the acquisition object

        Can be used to set parameters in the acquisition directly, for
        example to setup TOF calculation when using a :class:`PixelPipeline`

        >>> tpx.acquisition.enableEvents
        False
        >>> tpx.acquisition.enableEvents = True

        """
        return self._acquisition_pipeline

    def pauseHeartbeat(self):
        """Ask the heartbeat thread to stop polling the device timer."""
        self._pause_timer = True

    def resumeHeartbeat(self):
        """Let the heartbeat thread poll the device timer again."""
        self._pause_timer = False

    def devIdToString(self):
        """Converts device ID into readable string

        Returns
        --------
        str
            Device string identifier

        """
        dev_id = self._device.deviceId
        wafer_no = (dev_id >> 8) & 0xFFF
        id_y = (dev_id >> 4) & 0xF
        id_x = dev_id & 0xF
        # id_x is 1-based: 1 -> "A", 2 -> "B", ...
        column = chr(ord("A") + id_x - 1)
        return "W{:04d}_{}{:02d}".format(wafer_no, column, id_y)

    @property
    def deviceName(self):
        """Readable device identifier, see :meth:`devIdToString`."""
        return self.devIdToString()

    def setEthernetFilter(self, eth_filter):
        """Sets the packet filter, usually set to 0xFFFF to pass all packets.

        Only the ethernet mask is replaced; the CPU mask currently reported
        by the device is written back unchanged.
        """
        _, cpu_mask = self._device.headerFilter
        self._device.setHeaderFilter(eth_filter, cpu_mask)

        # Read back so the log shows what the device actually holds.
        eth_mask, cpu_mask = self._device.headerFilter
        self.log.info("Dev: {} eth_mask :{:8X} cpu_mask: {:8X}".format(self._device.deviceId, eth_mask, cpu_mask))

    # ------------------------------------------------------------------
    # Pixel configuration
    # ------------------------------------------------------------------
    def resetPixels(self):
        """Clears pixel configuration"""
        self._device.clearPixelConfig()
        self._device.resetPixels()

    @property
    def pixelThreshold(self):
        """Threshold set for timepix device

        Parameters
        ----------
        value : :obj:`numpy.array` of :obj:`int`
            256x256 uint8 threshold to set locally

        Returns
        -----------
        :obj:`numpy.array` of :obj:`int` or :obj:`None`:
            Locally stored threshold matrix

        """
        return self._device._pixel_threshold

    @pixelThreshold.setter
    def pixelThreshold(self, value):
        self._device._pixel_threshold = value

    @property
    def pixelMask(self):
        """Pixel mask set for timepix device

        Parameters
        ----------
        value : :obj:`numpy.array` of :obj:`int`
            256x256 uint8 threshold mask to set locally

        Returns
        -----------
        :obj:`numpy.array` of :obj:`int` or :obj:`None`:
            Locally stored pixel mask matrix

        """
        return self._device._pixel_mask

    @pixelMask.setter
    def pixelMask(self, value):
        self._device._pixel_mask = value

    @property
    def pixelTest(self):
        """Pixel test set for timepix device

        Parameters
        ----------
        value : :obj:`numpy.array` of :obj:`int`
            256x256 uint8 pixel test to set locally

        Returns
        -----------
        :obj:`numpy.array` of :obj:`int` or :obj:`None`:
            Locally stored pixel test matrix

        """
        return self._device._pixel_test

    @pixelTest.setter
    def pixelTest(self, value):
        self._device._pixel_test = value

    def uploadPixels(self):
        """Uploads local pixel configuration to timepix"""
        self._device.uploadPixelConfig()

    def refreshPixels(self):
        """Loads timepix pixel configuration to local array"""
        self._device.getPixelConfig()

    # ------------------------------------------------------------------
    # Acquisition control
    # ------------------------------------------------------------------
    def start(self):
        """(Re)start acquisition.

        Stops a running acquisition first, resumes the heartbeat and starts
        the acquisition pipeline if there is one.
        """
        self.stop()

        self.log.info("Beginning acquisition")
        self.resumeHeartbeat()
        if self._acquisition_pipeline is not None:
            self._acquisition_pipeline.start()
            self._acq_running = True

    def stop(self):
        """Stop acquisition if it is running and pause the heartbeat."""
        if not self._acq_running:
            return

        self.log.info("Stopping acquisition")
        if self._acquisition_pipeline is not None:
            self._acquisition_pipeline.stop()
        self.pauseHeartbeat()
        self._acq_running = False

    def start_recording(self, path):
        """Ask the first pipeline stage to record raw data to ``path``.

        Sets the stage's ``record`` flag, sends ``path`` over the stage's
        socket and waits for the reply; ``"OPENED"`` is followed by a second
        message carrying the path that is then logged.
        """
        udp_sampler = self._acquisition_pipeline._stages[0]
        udp_sampler._pipeline_objects[0].record = True
        udp_sampler.udp_sock.send_string(path)

        res = udp_sampler.udp_sock.recv_string()
        if res == "OPENED":
            path = udp_sampler.udp_sock.recv_string()
            self.log.debug(f"Started recording to {path}")
        else:
            self.log.warning(f"Error while starting recording: {res}")

    def stop_recording(self):
        """Ask the first pipeline stage to stop recording and close its file,
        then wait for the ``"CLOSED"`` acknowledgement."""
        udp_sampler = self._acquisition_pipeline._stages[0]
        sampler_obj = udp_sampler._pipeline_objects[0]
        sampler_obj.record = False
        sampler_obj.close_file = True

        res = udp_sampler.udp_sock.recv_string()
        if res == "CLOSED":
            self.log.info("Finished recording")
        else:
            self.log.warning(f"Error during recording: {res}")

    # ------------------------------------------------------------------
    # General configuration register
    # ------------------------------------------------------------------
    def _gen_config_bits(self, mask):
        """Return the general-configuration register bits selected by ``mask``."""
        return self._device.genConfig & mask

    def _set_gen_config_bits(self, mask, value):
        """Clear the ``mask`` bits of the general-configuration register and
        OR ``value`` into it (read-modify-write)."""
        gen_config = self._device.genConfig
        self._device.genConfig = (gen_config & ~mask) | value

    @property
    def polarity(self):
        return Polarity(self._gen_config_bits(0x1))

    @polarity.setter
    def polarity(self, value):
        self._set_gen_config_bits(1, value)

    @property
    def operationMode(self):
        return OperationMode(self._gen_config_bits(OperationMode.Mask))

    @operationMode.setter
    def operationMode(self, value):
        self._set_gen_config_bits(OperationMode.Mask, value)

    @property
    def grayCounter(self):
        return GrayCounter(self._gen_config_bits(GrayCounter.Mask))

    @grayCounter.setter
    def grayCounter(self, value):
        self._set_gen_config_bits(GrayCounter.Mask, value)

    @property
    def testPulse(self):
        return TestPulse(self._gen_config_bits(TestPulse.Mask))

    @testPulse.setter
    def testPulse(self, value):
        self._set_gen_config_bits(TestPulse.Mask, value)

    @property
    def superPixel(self):
        return SuperPixel(self._gen_config_bits(SuperPixel.Mask))

    @superPixel.setter
    def superPixel(self, value):
        self._set_gen_config_bits(SuperPixel.Mask, value)

    @property
    def timerOverflowControl(self):
        return TimerOverflow(self._gen_config_bits(TimerOverflow.Mask))

    @timerOverflowControl.setter
    def timerOverflowControl(self, value):
        self._set_gen_config_bits(TimerOverflow.Mask, value)

    @property
    def testPulseDigitalAnalog(self):
        return TestPulseDigAnalog(self._gen_config_bits(TestPulseDigAnalog.Mask))

    @testPulseDigitalAnalog.setter
    def testPulseDigitalAnalog(self, value):
        self._set_gen_config_bits(TestPulseDigAnalog.Mask, value)

    @property
    def testPulseGeneratorSource(self):
        return TestPulseGenerator(self._gen_config_bits(TestPulseGenerator.Mask))

    @testPulseGeneratorSource.setter
    def testPulseGeneratorSource(self, value):
        self._set_gen_config_bits(TestPulseGenerator.Mask, value)

    @property
    def timeOfArrivalClock(self):
        return TimeofArrivalClock(self._gen_config_bits(TimeofArrivalClock.Mask))

    @timeOfArrivalClock.setter
    def timeOfArrivalClock(self, value):
        self._set_gen_config_bits(TimeofArrivalClock.Mask, value)

    # ------------------------------------------------------------------
    # DACs
    #
    # Every setter converts its argument with int() and keeps only the bits
    # covered by the DAC's mask, so out-of-range values wrap silently.
    # ------------------------------------------------------------------
    def _read_dac(self, code, mask):
        """Read DAC ``code`` from the device and keep only the ``mask`` bits."""
        return self._device.getDac(code) & mask

    def _write_dac(self, code, value, mask):
        """Write ``int(value) & mask`` to DAC ``code``."""
        self._device.setDac(code, int(value) & mask)

    @property
    def Ibias_Preamp_ON(self):
        """[0, 255]"""
        return self._read_dac(DacRegisterCodes.Ibias_Preamp_ON, 0xFF)

    @Ibias_Preamp_ON.setter
    def Ibias_Preamp_ON(self, value):
        self._write_dac(DacRegisterCodes.Ibias_Preamp_ON, value, 0xFF)

    @property
    def Ibias_Preamp_OFF(self):
        """[0, 15]"""
        return self._read_dac(DacRegisterCodes.Ibias_Preamp_OFF, 0xF)

    @Ibias_Preamp_OFF.setter
    def Ibias_Preamp_OFF(self, value):
        self._write_dac(DacRegisterCodes.Ibias_Preamp_OFF, value, 0xF)

    @property
    def VPreamp_NCAS(self):
        """[0, 255]"""
        return self._read_dac(DacRegisterCodes.VPreamp_NCAS, 0xFF)

    @VPreamp_NCAS.setter
    def VPreamp_NCAS(self, value):
        self._write_dac(DacRegisterCodes.VPreamp_NCAS, value, 0xFF)

    @property
    def Ibias_Ikrum(self):
        """[0, 255]"""
        return self._read_dac(DacRegisterCodes.Ibias_Ikrum, 0xFF)

    @Ibias_Ikrum.setter
    def Ibias_Ikrum(self, value):
        self._write_dac(DacRegisterCodes.Ibias_Ikrum, value, 0xFF)

    @property
    def Vfbk(self):
        """[0, 255]"""
        return self._read_dac(DacRegisterCodes.Vfbk, 0xFF)

    @Vfbk.setter
    def Vfbk(self, value):
        self._write_dac(DacRegisterCodes.Vfbk, value, 0xFF)

    @property
    def Vthreshold_fine(self):
        """[0, 511]"""
        return self._read_dac(DacRegisterCodes.Vthreshold_fine, 0x1FF)

    @Vthreshold_fine.setter
    def Vthreshold_fine(self, value):
        self._write_dac(DacRegisterCodes.Vthreshold_fine, value, 0x1FF)

    @property
    def Vthreshold_coarse(self):
        """[0, 15]"""
        return self._read_dac(DacRegisterCodes.Vthreshold_coarse, 0xF)

    @Vthreshold_coarse.setter
    def Vthreshold_coarse(self, value):
        self._write_dac(DacRegisterCodes.Vthreshold_coarse, value, 0xF)

    @property
    def Ibias_DiscS1_ON(self):
        """[0, 255]"""
        return self._read_dac(DacRegisterCodes.Ibias_DiscS1_ON, 0xFF)

    @Ibias_DiscS1_ON.setter
    def Ibias_DiscS1_ON(self, value):
        self._write_dac(DacRegisterCodes.Ibias_DiscS1_ON, value, 0xFF)

    @property
    def Ibias_DiscS1_OFF(self):
        """[0, 15]"""
        return self._read_dac(DacRegisterCodes.Ibias_DiscS1_OFF, 0xF)

    @Ibias_DiscS1_OFF.setter
    def Ibias_DiscS1_OFF(self, value):
        self._write_dac(DacRegisterCodes.Ibias_DiscS1_OFF, value, 0xF)

    @property
    def Ibias_DiscS2_ON(self):
        """[0, 255]"""
        return self._read_dac(DacRegisterCodes.Ibias_DiscS2_ON, 0xFF)

    @Ibias_DiscS2_ON.setter
    def Ibias_DiscS2_ON(self, value):
        self._write_dac(DacRegisterCodes.Ibias_DiscS2_ON, value, 0xFF)

    @property
    def Ibias_DiscS2_OFF(self):
        """[0, 15]"""
        return self._read_dac(DacRegisterCodes.Ibias_DiscS2_OFF, 0xF)

    @Ibias_DiscS2_OFF.setter
    def Ibias_DiscS2_OFF(self, value):
        self._write_dac(DacRegisterCodes.Ibias_DiscS2_OFF, value, 0xF)

    @property
    def Ibias_PixelDAC(self):
        """[0, 255]"""
        return self._read_dac(DacRegisterCodes.Ibias_PixelDAC, 0xFF)

    @Ibias_PixelDAC.setter
    def Ibias_PixelDAC(self, value):
        self._write_dac(DacRegisterCodes.Ibias_PixelDAC, value, 0xFF)

    @property
    def Ibias_TPbufferIn(self):
        """[0, 255]"""
        return self._read_dac(DacRegisterCodes.Ibias_TPbufferIn, 0xFF)

    @Ibias_TPbufferIn.setter
    def Ibias_TPbufferIn(self, value):
        self._write_dac(DacRegisterCodes.Ibias_TPbufferIn, value, 0xFF)

    # NOTE(review): the three test-pulse getters below return float while
    # every other DAC getter returns int.  Kept as-is because callers may
    # rely on it -- confirm whether the difference is intentional.
    @property
    def Ibias_TPbufferOut(self):
        """[0, 255]"""
        return float(self._read_dac(DacRegisterCodes.Ibias_TPbufferOut, 0xFF))

    @Ibias_TPbufferOut.setter
    def Ibias_TPbufferOut(self, value):
        self._write_dac(DacRegisterCodes.Ibias_TPbufferOut, value, 0xFF)

    @property
    def VTP_coarse(self):
        """[0, 255]"""
        return float(self._read_dac(DacRegisterCodes.VTP_coarse, 0xFF))

    @VTP_coarse.setter
    def VTP_coarse(self, value):
        self._write_dac(DacRegisterCodes.VTP_coarse, value, 0xFF)

    @property
    def VTP_fine(self):
        """[0, 511]"""
        return float(self._read_dac(DacRegisterCodes.VTP_fine, 0x1FF))

    @VTP_fine.setter
    def VTP_fine(self, value):
        self._write_dac(DacRegisterCodes.VTP_fine, value, 0x1FF)

    def setDac(self, code, value):
        """Sets the DAC parameter using codes

        Parameters
        ----------
        code: :obj:`int`
            DAC code to set

        value: :obj:`int`
            value to set

        """
        self._device.setDac(code, value)
def main(config_path="/Users/alrefaie/Documents/repos/libtimepix/config/eq-norm-50V.spx"):
    """Hardware smoke test: acquire for four seconds and print every item
    that arrives on the data queue.

    Parameters
    ----------
    config_path : str
        File handed to :meth:`TimepixDevice.loadConfig`.  The default is the
        path that used to be hard-coded here.
    """
    import logging
    from multiprocessing import Queue

    from .SPIDR.spidrcontroller import SPIDRController

    logging.basicConfig(level=logging.INFO)
    end_queue = Queue()

    def get_queue_thread(queue):
        # Echo queue items until a None sentinel arrives.
        while True:
            value = queue.get()
            print(value)
            if value is None:
                break

    t = threading.Thread(target=get_queue_thread, args=(end_queue,))
    t.daemon = True
    t.start()

    spidr = SPIDRController()

    timepix = TimepixDevice(spidr[0], end_queue)
    # TimepixDevice has no ``loadSophyConfig`` method.  Its constructor
    # leaves SophyConfig selected as the config class, so loadConfig() is
    # the call that builds the config from this file.
    # NOTE(review): assumes SophyConfig takes the file path as its first
    # positional argument -- confirm against .config.
    timepix.loadConfig(config_path)

    # spidr.shutterTriggerMode = SpidrShutterMode.Auto
    spidr.disableExternalRefClock()
    TdcEnable = 0x0000
    spidr.setSpidrReg(0x2B8, TdcEnable)
    spidr.enableDecoders(True)
    spidr.resetTimers()
    spidr.restartTimers()

    spidr.datadrivenReadout()
    timepix.setupDevice()

    spidr.openShutter()
    try:
        timepix.start()
        time.sleep(4.0)
        timepix.stop()
        logging.info("DONE")
    finally:
        # Never leave the shutter open, even if acquisition raised.
        spidr.closeShutter()
# Run the hardware smoke test only when this module is executed as a script.
if __name__ == "__main__": main()