# This file is part of Pymepix
#
# In all scientific work using Pymepix, please reference it as
#
# A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:
# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)
# https://doi.org/10.1088/1748-0221/14/10/P10003
# https://arxiv.org/abs/1905.07999
#
# Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU
# General Public License as published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without
# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with this program. If not,
# see <https://www.gnu.org/licenses/>.
import ctypes
import multiprocessing
import socket
import time
from multiprocessing.sharedctypes import Value
import numpy as np
import zmq
import pymepix.config.load_config as cfg
from pymepix.core.log import ProcessLogger as Logger
# from pymepix.processing.basepipeline import BasePipelineObject
from pymepix.processing.rawtodisk import Raw2Disk
# [docs]
class UdpSampler(multiprocessing.Process):
"""Receives UDP packets from SPIDR.

This class creates a UDP socket connection to SPIDR and receives the UDP
packets from Timepix. It then pre-processes them and sends them off for
more processing.
"""
def __init__(
self,
longtime,
chunk_size=10_000,
flush_timeout=0.3,
input_queue=None,
create_output=True,
num_outputs=1,
shared_output=None,
):
"""Store configuration and shared flags; heavy setup is deferred.

Parameters
----------
longtime :
    Shared value whose ``.value`` (a coarse timestamp) is appended to
    every buffer forwarded to the packet processor in ``run()``.
chunk_size : int
    Buffer flush threshold, in units of 8192 bytes (scaled in
    ``init_new_process``).
flush_timeout : float
    Seconds after which a partially filled buffer is flushed anyway.
input_queue, create_output, num_outputs, shared_output :
    Unused; kept for backward compatibility with the former
    BasePipelineObject interface (see commented-out call below).
"""
# BasePipelineObject.__init__(self, 'UdpSampler', input_queue=input_queue, create_output=create_output,
# num_outputs=num_outputs, shared_output=shared_output)
self.log = Logger(__name__)
multiprocessing.Process.__init__(self)
# Parameters are stashed here and applied in init_new_process(), which
# runs inside the child process after start().
self.init_param = {
"chunk_size": chunk_size,
"flush_timeout": flush_timeout,
"longtime": longtime,
}
# Cross-process control flags: toggled from the parent, read in run().
self._record = Value(ctypes.c_bool, False)
self._enable = Value(ctypes.c_bool, True)
self._close_file = Value(ctypes.c_bool, False)
self.loop_count = 0
# [docs]
def init_new_process(self):
"""Create connections and initialize variables in the new process."""
self.create_socket_connection()
# chunk_size is configured in units of 8192 bytes
self._chunk_size = self.init_param["chunk_size"] * 8192
self._flush_timeout = self.init_param["flush_timeout"]
self._packets_collected = 0
# Ring of 10 pre-allocated buffers, each 1.5x the flush threshold so a
# receive can overshoot the threshold without reallocation.
self._packet_buffer_list = [
bytearray(int(1.5 * self._chunk_size)) for i in range(10)
] # ring buffer to put received data in
self._buffer_list_idx = 0
# memoryviews give zero-copy slices for recv_into()
self._packet_buffer_view_list = [
memoryview(self._packet_buffer_list[i]) for i in range(len(self._packet_buffer_list))
]
self._packet_buffer_view = self._packet_buffer_view_list[self._buffer_list_idx]
self._recv_bytes = 0
self._total_time = 0.0
self._longtime = self.init_param["longtime"]
# create connection to packetprocessor
self.log.debug("create packetprocessor socket")
ctx = zmq.Context.instance()
self._packet_sock = ctx.socket(zmq.PUSH)
self.log.info(f"bind packet processor to: tcp://127.0.0.1:{cfg.default_cfg['zmq_port']+1}")
try:
self._packet_sock.bind(f"tcp://127.0.0.1:{cfg.default_cfg['zmq_port']+1}")
except Exception as e:
self.log.error(f"Exception occured in UdpSampler: {e}")
self.log.error(e, exc_info=True)
raise
# [docs]
def create_socket_connection(self):
    """Establishes a UDP connection to spidr.

    Reads the listen address from the config file, creates the UDP socket,
    enlarges its receive buffer and binds it to the configured address.

    Raises
    ------
    KeyError
        If ``udp_ip1`` or ``data_port`` is missing from the config file
        (previously this was only logged, after which the unbound ``addr1``
        caused a confusing NameError further down).
    """
    # check if keys are in config file
    try:
        addr1 = (
            cfg.default_cfg["timepix"]["udp_ip1"],
            cfg.default_cfg["timepix"]["data_port"],
        )
    except KeyError as e:
        self.log.error(f"The key {e} is not configured in the config file.")
        raise  # without an address there is nothing sensible to bind to
    self._sock1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # Internet # UDP
    try:
        # large NIC buffer to survive bursts; best-effort, OS may refuse
        self._sock1.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 300_000_000)
    except OSError:
        self.log.warning("NIC memory you try to allocate is too much.")
    # 1 s timeout lets run() periodically re-check the enable/close flags
    self._sock1.settimeout(1.0)
    self.log.info("UDPSampler establishing connection to : {}:{}".format(*addr1))
    # check if address can be bound; failure is logged but not fatal here
    try:
        self._sock1.bind(addr1)
    except OSError as e:
        self.log.error(f"{e} {addr1} doesn't work.")
# [docs]
def get_useful_packets(self, packet):
# Get the header
header = ((packet & 0xF000000000000000) >> 60) & 0xF
subheader = ((packet & 0x0F00000000000000) >> 56) & 0xF
pix_filter = (header == 0xA) | (header == 0xB)
trig_filter = ((header == 0x4) | (header == 0x6)) & (subheader == 0xF)
tpx_filter = pix_filter | trig_filter
tpx_packets = packet[tpx_filter]
return tpx_packets
@property
def enable(self):
"""Enables processing
Determines whether the class will perform processing, this has the result of signalling the process to terminate.
If there are objects ahead of it then they will stop receiving data
if an input queue is required then it will get from the queue before checking processing
This is done to prevent the queue from growing when a process behind it is still working
Parameters
-----------
value : bool
Enable value
Returns
-----------
bool:
Whether the process is enabled or not
"""
return bool(self._enable.value)
@enable.setter
def enable(self, value):
self.log.debug("Setting enabled flag to {}".format(value))
self._enable.value = bool(value)
@property
def record(self):
"""Enables saving data to disk
Determines whether the class will perform processing, this has the result of signalling the process to terminate.
If there are objects ahead of it then they will stop recieving data
if an input queue is required then it will get from the queue before checking processing
This is done to prevent the queue from growing when a process behind it is still working
Parameters
-----------
value : bool
Enable value
Returns
-----------
bool:
Whether the process should record and write to disk or not
"""
return bool(self._record.value)
@record.setter
def record(self, value):
self.log.debug(f"Setting record flag to {value}")
self._record.value = bool(value)
@property
def close_file(self):
return bool(self._close_file.value)
@close_file.setter
def close_file(self, value):
self.log.debug(f"Setting close_file to {value}")
self._close_file.value = bool(value)
@property
def outfile_name(self):
return self._outfile_name
@outfile_name.setter
def outfile_name(self, fileN):
self.log.info(f"Setting file name flag to {fileN}")
if self.write2disk.open_file(fileN):
self.log.info(f"file {fileN} opened")
else:
self.log.error("Huston, here's a problem, file cannot be created.")
# [docs]
def pre_run(self):
"""Init stuff which should only be available in the new process.

Runs inside the child process: sets up sockets and buffers, creates the
raw-to-disk writer, and records the reference time for flush timeouts.
"""
self.init_new_process()
self.write2disk = Raw2Disk()
self._last_update = time.time()
# [docs]
def post_run(self):
"""Flush remaining buffered bytes and close the raw file if one is open.

Gets called either at the very end of the process life or,
on a socket timeout, when the raw2disk file should be closed.
"""
if self._recv_bytes > 1:
bytes_to_send = self._recv_bytes
self._recv_bytes = 0
# advance the ring buffer before handing the filled slot to the writer
curr_list_idx = self._buffer_list_idx
self._buffer_list_idx = (self._buffer_list_idx + 1) % len(self._packet_buffer_list)
self._packet_buffer_view = self._packet_buffer_view_list[self._buffer_list_idx]
self.write2disk.my_sock.send(self._packet_buffer_list[curr_list_idx][:bytes_to_send], copy=False)
if self.write2disk.writing:
# b"EOF" is the sentinel telling Raw2Disk to close its file
self.write2disk.my_sock.send(
b"EOF"
) # we should get a response here, this ends up in nirvana at this point
self.log.debug("post_run: closed file")
# return MessageType.RawData, (
# self._packet_buffer_list[curr_list_idx][:bytes_to_send], self._longtime.value)
else:
# nothing buffered; only close the file if one is being written
if self.write2disk.writing:
self.log.debug("post_run: close file")
self.write2disk.my_sock.send(b"EOF") # we should get a response here, but the socket is elsewhere...
self.log.debug("post_run: closed file")
return None, None
# [docs]
def run(self):
"""Method which is executed in the new process via multiprocessing.Process.start.

Receive loop: datagrams are accumulated into the current ring-buffer
slot; once the chunk threshold or the flush timeout is reached, the
buffer (optionally mirrored to disk) is forwarded to the packet
processor with the current longtime value appended, and the loop
advances to the next ring-buffer slot.
"""
self.pre_run()
enabled = self.enable
start = time.time()
while True:
if enabled:
try:
# append the next datagram after the bytes already buffered
self._recv_bytes += self._sock1.recv_into(self._packet_buffer_view[self._recv_bytes :])
except socket.timeout:
# timeout is the chance to re-read the shared control flags
enabled = self.enable
# put close file here to get the cases where there's no data coming and file should be closed
# mainly there for test to succeed
if self.close_file:
self.close_file = False
self.post_run()
else:
self.log.debug("Socket timeout")
except socket.error:
self.log.debug("socket error")
self._packets_collected += 1
end = time.time()
self._total_time += end - start
# if self._packets_collected % 1000 == 0:
# self.log.debug('Packets collected {}'.format(self._packets_collected))
# self.log.debug('Total time {} s'.format(self._total_time))
flush_time = end - self._last_update
# sends empty packets if flush_timeout==True but no data was received
if (self._recv_bytes > self._chunk_size) or (flush_time > self._flush_timeout):
# tpx_packets = self.get_useful_packets(packet)
if self.record:
# mirror the raw bytes to the disk writer
self.write2disk.my_sock.send(
self._packet_buffer_list[self._buffer_list_idx][: self._recv_bytes],
copy=False,
)
elif self.close_file:
self.close_file = False
self.log.debug("received close file")
# send the remaining bytes, then the EOF sentinel to close the file
self.write2disk.my_sock.send(
self._packet_buffer_list[self._buffer_list_idx][: self._recv_bytes],
copy=False,
)
self.write2disk.my_sock.send(b"EOF")
# send stuff to packet processor only every 10th iteration
# elif self._buffer_list_idx == 9:
# add longtime to buffers end
bytes_to_send = self._recv_bytes + 8
self._packet_buffer_view[self._recv_bytes : bytes_to_send] = np.uint64(
self._longtime.value
).tobytes()
self._packet_sock.send(
self._packet_buffer_list[self._buffer_list_idx][:bytes_to_send],
copy=False,
)
# rotate to the next ring-buffer slot and reset the flush timer
self._recv_bytes = 0
self._buffer_list_idx = (self._buffer_list_idx + 1) % len(self._packet_buffer_list)
self._packet_buffer_view = self._packet_buffer_view_list[self._buffer_list_idx]
self._last_update = time.time()
enabled = self.enable
# if len(packet) > 1:
# if not curr_list_idx % 20:
# return MessageType.RawData, (self._packet_buffer_list[curr_list_idx][:bytes_to_send], self._longtime.value)
# else:
# return None, None
# else:
# return None, None
# else:
# return None, None
else:
self.log.debug("UDPSampler: I AM LEAVING")
break
# final flush + file close, then release the sockets
self.post_run()
self.write2disk.my_sock.close()
self._packet_sock.close()
# [docs]
def send_data(packets, chunk_size, start=0, sleep=0.0001, addr=("127.0.0.1", 50000)):
    """Send `packets` UDP datagrams of `chunk_size` uint64 words each.

    Test helper that generates consecutive uint64 values so a receiver can
    verify ordering: the first packet carries start..start+chunk_size-1,
    the second the next chunk_size values, and so on.

    Parameters
    ----------
    packets : int
        Number of datagrams to send.
    chunk_size : int
        Number of uint64 words per datagram (chunk size 135 -> max number=1_012_500).
    start : int
        First value of the generated sequence.
    sleep : float
        Unused; a per-packet throttle that is currently commented out.
    addr : tuple
        Destination (host, port); default matches the sampler's test port.
    """
    ############
    # send data
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    test_data = np.arange(start, start + packets * chunk_size, dtype=np.uint64)
    test_data_view = memoryview(test_data)
    time.sleep(1)  # seems to be necessary if this function gets called as a Process
    # first packet 0...134, second packet 135...269 and so on
    print("sending process")
    t0 = time.time()
    for i in range(0, len(test_data_view), chunk_size):
        # BUG FIX: previously sliced [0 : 0 + chunk_size], so the very first
        # chunk was resent every iteration instead of walking through the data.
        sock.sendto(test_data_view[i : i + chunk_size], addr)
        # time.sleep(sleep) # if there's no sleep, packets get lost
    dt = time.time() - t0
    print(
        f"time to send {dt:.2f}",
        f"time to send 1M: {dt/packets*1_000_002:.2f}s, "
        # f'bytes: {len(test_data_view.tobytes())}, '
        f"MBytes: {len(test_data_view.tobytes()) * 1e-6:.1f}, "
        f"{len(test_data_view.tobytes()) * 1e-6 / dt:.2f} MByte/s",
    )
    # return test_data
# [docs]
def main():
"""Manual smoke test: stream synthetic UDP data through a UdpSampler.

Starts a sender process (send_data) and a UdpSampler, drains nothing
from the PULL socket (it exists so the sampler's PUSH has a peer), and
finally signals the sampler to close its file and stop.
"""
cfg.default_cfg['timepix']['ip'] = '127.0.0.1'
cfg.default_cfg['timepix']['data_port'] = 50000
###############
# take data form Queue where PacketProcessor would be sitting
# create connection to packetprocessor
ctx = zmq.Context.instance()
z_sock = ctx.socket(zmq.PULL)
z_sock.connect(f"tcp://127.0.0.1:{cfg.default_cfg['zmq_port']+1}")
longtime = Value("L", 1)
sampler = UdpSampler(longtime)
time.sleep(1) # give thread time to start
# send data
packets = 3_500_000
chunk_size = 139
# test_data = np.arange(0, packets * chunk_size, dtype=np.uint64)
# test_data = send_data(packets=10_000, chunk_size=135, start=15000, sleep=1e-4)
p = multiprocessing.Process(target=send_data, args=(packets, chunk_size, 0, 0))
start = time.time()
p.start()
sampler.start()
p.join()
print("sending finished")
stop = time.time()
# z_sock.send_string("SHUTDOWN")
time.sleep(1)
# ask the sampler to close the raw file, then let its loop exit
sampler.close_file = True
sampler.enable = False
z_sock.close()
time.sleep(2)
print(f"took {stop - start}s")
sampler.terminate()
if __name__ == "__main__":
main()