# This file is part of Pymepix
#
# In all scientific work using Pymepix, please reference it as
#
# A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:
# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)
# https://doi.org/10.1088/1748-0221/14/10/P10003
# https://arxiv.org/abs/1905.07999
#
# Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not,
# see <https://www.gnu.org/licenses/>.
from enum import IntEnum
import numpy as np
import pymepix.config.load_config as cfg
from pymepix.processing.logic.datatypes_tpx4 import PacketType, ReadoutMode
from pymepix.processing.logic.processing_step import ProcessingStep
[docs]
class PixelOrientation(IntEnum):
"""Defines how row and col are intepreted in the output"""
Up = 0
"""Up is the default, x=column,y=row"""
Left = 1
"""x=row, y=-column"""
Down = 2
"""x=-column, y = -row """
Right = 3
"""x=-row, y=column"""
[docs]
class PacketProcessor_tpx4(ProcessingStep):
"""Class responsible to transform the raw data coming from the timepix directly into an easier
processible data format. Takes into account the pixel- and trigger data to calculate toa and tof
dimensions.
Methods
-------
process(data):
Process data and return the result. To use this class only this method should be used! Use the other methods only for testing or
if you are sure about what you are doing
"""
def __init__(
self,
handle_events=True,
event_window=(0.0, 2e8),
position_offset=(0, 0),
orientation=PixelOrientation.Up,
start_time=0,
timewalk_lut=None,
reversebytes=True,
*args,
**kwargs,
):
"""
Constructor for the PacketProcessor.
Parameters
----------
handle_events : boolean
Calculate events (tof) only if handle_events is True. Otherwise only pixel-data (toa only) is provided.
event_window : (float, float)
The range of tof, used for processing data. Information/ data outside of this range is discarded.
min_samples : (float, float)
Offset/ shift of x- and y-position
orientation : int
start_time : int
timewalk_lut
Data for correction of the time-walk
parameter_wrapper_classe : ProcessingParameter
Class used to wrap the processing parameters to make them changable while processing is running (useful for online optimization)
"""
super().__init__("PacketProcessor4", *args, **kwargs)
self._handle_events = self.parameter_wrapper_class(handle_events)
event_window_min, event_window_max = event_window
self._event_window_min = self.parameter_wrapper_class(event_window_min)
self._event_window_max = self.parameter_wrapper_class(event_window_max)
self._PC24bit = self.parameter_wrapper_class(False)
self._orientation = orientation
self._x_offset, self._y_offset = position_offset
self._start_time = start_time
self._timewalk_lut = timewalk_lut
self._trigger_counter = 0
self.clearBuffers()
self.decode_gray = self.makeinversegraycode()
# Dictionary for decoding ultrafast ToA
self.decode_ufT = {
0x0F: 0,
0x0E: 0.1953125,
0x0C: 0.390625,
0x08: 0.5859375,
0x00: 0.78125,
0x01: 0.9765625,
0x03: 1.171875,
0x07: 1.3671875,
}
self.npa_decode_ufT = np.zeros((0x10,), dtype=float)
for key, val in self.decode_ufT.items():
self.npa_decode_ufT[key] = val
self.int64be = np.dtype(np.int64)
self.int64be = self.int64be.newbyteorder("<")
trig_indxs = cfg.default_cfg.get("trig_indxs", [[], [], [], []])
self.trigger_pixels_indxs = [pix[0] + pix[1] * 512 for pix in trig_indxs if pix != []]
uint64be = np.dtype(np.int64)
# With high speed readout, Jan 2024 firmware requires reversing of byte order in packets
if reversebytes:
self.uint64be = uint64be.newbyteorder("<")
else:
self.uint64be = uint64be.newbyteorder(">")
@property
def event_window(self):
return (self._event_window_min.value, self._event_window_max.value)
@event_window.setter
def event_window(self, event_window):
event_window_min, event_window_max = event_window
self._event_window_min.value = event_window_min
self._event_window_max.value = event_window_max
@property
def handle_events(self):
""":noindex:"""
return self._handle_events.value
@handle_events.setter
def handle_events(self, handle_events):
self._handle_events.value = handle_events
@property
def PC24bit(self):
return self._PC24bit
@PC24bit.setter
def PC24bit(self, PC24bit):
self._PC24bit = PC24bit
[docs]
def makeinversegraycode(self):
arraylength = 2**16
lut = np.zeros(arraylength, dtype=np.uint16)
for inval in range(arraylength):
n = inval
inv = 0
while n:
inv = inv ^ n
n = n >> 1
lut[inval] = inv
return lut
[docs]
def process(self, data):
"""The data should contain udp packets with headers at the beginning with size 54 bytes, the payload is 4960 bytes, therefore udp packetsize is 5014 bytes:
PACKET_HEADER_SIZE = 54
PACKET_LOAD_SIZE = 4960
PACKET_SIZE = 5014
New input:
Data is actually started from 61, lst byte to ignore
"""
event_data, pixel_data, timestamps, triggers = None, None, None, None
packet_view = memoryview(data)
# longtime = int(np.frombuffer(packet_view[-8:], dtype=np.uint64)[0])
number_of_udp_packets = int(len(packet_view[:-8]) / 5014)
rawpacketarray = np.frombuffer(
np.frombuffer(packet_view[:-8], dtype=np.uint8)
.reshape((number_of_udp_packets, 5014))[:, 54:]
.flatten()
.tobytes(),
self.uint64be,
)
if number_of_udp_packets > 0: # longtime data probably will be not needed being legacy from TPX3
# rawpacketarray = rawpacketarray[rawpacketarray != -4294967296] # fix issue with sign
# no_data_packets_entries = rawpacketarray == 0xFFFFFFFF00000000
arange_index = np.arange(len(rawpacketarray))
endofcol = (rawpacketarray & 0x7F80000000000000) >> 55
# for debugging purpose limit data from before 1st shutter rise
"""
shutter_rise = np.where(endofcol == PacketType.ShutterRise)[0][0]
endofcol = endofcol[:shutter_rise]
rawpacketarray = rawpacketarray[:shutter_rise]
arange_index = np.arange(len(rawpacketarray))
"""
pixel_entries = endofcol <= 0xDF
timing_entries = np.logical_and(
PacketType.Heartbeat.value <= endofcol,
endofcol <= PacketType.CtrlDataTestB.value,
)
pixel_packets = rawpacketarray[pixel_entries]
if len(pixel_packets) == 0:
return None
if self.PC24bit is False:
# x, y, ToA, ToT = self.dataPC24bitdecode(rawpacketarray[pixel_entries])
raise NotImplementedError("Not implemented PC24bitdecode!")
else:
x, y, ToA, ToT = self.process_pixels(pixel_packets)
decodedtimestamps = self.timestampeventdecode(rawpacketarray[timing_entries])
ToA = self.correct_coarsetime(
ToA,
arange_index[pixel_entries],
decodedtimestamps,
arange_index[timing_entries],
)
self._toa = ToA
triggers = self.find_triggers(x, y, ToA)
pixel_data = (x, y, ToA, ToT)
if self.handle_events:
result = self.find_events_fast()
if result is not None:
event_data, timestamps = result
return event_data, pixel_data, timestamps, triggers
[docs]
def pre_process(self):
self.log.info("Running with triggers? {}".format(self.handle_events))
[docs]
def post_process(self):
return self.find_events_fast_post()
[docs]
def updateBuffers(self, val_filter):
self._x = self._x[val_filter]
self._y = self._y[val_filter]
self._toa = self._toa[val_filter]
self._tot = self._tot[val_filter]
[docs]
def getBuffers(self, val_filter=None):
if val_filter is None:
return (
np.copy(self._x),
np.copy(self._y),
np.copy(self._toa),
np.copy(self._tot),
)
else:
return (
np.copy(self._x[val_filter]),
np.copy(self._y[val_filter]),
np.copy(self._toa[val_filter]),
np.copy(self._tot[val_filter]),
)
[docs]
def clearBuffers(self):
self._x = None
self._y = None
self._tot = None
self._toa = None
self._triggers = None
[docs]
def process_pixels(self, rawpackets, gray=False):
"""Decodes pixel hit data.
Parameters
----------
rawpacket : 8-byte integer value (big endian format, I believe)
Returns
-------
tuple
The decoded packet, with the following elements:
- PacketType.PixelData
- Pixel x co-ordinate
- Pixel y co-ordinate
- Time of arrival (ns). This incorporates all three pixel internal timers, but not additional global information (e.g from heartbeat signal) and so rolls over every 1.5ms. Additionally, does not (yet) implement timewalk correction
- Time over threshold (ns)
- Pileup - indicates pileup occurred (1) or not (0)
"""
top = (rawpackets >> 63) & 0x1 # 1 bit, 63
endofcol = (rawpackets >> 55) & 0xFF # 8 bits, 62:55
# 6 bit superpixel value starts with a 4-bit superpixel group
# 6-bit value is needed for finding x,y position, and 4-bit spg for correcting ToA
spgroup = (rawpackets >> 51) & 0x0F # 4 bits, 54:51
superpixel = (rawpackets >> 49) & 0x3F # 6 bits, 54:49.
pixel = (rawpackets >> 46) & 0x07 # 3 bits, 48:46
ToA_raw = (rawpackets >> 30) & 0xFFFF # 16 bits, 45:30
if gray:
ToA_raw = self.decode_gray[ToA_raw]
ufToA_start_raw = (rawpackets >> 26) & 0x0F # 4 bits, 29:26
ufToA_stop_raw = (rawpackets >> 22) & 0x0F # 4 bits, 25:22
fToA_rise_raw = (rawpackets >> 17) & 0x1F # 5 bits, 21:17
fToA_fall_raw = (rawpackets >> 12) & 0x1F # 5 bits, 16:12
ToT_raw = (rawpackets >> 1) & 0x7FF # 11 bits, 11:1
# pileup = (rawpackets & 0x01) # 1 bit, 0
step_To = 25.0
step_fTo = 1.5625
step_DLL = 0.78125
# Ultrafast ToA rise/fall values:
# These measure time delays between signal and 640 MHz clock
try:
ToA = (
ToA_raw * step_To
- fToA_rise_raw * step_fTo
+ self.npa_decode_ufT[ufToA_start_raw]
- self.npa_decode_ufT[ufToA_stop_raw]
)
except KeyError:
ToA = ToA_raw * step_To - fToA_rise_raw * step_fTo
# pileup = -1
# Additionally, there is a correction related to clock distribution down the columns
ToA = ToA + (spgroup - 15) * step_DLL
# fToA rise and fall provide more accurate ToT -
ToT = ToT_raw * step_To + fToA_rise_raw * step_fTo - fToA_fall_raw * step_fTo
# Will define top left as x=0, y=0, as with Lambda
x = 2 * endofcol + (pixel > 3)
y = superpixel * 4 + pixel % 4
x = np.abs(447 * top - x)
y = np.abs(511 * (1 - top) - y)
# Need to check if this is a special external timestamp pixel, and return appropriate type
# How external timestamps are fed in is configurable; minimum granularity is superpixel
# Need to find more details of this - is the whole superpixel overridden?
if self.handle_events:
if self._x is None:
self._x = x
self._y = y
self._toa = ToA
self._tot = ToT
else:
self._x = np.append(self._x, x)
self._y = np.append(self._y, y)
self._toa = np.append(self._toa, ToA)
self._tot = np.append(self._tot, ToT)
return x, y, ToA, ToT # , pileup
[docs]
def dataPC24bitdecode(self, rawpackets):
"""Decodes 24 bit photon counting mode data.
Parameters
----------
rawpacket : 8-byte integer value (big endian format, I believe)
Returns
-------
tuple
The decoded packet, with the following elements:
- PacketType.PC24bitData
- Pixel x co-ordinate
- Pixel y co-ordinate
- 24-bit count value
"""
top = (rawpackets >> 63) & 0x1 # 1 bit, 63
endofcol = (rawpackets >> 55) & 0xFF # 8 bits, 62:55
# 6 bit superpixel value starts with a 4-bit superpixel group
# 6-bit value is needed for finding x,y position, and 4-bit spg for correcting ToA
# spgroup = ((rawpackets >> 51) & 0x0f) # 4 bits, 54:51 # not used?
superpixel = (rawpackets >> 49) & 0x3F # 6 bits, 54:49.
pixel = (rawpackets >> 46) & 0x07 # 3 bits, 48:46
counts = rawpackets & 0xFFFFFF # 24 bits, 23:0
# Will define top left as x=0, y=0, as with Lambda
x = 2 * endofcol + (pixel > 3)
y = superpixel * 4 + pixel % 4
x = np.abs(447 * top - x)
y = np.abs(511 * (1 - top) - y)
data_type = np.empty((len(rawpackets),), dtype=PacketType)
data_type.fill(PacketType.PC24bitData)
return data_type, x, y, counts
[docs]
def timestampeventdecode(self, rawpackets):
"""Various Timepix4 special events which include timestamp data.
These are Heartbeat, ShutterRise/Fall, T0Sync, SignalRise/Fall and CtrlDataTest
Parameters
----------
rawpacket : 8-byte integer value (big endian format)
Returns
-------
tuple
The decoded packet, with the following elements:
- PacketType
- Timestamp (ns). This is 48 bits long with a 40 MHz clock - so it has 25ns resolution and a range of up to 7 million s (7e15 ns)
"""
endofcol = (rawpackets & 0x7F80000000000000) >> 55 # 8 bits, 62:55
currentpackettype = np.array(list(map(self.packettype_mapper, endofcol)), dtype=PacketType)
timestamp_raw = rawpackets & 0x0000FFFFFFFFFFFF # 48 bits, 47:00
step_To = 25.0
timestamp = timestamp_raw * step_To # Range up to 7 million seconds
return currentpackettype, timestamp
[docs]
def packettype_mapper(self, packet):
try:
t = PacketType(packet)
return t
except ValueError:
return PacketType.Unknown
[docs]
def readout_mapper(self, packet):
try:
t = ReadoutMode(packet)
return t
except ValueError:
return None
[docs]
def controleventdecode(self, rawpackets):
"""Decodes special packets in Frame mode aimed at helping image decoding.
The image is split up into "sequences", each of which corresponds to a different region of the chip,
and each frame and sequence has a header and trailer (FrameStart, FrameEnd, SequenceStart, SequenceEnd)
Parameters
----------
rawpacket : 8-byte integer value (big endian format)
Returns
-------
tuple
The decoded packet, with the following elements:
- PacketType
- Top - indicates if the data came from the top half (1) or bottom half (0) of the chip
- Segment - each segment consists of 1/8 of the columns from one half of the chip - see manual
- Readout mode - int Enum defined in datatypes.py, indicating 8 bit or 16 bit.
"""
endofcol = (rawpackets & 0x7F80000000000000) >> 55 # 8 bits, 62:55
currentpackettype = np.array(list(map(self.packettype_mapper, endofcol)), dtype=PacketType)
top = (rawpackets >> 63) & 0x1 # 1 bit, 63
segment = (rawpackets & 0x0070000000000000) >> 52 # 3 bit, 54:52
readoutmode = np.array(
list(map(self.readout_mapper, ((rawpackets & 0x000C000000000000) >> 50))),
dtype=ReadoutMode,
) # 2 bit, 51:50. Use enum.
return currentpackettype, top, segment, readoutmode
# ToA = self.correct_coarsetime(ToA, arange_index[pixel_entries], decodedtimestamps, arange_index[timing_entries])
[docs]
def correct_coarsetime(self, ToA, pixel_indxs, Time_events, time_indxs):
rollovertime = 25.0 * 2**16
heartbeats_entries = Time_events[0] == PacketType.Heartbeat
coarsetime = Time_events[1][heartbeats_entries]
coarsetime_bit_15 = ((np.int_(Time_events[1][heartbeats_entries] / 25) >> 15) & 0b1).astype(bool)
ToA_bit_15 = ((np.int_(ToA / 25) >> 15) & 0b1).astype(bool)
if np.shape(coarsetime)[0] > 0:
coarse_time_bins = np.digitize(pixel_indxs, time_indxs[heartbeats_entries]) - 1
mapped_coarse_time = coarsetime[coarse_time_bins]
mapped_coarse_time_b15 = coarsetime_bit_15[coarse_time_bins]
mapped_coarse_time[mapped_coarse_time_b15 & ~ToA_bit_15] += 0b1000000000000000 * 25
mapped_coarse_time = np.floor(mapped_coarse_time / rollovertime) * rollovertime
return ToA + mapped_coarse_time
else:
return ToA
# ToF, triggers = find_trigger_events(decodedpackets[1], decodedpackets[2], decodedpackets[3])
[docs]
def find_triggers(self, x, y, ToA):
"the first index in trig_indxs will be used for calculation of ToF"
trigger_entries = [trig_i == x + y * 512 for trig_i in self.trigger_pixels_indxs]
triggers = [ToA[trig] for trig in trigger_entries]
if self.handle_events:
if self._triggers is None:
self._triggers = np.copy(triggers[0])
else:
self._triggers = np.append(self._triggers, triggers[0])
return triggers
[docs]
def find_events_fast(self):
if self.__exist_enough_triggers():
self._triggers = self._triggers[
np.argmin(self._triggers) :
] # in case TOA gets reset, remove biggest trigger
# self._triggers = np.sort(self._triggers)
if self.__toa_is_not_empty():
start = self._triggers
else:
return None
if start.size > 1:
trigger_counter = np.arange(
self._trigger_counter,
self._trigger_counter + start.size - 1,
dtype=int,
)
self._trigger_counter = trigger_counter[-1] + 1
# Get the first and last triggers in pile
first_trigger = start[0]
last_trigger = start[-1]
# Delete useless pixels before the first trigger
self.updateBuffers(self._toa >= first_trigger)
# grab only pixels we care about
x, y, toa, tot = self.getBuffers(self._toa < last_trigger)
self.updateBuffers(self._toa >= last_trigger)
try:
event_mapping = np.digitize(toa, start) - 1
except Exception as e:
self.log.error("Exception has occured {} due to ", str(e))
self.log.error("Writing output TOA {}".format(toa))
self.log.error("Writing triggers {}".format(start))
self.log.error("Flushing triggers!!!")
self._triggers = self._triggers[-1:]
return None
self._triggers = self._triggers[-1:]
tof = toa - start[event_mapping]
event_number = trigger_counter[event_mapping]
event_window_min, event_window_max = self.event_window
exp_filter = (tof >= event_window_min) & (tof <= event_window_max)
result = (
event_number[exp_filter],
x[exp_filter],
y[exp_filter],
tof[exp_filter],
tot[exp_filter],
)
if result[0].size > 0:
timeStamps = np.uint64(start[np.unique(event_mapping)]) # times for trigger event
else:
return None
return result, (np.unique(result[0]), timeStamps)
return None # Clear out the triggers since they have nothing
def __exist_enough_triggers(self):
return self._triggers is not None and self._triggers.size >= 2
[docs]
def find_events_fast_post(self):
"""Call this function at the very end of to also have the last two trigger events processed"""
# add an imaginary last trigger event after last pixel event for np.digitize to work
if self._toa is not None and self._toa.shape[0] > 0 and self._triggers is not None:
self._triggers = np.concatenate((self._triggers, np.array([self._toa.max() + 1])))
else:
return None, None, None, None
event_data, timestamps = None, None
result = self.find_events_fast()
if result is not None:
event_data, timestamps = result
return event_data, None, timestamps, None
def __toa_is_not_empty(self):
return self._toa is not None and self._toa.size > 0
[docs]
def orientPixels(self, col, row):
"""Orient the pixels based on Timepix orientation"""
if self._orientation is PixelOrientation.Up:
return col, row
elif self._orientation is PixelOrientation.Left:
return row, 255 - col
elif self._orientation is PixelOrientation.Down:
return 255 - col, 255 - row
elif self._orientation is PixelOrientation.Right:
return 255 - row, col
[docs]
def main():
filename = "/home/samartse/TPX4/2023-03-15_30MHz_events_1min_4700_EQ_cosmics_bias_100V_08.raw"
filename = "/home/samartse/TPX4/2023-05-30_events_HV100_cosmic_30s_bytes.bin"
filename = "/home/tmpxusr/data/MID_commissioning_202406/run_0002_TOP.bin"
# file = open(filename,"rb")
# bin_data = file.read()
with open(filename, "rb") as data_file:
# bin_data = np.load(data_file)
# also attach the longtime in the beginning
bin_data = np.uint64(1).tobytes() + data_file.read()
packetprocessor = PacketProcessor_tpx4()
event_data, pixel_data, timestamps, triggers = packetprocessor.process(bin_data)
if __name__ == "__main__":
main()