Source code for pymepix.SPIDR.spidrcontroller

# This file is part of Pymepix
#
# In all scientific work using Pymepix, please reference it as
#
# A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:
# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)
# https://doi.org/10.1088/1748-0221/14/10/P10003
# https://arxiv.org/abs/1905.07999
#
# Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not,
# see <https://www.gnu.org/licenses/>.


"""SPIDR related classes"""

import socket
import threading

import numpy as np

import pymepix.config.load_config as cfg
from pymepix.core.log import Logger

from .error import PymePixException
from .spidrcmds import SpidrCmds
from .spidrdefs import SpidrRegs, SpidrShutterMode
from .spidrdevice import SpidrDevice


[docs] class SPIDRController: """Object that interfaces over ethernet with the SPIDR board This object interfaces with the spidr board through TCP and is used to send commands and receive data. It can be treated as a list of :class:`SpidrDevice` objects to talk to a specific device Parameters ---------- dst_ip_port : :obj:`tuple` of :obj:`str` and :obj:`int` socket style tuple of SPIDR ip address and port src_ip_port : :obj:`tuple` of :obj:`str` and :obj:`int`, optional socket style tuple of the IP address and port of the interface that is connecting to SPIDR Examples -------- The class can be used to talk to SPIDR >>> spidr = SPIDRController() >>> spidr.fpgaTemperature 39.5 Or access a specific :class:`SpidrDevice` (e.g. Timepix/Medipix) >>> spidr[0].deviceId 7272 >>> spidr[1].deviceId 2147483648 .. Warning:: This object assumes SPIDR is working as intended however since this is still in development there are a few functions that do not behave as they should, this will be documented in their relevant areas. """ def __init__(self): self.log = Logger(SPIDRController.__name__) self.log.info(f"Connecting to camera on {cfg.default_cfg['timepix']['ip']}:{cfg.default_cfg['timepix']['port']}") self._udp_port = cfg.default_cfg['timepix']['data_port'] # TCP connection self._sock = socket.create_connection((cfg.default_cfg['timepix']['ip'], cfg.default_cfg['timepix']['port'])) self._request_lock = threading.Lock() self._req_buffer = np.ndarray(shape=(512,), dtype=np.uint32) self._reply_buffer = bytearray(4096) self._reply_view = memoryview(self._reply_buffer) self._vec_htonl = np.vectorize(self.convertHtonl) self._vec_ntohl = np.vectorize(self.convertNtohl) self._pixel_config = np.ndarray(shape=(256, 256), dtype=np.uint8) # self.resetModule(SpidrReadoutSpeed.Default) self._devices = [] self._initDevices() def __getitem__(self, key): return self._devices[key] def __len__(self): return len(self._devices) def _initDevices(self): count = self.deviceCount for x in range(count): self._devices.append(SpidrDevice(self, x)) self._devices[x].serverPort = self._udp_port + x def prepare(self): self.disableExternalRefClock() TdcEnable = 0x0000 self.setSpidrReg(0x2B8, TdcEnable) self.enableDecoders(True) self.datadrivenReadout()
[docs] def resetModule(self, readout_speed): """Resets the SPIDR board and sets a new readout speed Parameters ---------- readout_speed : :class:`SpidrReadoutSpeed` Read-out speed the device will operate at Notes ---------- Its not clear if this does anything as its not usually used """ self.requestGetInt(SpidrCmds.CMD_RESET_MODULE, 0, readout_speed.value)
# -----------------Registers----------------------- @property def CpuToTpx(self): """Cpu2Tpx register access Parameters ---------- value : int Value to write to the register Returns ---------- int Current value of the register Raises ---------- :class:`PymePixException` Communication error Notes ---------- Register controls clock setup """ return self.getSpidrReg(SpidrRegs.SPIDR_CPU2TPX_WR_I) @CpuToTpx.setter def CpuToTpx(self, value): return self.setSpidrReg(SpidrRegs.SPIDR_CPU2TPX_WR_I, value) # ---------Shutter registers--------- @property def ShutterTriggerCtrl(self): """Shutter Trigger Control register access Parameters ---------- value : int Value to write to the register Returns ---------- int Current value of the register Raises ---------- :class:`PymePixException` Communication error """ return self.getSpidrReg(SpidrRegs.SPIDR_SHUTTERTRIG_CTRL_I) @ShutterTriggerCtrl.setter def ShutterTriggerCtrl(self, value): return self.setSpidrReg(SpidrRegs.SPIDR_SHUTTERTRIG_CTRL_I, value) @property def ShutterTriggerMode(self): """Controls how the shutter is triggered Parameters ---------- value : :class:`SpidrShutterMode` Shutter trigger mode to set Returns ---------- :class:`SpidrShutterMode` Current shutter operation mode read from SPIDR Raises ---------- :class:`PymePixException` Communication error Notes ---------- AutoTrigger is the only functioning trigger mode that SPIDR can operate in """ return SpidrShutterMode(self.ShutterTriggerCtrl & 0x7) @ShutterTriggerMode.setter def ShutterTriggerMode(self, mode): reg = self.ShutterTriggerCtrl reg &= ~0x7 reg |= mode.value self.ShutterTriggerCtrl = reg @property def ShutterTriggerCount(self): """Number of times the shutter is triggered in auto trigger mode Parameters ---------- value : int Trigger count to set for auto trigger mode ( Set to 0 for infinite triggers) Returns ---------- int: Current value of the trigger count read from SPIDR Raises ---------- :class:`PymePixException` Communication error """ return self.getSpidrReg(SpidrRegs.SPIDR_SHUTTERTRIG_CNT_I) @ShutterTriggerCount.setter def ShutterTriggerCount(self, value): return self.setSpidrReg(SpidrRegs.SPIDR_SHUTTERTRIG_CNT_I, value) @property def ShutterTriggerFreq(self): """Triggering frequency for the auto trigger Parameters ---------- value : float Frequency in mHz Returns ---------- float: Frequency value in mHz read from SPIDR Raises ---------- :class:`PymePixException` Communication error """ freq = self.getSpidrReg(SpidrRegs.SPIDR_SHUTTERTRIG_FREQ_I) mhz = 40000000000.0 / freq return int(mhz) @ShutterTriggerFreq.setter def ShutterTriggerFreq(self, mhz): freq = 40000000000.0 / mhz return self.setSpidrReg(SpidrRegs.SPIDR_SHUTTERTRIG_FREQ_I, freq) @property def ShutterTriggerLength(self): """Length of time shutter remains open at each trigger Parameters ---------- value : int Length in ns Returns ---------- value: int Current length in ns read from SPIDR Raises ---------- :class:`PymePixException` Communication error """ return self.getSpidrReg(SpidrRegs.SPIDR_SHUTTERTRIG_LENGTH_I) * 25 @ShutterTriggerLength.setter def ShutterTriggerLength(self, value): return self.setSpidrReg(SpidrRegs.SPIDR_SHUTTERTRIG_LENGTH_I, (value + 24) // 25) @property def ShutterTriggerDelay(self): """Delay time before shutter can be triggered again in auto trigger mode Parameters ---------- value : int Time in ns Returns ---------- value: int Current time in ns read from SPIDR Raises ---------- :class:`PymePixException` Communication error """ return self.getSpidrReg(SpidrRegs.SPIDR_SHUTTERTRIG_DELAY_I) * 25 @ShutterTriggerDelay.setter def ShutterTriggerDelay(self, value): return self.setSpidrReg(SpidrRegs.SPIDR_SHUTTERTRIG_DELAY_I, value // 25) @property def DeviceAndPorts(self): return self.getSpidrReg(SpidrRegs.SPIDR_DEVICES_AND_PORTS_I) @property def TdcTriggerCounter(self): """Trigger packets sent by SPIDR since last counter reset""" return self.getSpidrReg(SpidrRegs.SPIDR_TDC_TRIGGERCOUNTER_I) @property def UdpPacketCounter(self): """UDP packets sent by SPIDR since last counter reset""" return self.getSpidrReg(SpidrRegs.SPIDR_UDP_PKTCOUNTER_I) @property def UdpMonPacketCounter(self): return self.getSpidrReg(SpidrRegs.SPIDR_UDPMON_PKTCOUNTER_I) @property def UdpPausePacketCounter(self): """UDP packets collected during readout pause since last counter reset""" return self.getSpidrReg(SpidrRegs.SPIDR_UDPPAUSE_PKTCOUNTER_I) @UdpPacketCounter.setter def UdpPacketCounter(self, value): return self.setSpidrReg(SpidrRegs.SPIDR_UDP_PKTCOUNTER_I, 0) @UdpMonPacketCounter.setter def UdpMonPacketCounter(self, value): return self.setSpidrReg(SpidrRegs.SPIDR_UDPMON_PKTCOUNTER_I, 0) @UdpPausePacketCounter.setter def UdpPausePacketCounter(self, value): return self.setSpidrReg(SpidrRegs.SPIDR_UDPPAUSE_PKTCOUNTER_I, 0) # --------------------------------------------------- @property def softwareVersion(self): """Software version Returns -------- int: Version number of software in the SPIDR board Raises ---------- :class:`PymePixException` Communication error """ return self.requestGetInt(SpidrCmds.CMD_GET_SOFTWVERSION, 0) @property def firmwareVersion(self): """Firmware version Returns -------- int: Version number of firmware within the FPGA Raises ---------- :class:`PymePixException` Communication error """ return self.requestGetInt(SpidrCmds.CMD_GET_FIRMWVERSION, 0) @property def localTemperature(self): """Local ????!?!? Temperature read from sensor Returns -------- float: Temperature in Celsius Raises ---------- :class:`PymePixException` Communication error """ return self.requestGetInt(SpidrCmds.CMD_GET_LOCALTEMP, 0) / 1000 @property def remoteTemperature(self): """Remote ????!?!? Temperature read from sensor Returns -------- float: Temperature in Celsius Raises ---------- :class:`PymePixException` Communication error """ return self.requestGetInt(SpidrCmds.CMD_GET_REMOTETEMP, 0) / 1000 @property def fpgaTemperature(self): """Temperature of FPGA board read from sensor Returns -------- float: Temperature in Celsius Raises ---------- :class:`PymePixException` Communication error """ return self.requestGetInt(SpidrCmds.CMD_GET_FPGATEMP, 0) / 1000 @property def humidity(self): """Humidity read from sensor Returns -------- int: Humidity as percentage Raises ---------- :class:`PymePixException` Communication error """ return self.requestGetInt(SpidrCmds.CMD_GET_HUMIDITY, 0) @property def pressure(self): """Pressure read from sensor Returns -------- int: Pressure in bar Raises ---------- :class:`PymePixException` Communication error """ return self.requestGetInt(SpidrCmds.CMD_GET_PRESSURE, 0) @property def chipboardFanSpeed(self): return self.requestGetInt(SpidrCmds.CMD_GET_FANSPEED, 0, 0) @property def boardFanSpeed(self): return self.requestGetInt(SpidrCmds.CMD_GET_FANSPEED, 0, 1) @property def avdd(self): return tuple(self.requestGetInts(SpidrCmds.CMD_GET_AVDD, 0, 3) / 1000) @property def vdd(self): return tuple(self.requestGetInts(SpidrCmds.CMD_GET_VDD, 0, 3) / 1000) @property def dvdd(self): return tuple(self.requestGetInts(SpidrCmds.CMD_GET_DVDD, 0, 3) / 1000) @property def avddNow(self): return tuple(self.requestGetInts(SpidrCmds.CMD_GET_AVDD_NOW, 0, 3) / 1000) @property def vddNow(self): return tuple(self.requestGetInts(SpidrCmds.CMD_GET_VDD_NOW, 0, 3) / 1000) @property def dvddNow(self): return tuple(self.requestGetInts(SpidrCmds.CMD_GET_DVDD_NOW, 0, 3) / 1000) @property def deviceCount(self): """Count of devices connected to SPIDR Returns --------- int: Number of devices connected to SPIDR Raises ---------- :class:`PymePixException` Communication error .. Warning:: SPIDR always returns 4 since it currently can't determine if the devices are actually valid or not """ return self.requestGetInt(SpidrCmds.CMD_GET_DEVICECOUNT, 0) @property def deviceIds(self): """The ids of all devices connected to the SPIDR board Returns --------- :obj:`list` of :obj:`int`: A list all connected device ids Raises ---------- :class:`PymePixException` Communication error Notes -------- Index of devices are the same as the those in the SPIDRController list >>> spidr[1].deviceId == spidr.deviceIds[1] True """ device_count = self.deviceCount return self.requestGetInts(SpidrCmds.CMD_GET_DEVICEIDS, 0, device_count) @property def linkCounts(self): links = self.DeviceAndPorts return ((links & 0xF00) >> 8) + 1 @property def chipboardId(self): return self.requestGetInt(SpidrCmds.CMD_GET_CHIPBOARDID, 0)
[docs] def setBusy(self): return self.requestSetInt(SpidrCmds.CMD_SET_BUSY, 0, 0)
[docs] def clearBusy(self): return self.requestSetInt(SpidrCmds.CMD_CLEAR_BUSY, 0, 0)
[docs] def resetDevices(self): """Resets all devices""" self.requestSetInt(SpidrCmds.CMD_RESET_DEVICES, 0, 0)
[docs] def reinitDevices(self): """Resets and initializes all devices Raises ---------- :class:`PymePixException` Communication error """ self.requestSetInt(SpidrCmds.CMD_REINIT_DEVICES, 0, 0)
[docs] def setPowerPulseEnable(self, enable): self.requestSetInt(SpidrCmds.CMD_PWRPULSE_ENA, 0, int(enable))
[docs] def setTpxPowerPulseEnable(self, enable): self.requestSetInt(SpidrCmds.CMD_TPX_POWER_ENA, 0, int(enable))
[docs] def setBiasSupplyEnable(self, enable): """Enables/Disables bias supply voltage Parameters ----------- enable : bool True - enables bias supply voltage False - disables bias supply voltage Raises ---------- :class:`PymePixException` Communication error """ self.requestSetInt(SpidrCmds.CMD_BIAS_SUPPLY_ENA, 0, int(enable))
@property def biasVoltage(self): """Bias voltage Parameters ----------- volts : int Bias voltage to supply in volts Minimum is 12V and Maximum is 104V Returns ----------- int: Current bias supply in volts Raises ---------- :class:`PymePixException` Communication error """ adc_data = self.requestGetInt(SpidrCmds.CMD_GET_SPIDR_ADC, 0, 1) return (((adc_data & 0xFFF) * 1500 + 4095) / 4096) / 10 @biasVoltage.setter def biasVoltage(self, volts): if volts < 12: volts = 12 if volts > 104: volts = 104 dac_value = int(((volts - 12) * 4095) / (104 - 12)) self.log.info("Setting bias Voltage to {} V (Dac value {})".format(volts, dac_value)) self.requestSetInt(SpidrCmds.CMD_SET_BIAS_ADJUST, 0, dac_value)
[docs] def enableDecoders(self, enable): """Determines whether the internal FPGA decodes ToA values Time of Arrival from UDP packets are gray encoded if this is enabled then SPIDR will decode them for you, otherwise you have to do this yourself after extracting them Parameters ----------- enable: bool True - enable FPGA decoding False - disable FPGA decoding Raises ---------- :class:`PymePixException` Communication error .. Tip:: Enable this """ self.requestSetInt(SpidrCmds.CMD_DECODERS_ENA, 0, int(enable))
[docs] def enablePeriphClk80Mhz(self): self.CpuToTpx |= 1 << 24
[docs] def disablePeriphClk80Mhz(self): self.CpuToTpx &= ~(1 << 24)
[docs] def enableExternalRefClock(self): """SPIDR recieves its reference clock externally This is often used when combining multiple Timepixs together so they can synchronize their clocks. The SPIDR board essentially acts as a slave to other SPIDRs Raises ---------- :class:`PymePixException` Communication error """ self.CpuToTpx |= 1 << 25
[docs] def disableExternalRefClock(self): """SPIDR recieves its reference clock internally This should be set in single SPIDR mode. When combining other SPIDR board, the master will set this to disabled Raises ---------- :class:`PymePixException` Communication error """ self.CpuToTpx &= ~(1 << 25)
[docs] def sequentialReadout(self, tokens, now): if now: tokens |= 0x80000000 self.requestSetInt(SpidrCmds.CMD_SEQ_READOUT, 0, tokens)
[docs] def datadrivenReadout(self): """Set SPIDR into data driven readout mode Data driven mode refers to the pixels packets sent as they are hit rather than camera style frames Raises ---------- :class:`PymePixException` Communication error .. Warning:: This is the only tested mode for pymepix. It is recommended that this is enabled """ self.requestSetInt(SpidrCmds.CMD_DDRIVEN_READOUT, 0, 0)
[docs] def pauseReadout(self): self.requestSetInt(SpidrCmds.CMD_PAUSE_READOUT, 0, 0)
[docs] def setShutterTriggerConfig(self, mode, length_us, freq_hz, count, delay_ns=0): """Set the shutter configuration in one go Parameters ---------- mode: int Shutter trigger mode length_us: int Shutter open time in microseconds freq_hz: int Auto trigger frequency in Hertz count: int Number of triggers delay_ns: int, optional Delay between each trigger (Default: 0) Raises ---------- :class:`PymePixException` Communication error """ data = [mode, length_us, freq_hz, count, delay_ns] if delay_ns == 0: data.pop() self.requestSetInts(SpidrCmds.CMD_SET_TRIGCONFIG, 0, data)
@property def shutterTriggerConfig(self): config = self.requestGetInts(SpidrCmds.CMD_GET_TRIGCONFIG, 0, 5) return tuple(config)
[docs] def startAutoTrigger(self): """Starts the auto trigger Raises ---------- :class:`PymePixException` Communication error """ self.requestSetInt(SpidrCmds.CMD_AUTOTRIG_START, 0, 0)
[docs] def stopAutoTrigger(self): """Stops the auto trigger Raises ---------- :class:`PymePixException` Communication error """ self.requestSetInt(SpidrCmds.CMD_AUTOTRIG_STOP, 0, 0)
[docs] def openShutter(self): """Immediately opens the shutter indefinetly Raises ---------- :class:`PymePixException` Communication error Notes --------- This overwrites shutter configurations with one that forces an open shutter """ self.setShutterTriggerConfig(SpidrShutterMode.Auto.value, 0, 10, 1, 0) self.startAutoTrigger()
[docs] def closeShutter(self): """Immediately closes the shutter Raises ---------- :class:`PymePixException` Communication error """ self.stopAutoTrigger()
@property def externalShutterCounter(self): return self.requestGetInt(SpidrCmds.CMD_GET_EXTSHUTTERCNTR, 0) @property def shutterCounter(self): return self.requestGetInt(SpidrCmds.CMD_GET_SHUTTERCNTR, 0)
[docs] def restartTimers(self): """Restarts SPIDR and Device timers Synchronizes both the SPIDR clock and Timepix/Medipix clocks so both trigger and ToA timestamps match .. Important:: This must be done if event selection is required (e.g. time of flight) otherwise the timestamps will be offset Raises ---------- :class:`PymePixException` Communication error """ return self.requestSetInt(SpidrCmds.CMD_RESTART_TIMERS, 0, 0)
[docs] def resetCounters(self): self.requestSetInt(SpidrCmds.CMD_RESET_COUNTERS, 0, 0)
[docs] def resetTimers(self): """Resets all timers to zero Sets the internal 48-bit timers for all Timepix/Medipix devices to zero Raises ---------- :class:`PymePixException` Communication error """ self.requestSetInt(SpidrCmds.CMD_RESET_TIMER, 0, 0)
[docs] def getAdc(self, channel, nr_of_samples): args = (channel & 0xFFFF) | ((nr_of_samples & 0xFFFF) << 16) self.requestGetInt(SpidrCmds.CMD_GET_SPIDR_ADC, 0, args)
[docs] def resetPacketCounters(self): self.UdpPacketCounter = 0 self.UdpMonPacketCounter = 0 self.UdpPausePackerCounter = 0 for idx, dev in enumerate(self): self.setSpidrReg(SpidrRegs.SPIDR_PIXEL_PKTCOUNTER_I, idx)
[docs] def getSpidrReg(self, addr): res = self.requestGetInts(SpidrCmds.CMD_GET_SPIDRREG, 0, 2, addr) if res[0] != addr: raise Exception("Incorrect register address returned {} expected {}".format(res[0], addr)) return res[1]
[docs] def setSpidrReg(self, addr, value): self.requestSetInts(SpidrCmds.CMD_SET_SPIDRREG, 0, [addr, value])
[docs] def request(self, cmd, dev_nr, message_length, expected_bytes=0): """Sends a command and (may) receive a reply Parameters ----------- cmd: :class:`SpidrCmds` Command to send dev_nr: int Device to send the request to. 0 is SPIDR and device number n is n+1 message_length: int Length of the message in bytes expected_bytes: int Length of expected reply from request (if any) (Default: 0) Returns ----------- :obj:`numpy.array` of :obj:`int` or :obj:`None`: Returns a numpy array of ints if reply expected, otherwise None Raises ---------- :class:`PymePixException` Communication error """ with self._request_lock: self.log.debug( "Command: {}, Device Id: {} Message Length: {} Expected Reply: {}".format( SpidrCmds(cmd).name, dev_nr, message_length, expected_bytes ) ) self._req_buffer[0] = socket.htonl(cmd) self._req_buffer[1] = socket.htonl(message_length) self._req_buffer[2] = 0 self._req_buffer[3] = socket.htonl(dev_nr) self.log.debug("Request Buffer: {}".format(self._req_buffer[0:message_length])) self._sock.send(self._req_buffer.tobytes()[0:message_length]) if cmd & SpidrCmds.CMD_NOREPLY: return bytes_returned = self._sock.recv_into(self._reply_view, 4096) if bytes_returned < 0: raise Exception("Failed to get reply") if bytes_returned < expected_bytes: raise Exception( "Unexpected reply length, got {} expected at least {}".format(bytes_returned, expected_bytes) ) _replyMsg = np.frombuffer(self._reply_buffer, dtype=np.uint32) self.log.debug("reply message: {}".format(_replyMsg)) error = socket.ntohl(int(_replyMsg[2])) if error != 0: try: raise PymePixException(error) except PymePixException as e: if "ERR_EMPTY" in e.message: pass else: raise reply = socket.ntohl(int(_replyMsg[0])) if reply != cmd | SpidrCmds.CMD_REPLY: raise Exception("Unexpected Reply {}".format(reply)) if socket.ntohl(int(_replyMsg[3])) != dev_nr: raise Exception("Unexpected device {}".format(dev_nr)) return _replyMsg
# def customRequest(self,request,total_bytes): # self._sock.send(self._req_buffer.tobytes()[0:total_bytes]) # sock_recv= self._sock.recv(4096) # missing_bytes = ((len(sock_recv)//32) + 1)*32 # buffer = sock_recv + b" "*missing_bytes # arr = np.frombuffer(buffer,dtype=np.uint32) # print(self._vec_ntohl(arr))
[docs] def convertNtohl(self, x): return socket.ntohl(int(x))
[docs] def convertHtonl(self, x): return socket.htonl(int(x))
[docs] def requestGetInt(self, cmd, dev_nr, arg=0): msg_length = 20 self._req_buffer[4] = socket.htonl(arg) reply = self.request(cmd, dev_nr, msg_length, msg_length) return socket.ntohl(int(reply[4]))
[docs] def requestGetInts(self, cmd, dev_nr, num_ints, args=0): msg_length = 20 self._req_buffer[4] = socket.htonl(args) expected_len = (4 + num_ints) * 4 reply = self.request(cmd, dev_nr, msg_length, expected_len) return self._vec_ntohl(reply[4 : 4 + num_ints])
[docs] def requestGetBytes(self, cmd, dev_nr, expected_bytes, args=0): msg_length = (4 + 1) * 4 self._req_buffer[4] = 0 expected_len = 16 + expected_bytes # Cast reply as an uint8 reply = self.request(cmd, dev_nr, msg_length, expected_len) return np.copy(reply[4:].view(dtype=np.uint8)[:expected_bytes])
[docs] def requestGetIntBytes(self, cmd, dev_nr, expected_bytes, args=0): msg_length = (4 + 1) * 4 self._req_buffer[4] = socket.htonl(args) expected_len = 20 + expected_bytes # Cast reply as an uint8 reply = self.request(cmd, dev_nr, msg_length, expected_len) int_val = socket.ntohl(int(reply[4])) byte_val = np.copy(reply[5:].view(dtype=np.uint8)[:expected_bytes]) return int_val, byte_val
[docs] def requestSetInt(self, cmd, dev_nr, value): msg_length = (4 + 1) * 4 self._req_buffer[4] = socket.htonl(value) self.request(cmd, dev_nr, msg_length, 20)
[docs] def requestSetInts(self, cmd, dev_nr, value): num_ints = len(value) msg_length = (4 + num_ints) * 4 self._req_buffer[4 : 4 + num_ints] = self._vec_htonl(value)[:] self.request(cmd, dev_nr, msg_length, 20)
[docs] def requestSetIntBytes(self, cmd, dev_nr, value_int, value_bytes): num_bytes = len(value_bytes) msg_length = (4 + 1) * 4 + num_bytes self._req_buffer[4] = socket.htonl(value_int) self._req_buffer[5:].view(dtype=np.uint8)[:num_bytes] = value_bytes[:] self.request(cmd, dev_nr, msg_length, 20)
[docs] def prepare(self): self.disableExternalRefClock() TdcEnable = 0x0000 self.setSpidrReg(0x2B8, TdcEnable) self.enableDecoders(True) self.datadrivenReadout()
[docs] def main(): import logging logging.basicConfig(level=logging.INFO) spidr = SPIDRController() print("Local temp: {} C".format(spidr.localTemperature)) print("FW: {:8X}".format(spidr.firmwareVersion)) print("SW: {:8X}".format(spidr.softwareVersion)) print("Device Ids {}".format(spidr.deviceIds)) for idx, dev in enumerate(spidr): print("Device {}: {}".format(idx, dev.deviceId)) print("CHIP Fanspeed: ", spidr.chipboardFanSpeed) print("SPIDR Fanspeed: ", spidr.boardFanSpeed) print("Pressure: ", spidr.pressure, "mbar") print("Humidity: ", spidr.humidity, "%") print("Temperature: ", spidr.localTemperature, " C") spidr.resetDevices() spidr.reinitDevices() print("spidr[0].ipAddrSrc: ", spidr[0].ipAddrSrc) print("spidr[0].ipAddrDest: ", spidr[0].ipAddrDest) print("spidr[0].devicePort: ", spidr[0].devicePort) print("spidr[0].serverPort: ", spidr[0].serverPort) print("spidr[0].headerFilter: ", spidr[0].headerFilter) print("spidr[0].TpPeriodPhase: ", spidr[0].TpPeriodPhase) print("spidr.ShutterTriggerFreq: ", spidr.ShutterTriggerFreq)
if __name__ == "__main__": main()