# This file is part of Pymepix
#
# In all scientific work using Pymepix, please reference it as
#
# A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:
# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)
# https://doi.org/10.1088/1748-0221/14/10/P10003
# https://arxiv.org/abs/1905.07999
#
# Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not,
# see <https://www.gnu.org/licenses/>.
import time
import os
import struct
import numpy as np
import h5py
from .logic.centroid_calculator import CentroidCalculator
from .logic.packet_processor_factory import packet_processor_factory


class RawFileSampler:
    """Read a pymepix raw file, decode its packet stream and write the
    processed events and centroids to an HDF5 file."""

    def __init__(
        self,
        file_name,
        output_file,
        number_of_processes=None,
        timewalk_file=None,
        cent_timewalk_file=None,
        progress_callback=None,
        camera_generation=3,
        clustering_args=None,
        dbscan_clustering=True,
        **kwargs,
    ):
        self._filename = file_name
        self._output_file = output_file
        self.timewalk_file = timewalk_file
        self.cent_timewalk_file = cent_timewalk_file
        self._number_of_processes = number_of_processes
        self._progress_callback = progress_callback
        # Avoid a mutable default argument; a shared default dict would leak
        # clustering settings between instances.
        self._clustering_args = clustering_args if clustering_args is not None else {}
        self._dbscan_clustering = dbscan_clustering
        self.camera_generation = camera_generation

    def init_new_process(self, file):
        """Create connections and initialize variables in the new process.

        Note: the ``file`` argument is currently unused; the sampler always
        reads from ``self._filename``.
        """
        self._startTime = None
        with open(self._filename, "rb") as raw_file:
            # The first 8 bytes of the raw file hold the acquisition start
            # time as an unsigned 64-bit integer ("<Q" rather than the
            # platform-dependent "L", which would fail on 32-bit longs).
            self._startTime = struct.unpack("<Q", raw_file.read(8))[0]
        self._longtime = -1
        self._longtime_msb = 0
        self._longtime_lsb = 0
        self._packet_buffer = []
        self._last_longtime = 0
        timewalk_lut = None
        cent_timewalk_lut = None
        if self.timewalk_file is not None:
            timewalk_lut = np.load(self.timewalk_file)
        if self.cent_timewalk_file is not None:
            cent_timewalk_lut = np.load(self.cent_timewalk_file)
        self.packet_processor = packet_processor_factory()(
            start_time=self._startTime, timewalk_lut=timewalk_lut
        )
        self.centroid_calculator = CentroidCalculator(
            cent_timewalk_lut=cent_timewalk_lut,
            number_of_processes=self._number_of_processes,
            clustering_args=self._clustering_args,
            dbscan_clustering=self._dbscan_clustering,
        )

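    # The 8-byte header consumed above uses the same struct layout on the
    # writing side; a minimal sketch (illustrative only -- the concrete value
    # and its unit are defined by whatever wrote the raw file):
    #
    #     header = struct.pack("<Q", start_time_value)
    #     with open("run0001.raw", "wb") as out:
    #         out.write(header)  # followed by the 8-byte packet words
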
    def pre_run(self):
        """Initialize state that must only exist in the new process."""
        try:
            os.remove(self._output_file)
        except OSError:
            pass
        self.init_new_process(self._filename)
        self._last_update = time.time()
        self.packet_processor.pre_process()
        self.centroid_calculator.pre_process()

    def post_run(self):
        """Flush any remaining packets and finalize both processors."""
        result = self.packet_processor.post_process()
        self._packet_buffer = []
        if result is not None:
            self.__calculate_and_save_centroids(*result)
        self.centroid_calculator.post_process()

    def bytes_from_file(self, data_type="<u8"):
        """Yield the raw file one 8-byte packet word at a time.

        Note: this reads the whole file, including the 8-byte start-time
        header, which is yielded like any other packet word.
        """
        print("Reading to memory", flush=True)
        with open(self._filename, "rb") as file:
            ba = np.fromfile(file, dtype=data_type)
        print("Done", flush=True)
        packets_to_process = len(ba)
        packets_processed = 0
        for b in np.nditer(ba):
            yield b
            packets_processed += 1
            if self._progress_callback is not None and packets_processed % 100 == 0:
                self._progress_callback(packets_processed / packets_to_process)

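    # Illustrative only: each yielded value is a numpy 64-bit scalar, so
    # callers convert it to a plain int before bit manipulation, e.g.
    #
    #     for word in sampler.bytes_from_file():
    #         header = (int(word) >> 60) & 0xF
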
    def handle_lsb_time(self, pixdata):
        # Bits 16..47 of the packet carry the lower 32 bits of the global
        # 48-bit timestamp counter.
        self._longtime_lsb = (pixdata & 0x0000FFFFFFFF0000) >> 16

    def handle_msb_time(self, pixdata):
        # Bits 16..31 of the packet carry the upper 16 bits of the global
        # 48-bit timestamp counter; combine them with the stored LSB part.
        self._longtime_msb = (pixdata & 0x00000000FFFF0000) << 16
        tmplongtime = self._longtime_msb | self._longtime_lsb
        # Reject glitches: the counter advances in small steps, so jumps of
        # more than 0x10000000 ticks (about 6.7 s at 25 ns/tick) are suspect.
        if ((tmplongtime + 0x10000000) < self._longtime) and (self._longtime > 0):
            print(
                "Large backward time jump {} {} ignoring".format(
                    self._longtime * 25e-9, tmplongtime * 25e-9
                )
            )
        elif (tmplongtime > (self._longtime + 0x10000000)) and (self._longtime > 0):
            print(
                "Large forward time jump {} {}".format(
                    self._longtime * 25e-9, tmplongtime * 25e-9
                )
            )
            self._longtime = (self._longtime_msb - 0x10000000) | self._longtime_lsb
        else:
            self._longtime = tmplongtime
        if self._last_longtime == 0:
            self._last_longtime = self._longtime
            return False
        time_diff = (self._longtime - self._last_longtime) * 25e-9
        # Tell the caller to flush the packet buffer every ~5 s of camera time.
        if time_diff > 5.0:
            self._last_longtime = self._longtime
            return True
        else:
            return False

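    # Worked example with illustrative values: an LSB packet carrying
    # 0x12345678 in bits 16..47 and an MSB packet carrying 0x9ABC in bits
    # 16..31 assemble into the 48-bit counter value
    #
    #     (0x9ABC << 32) | 0x12345678 == 0x9ABC12345678
    #
    # corresponding to 0x9ABC12345678 * 25e-9 seconds of camera time.
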
    def handle_other(self, pixdata):
        """Buffer a packet; discard data that arrives before the first
        timestamp (heartbeat) packet."""
        if self._longtime == -1:
            return
        self._packet_buffer.append(pixdata)

    def push_data(self, post=False):
        result = self.__run_packet_processor(self._packet_buffer)
        self._packet_buffer = []
        if result is not None:
            self.__calculate_and_save_centroids(*result)

    def __run_packet_processor(self, packet_buffer):
        if len(packet_buffer) > 0:
            # Append the current global timestamp as a trailing uint64 so the
            # packet processor can anchor the buffered packets in time.
            packet_buffer.append(np.uint64(self._longtime))
            return self.packet_processor.process(
                np.array(packet_buffer, dtype=np.uint64).tobytes()
            )
        return None

    def __calculate_and_save_centroids(self, event_data, _pixel_data, timestamps, _trigger_data):
        centroids = self.centroid_calculator.process(event_data)
        self.saveToHDF5(self._output_file, event_data, centroids, timestamps, _trigger_data)

    def saveToHDF5(self, output_file, raw, clusters, timeStamps, _trigger_data):
        if output_file is None:
            return
        with h5py.File(output_file, "a") as f:
            ###############
            # save centroided data
            if clusters is not None:
                names = ["trigger nr", "x", "y", "tof", "tot avg", "tot max", "clustersize"]
                if "centroided" in f:
                    for i, key in enumerate(names):
                        dset = f["centroided"][key]
                        dset.resize(dset.shape[0] + len(clusters[i]), axis=0)
                        dset[-len(clusters[i]):] = clusters[i]
                else:
                    grp = f.create_group("centroided")
                    grp.attrs["description"] = "centroided events"
                    grp.attrs["nr events"] = 0
                    for i, key in enumerate(names):
                        grp.create_dataset(key, data=clusters[i], maxshape=(None,))
                    f["centroided/tot max"].attrs["unit"] = "s"
                    f["centroided/tot avg"].attrs["unit"] = "s"
                    f["centroided/tot max"].attrs["description"] = "maximum of time above threshold in cluster"
                    f["centroided/tot avg"].attrs["description"] = "mean of time above threshold in cluster"
                    f["centroided/tof"].attrs["unit"] = "s"
                    f["centroided/x"].attrs["unit"] = "pixel"
                    f["centroided/y"].attrs["unit"] = "pixel"

            ###############
            # save raw data
            if raw is not None:
                names = ["trigger nr", "x", "y", "tof", "tot"]
                if "raw" in f:
                    for i, key in enumerate(names):
                        dset = f["raw"][key]
                        dset.resize(dset.shape[0] + len(raw[i]), axis=0)
                        dset[-len(raw[i]):] = raw[i]
                else:
                    grp = f.create_group("raw")
                    grp.attrs["description"] = "timewalk corrected raw events"
                    grp.attrs["nr events"] = 0
                    grp.create_dataset("trigger nr", data=raw[0].astype(np.uint64), maxshape=(None,))
                    grp.create_dataset("x", data=raw[1].astype(np.uint8), maxshape=(None,))
                    grp.create_dataset("y", data=raw[2].astype(np.uint8), maxshape=(None,))
                    grp.create_dataset("tof", data=raw[3], maxshape=(None,))
                    grp.create_dataset("tot", data=raw[4].astype(np.uint32), maxshape=(None,))
                    f["raw/tof"].attrs["unit"] = "s"
                    f["raw/tot"].attrs["unit"] = "s"
                    f["raw/x"].attrs["unit"] = "pixel"
                    f["raw/y"].attrs["unit"] = "pixel"

            ###############
            # save time stamp data
            if timeStamps is not None and self._startTime is not None:
                names = ["trigger nr", "event trigger", "timestamp"]
                if "timing/timepix" in f:
                    for i, key in enumerate(names):
                        dset = f["timing/timepix"][key]
                        dset.resize(dset.shape[0] + len(timeStamps[i]), axis=0)
                        dset[-len(timeStamps[i]):] = timeStamps[i]
                else:
                    grp = f.create_group("timing")
                    grp.attrs["description"] = "timing information from TimePix and facility"
                    subgrp = grp.create_group("timepix")
                    subgrp.attrs["description"] = "timing information from TimePix"
                    subgrp.attrs["nr events"] = 0
                    for i, key in enumerate(names):
                        subgrp.create_dataset(key, data=timeStamps[i], maxshape=(None,))
                    f["timing/timepix/timestamp"].attrs["unit"] = "ns"

            ###############
            # save trigger data
            if _trigger_data is not None:
                for trig_num, trigger in enumerate(_trigger_data):
                    if trigger is None:
                        continue
                    if f"triggers/trigger{trig_num+1}" in f:
                        dset = f[f"triggers/trigger{trig_num+1}/time"]
                        dset.resize(dset.shape[0] + len(trigger), axis=0)
                        dset[-len(trigger):] = trigger
                    else:
                        if "triggers" in f:
                            grp = f["triggers"]
                        else:
                            grp = f.create_group("triggers")
                        grp.attrs["description"] = "triggering information from TimePix"
                        subgrp = grp.create_group(f"trigger{trig_num+1}")
                        subgrp.attrs["description"] = f"trigger{trig_num+1} times from TimePix"
                        subgrp.create_dataset("time", data=trigger, maxshape=(None,))
                        f[f"triggers/trigger{trig_num+1}/time"].attrs["unit"] = "s"

    def run(self):
        """Method executed in the new process via multiprocessing.Process.start."""
        self.pre_run()
        if self.camera_generation == 4:
            chunk_size = 65536  # draft implementation of raw file splitting, will need rework
            self._longtime = 0
            for i, packet in enumerate(self.bytes_from_file("<i8")):
                self.handle_other(packet)
                if i > 0 and i % chunk_size == 0:
                    self.push_data()
            if len(self._packet_buffer) > 0:
                self.push_data()
        elif self.camera_generation == 3:
            for packet in self.bytes_from_file():
                # If this were kept in numpy we would have to write
                # ((packet & 0xF000000000000000) >> np.uint64(60)) & np.uint64(0xF)
                # see: https://stackoverflow.com/questions/30513741/python-bit-shifting-with-numpy
                pixdata = int(packet)
                header = ((pixdata & 0xF000000000000000) >> 60) & 0xF
                should_push = False
                # pixel packets: Read Pixel Matrix Sequential (header 0xA)
                # or Read Pixel Matrix Data-Driven (header 0xB)
                if header == 0xA or header == 0xB:
                    self.handle_other(pixdata)
                # headers 0x4 and 0x6 carry timing packets; the subheader
                # selects the LSB (0x4) or MSB (0x5) of the global timestamp
                elif header == 0x4 or header == 0x6:
                    subheader = ((pixdata & 0x0F00000000000000) >> 56) & 0xF
                    if subheader == 0x4:
                        self.handle_lsb_time(pixdata)
                    elif subheader == 0x5:
                        should_push = self.handle_msb_time(pixdata)
                    else:
                        self.handle_other(pixdata)
                if should_push:
                    self.push_data()
            if len(self._packet_buffer) > 0:
                self.push_data()
        else:
            raise ValueError(
                f"No RawFileSampler implementation for camera generation {self.camera_generation}"
            )
        self.post_run()
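
# Minimal usage sketch (illustrative only; the file names below are
# placeholders): drive the sampler end to end over a recorded raw file.
if __name__ == "__main__":
    sampler = RawFileSampler(
        file_name="run0001.raw",
        output_file="run0001.hdf5",
        number_of_processes=4,
    )
    sampler.run()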