# This file is part of Pymepix## In all scientific work using Pymepix, please reference it as## A. F. Al-Refaie, M. Johny, J. Correa, D. Pennicard, P. Svihra, A. Nomerotski, S. Trippel, and J. Küpper:# "PymePix: a python library for SPIDR readout of Timepix3", J. Inst. 14, P10003 (2019)# https://doi.org/10.1088/1748-0221/14/10/P10003# https://arxiv.org/abs/1905.07999## Pymepix is free software: you can redistribute it and/or modify it under the terms of the GNU# General Public License as published by the Free Software Foundation, either version 3 of the# License, or (at your option) any later version.## This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without# even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU# General Public License for more details.## You should have received a copy of the GNU General Public License along with this program. If not,# see <https://www.gnu.org/licenses/>.importosimportthreadingimporttimeimportzmqimportpymepix.config.load_configascfgfrompymepix.core.logimportProcessLogger# Class to write raw data to files using ZMQ and a new thread to prevent IO blocking
[docs]classRaw2Disk:""" Class for asynchronously writing raw files Intended to allow writing of raw data while minimizing impact on UDP reception reliability. """def__init__(self,context=None):"""Need to pass a ZMQ context object to ensure that inproc sockets can be created"""self.log=ProcessLogger("Raw2Disk")self.log.info("init raw2disk")self.writing=False# Keep track of whether we're currently writing a fileself.stop_thr=Falseself.sock_addr=f"inproc://filewrite-{cfg.default_cfg['zmq_port']}"self.my_context=contextorzmq.Context.instance()# Paired socket allows two-way communicationself.my_sock=self.my_context.socket(zmq.PAIR)self.my_sock.bind(self.sock_addr)self.write_thr=threading.Thread(target=self._run_filewriter_thr,args=(self.sock_addr,None))# self.write_thr.daemon = Trueself.write_thr.start()self.log.debug(f"{__name__} thread started")time.sleep(1)def_run_filewriter_thr(self,sock_addr,context=None):""" Private method that runs in a new thread after initialization. Parameters ---------- sock_addr : str socket address for ZMQ to bind to context ZMQ context """context=contextorzmq.Context.instance()# socket for communication with UDPSamplerinproc_sock=context.socket(zmq.PAIR)inproc_sock.connect(sock_addr)# socket for cummunication with mainz_sock=context.socket(zmq.PAIR)z_sock.connect(f"tcp://127.0.0.1:{cfg.default_cfg['zmq_port']}")self.log.info(f"raw2disk connect to tcp://127.0.0.1:{cfg.default_cfg['zmq_port']}")# socket to maxwellifremote_server:=cfg.default_cfg.get("remote_processing_host")isnotNone:self.log.info(f"connecting to processing server {remote_server}")max_sock=context.socket(zmq.PUSH)max_sock.connect(f"tcp://{remote_server}")else:max_sock=None# State machine etc. local variableswaiting=Truewriting=Falseshutdown=Falsefilehandle=Nonewhilenotshutdown:# wait for instructions, valid commands are# "SHUTDOWN": exits this loop and ends thread# "filename" in the form "/filenamewhilewaiting:cmd=z_sock.recv_string()ifcmd=="SHUTDOWN":self.log.info("SHUTDOWN received")waiting=Falseshutdown=Trueelse:# Interpret as file name / pathfilename=cmdifnotos.path.exists(filename):self.log.info(f"File {filename} opening")# Open filehandlefilehandle=open(filename,"wb")filehandle.write(time.time_ns().to_bytes(8,"little"))# add start time into filez_sock.send_string("OPENED")waiting=Falsewriting=Trueself.writing=Truez_sock.send_string(filename)else:self.log.info(f"{cmd} not a valid command")z_sock.send_string(f"{cmd} in an INVALID command")# start writing received data to a filewhilewriting:# Receive in efficient manner (noncopy with memoryview) and write to file# Check for special message that indicates EOF.data_view=memoryview(inproc_sock.recv(copy=False).buffer)# self.log.debug(f'got {data_view.tobytes()}')iflen(data_view)==3:ifdata_view.tobytes()==b"EOF":self.log.debug("EOF received")writing=Falseself.writing=FalseifwritingisTrue:# print(np.frombuffer(data_view, dtype=np.uint64))filehandle.write(data_view)# close fileiffilehandleisnotNone:self.log.debug("closing file")filehandle.flush()filehandle.close()self.log.debug("file closed")z_sock.send_string("CLOSED")filehandle=Noneifmax_sockisnotNone:# send filename to maxwell for conversionmax_sock.send_string(filename)waiting=True# We reach this point only after "SHUTDOWN" command receivedself.log.debug("Thread is finishing")ifmax_sockisnotNone:max_sock.close()z_sock.close()inproc_sock.close()self.log.debug("Thread is finished")
[docs]defopen_file(self,socket,filename):""" Creates a file with a given filename and path. this doesn't work anylonger using 2 sockets for the communication functionality needs to be put outside where you have access to the socket """ifself.writingisFalse:socket.send_string(filename)response=socket.recv_string()# Check reply from threadifresponse=="OPENED":self.writing=TruereturnTrueelse:self.log.warning("File name not valid")returnFalseelse:self.log.warning("Already writing file!")returnFalse
[docs]defclose(self,socket):""" Close the file currently in progress. call in main below """ifself.writingisTrue:self.my_sock.send(b"EOF")response=socket.recv_string()ifresponse!="CLOSED":self.log.warning("Didn't get expected response when closing file")returnFalseelse:returnTrueelse:self.log.info("Cannot close file - we are not writing anything!")returnFalse
[docs]defwrite(self,data):""" Writes data to the file. Parameter is buffer type (e.g. bytearray or memoryview) Not sure how useful this function actually is... It completes the interface for this class but from a performance point of view it doesn't improve things. How could this be benchmarked? """ifself.writingisTrue:self.my_sock.send(data,copy=False)else:self.log.warning("Cannot write data - file not open")returnFalse
# Destructor - called automatically when object garbage collecteddef__del__(self):"""Stuff to make sure sockets and files are closed..."""ifself.write_thr.is_alive():ifself.writingisTrue:self.close()self.my_sock.send_string("SHUTDOWN")self.write_thr.join()time.sleep(1)self.my_sock.close()self.log.debug("object deleted")else:self.log.debug("thread already closed")
[docs]defmain_process():""" seperate process not strictly necessary, just to double check if this also works with multiprocessing doesn't work for debugging """# Create the loggerimportlogginglogging.basicConfig(level=logging.DEBUG,format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",)# zmq socket for communication with write2disk threadctx=zmq.Context.instance()z_sock=ctx.socket(zmq.PAIR)z_sock.bind(f"tcp://127.0.0.1:{cfg.default_cfg['zmq_port']}")write2disk=Raw2Disk()""" ###### # test 0 write2disk.my_sock.send_string('hallo echo') logging.info(write2disk.my_sock.recv_string()) these example only work if thread uses self.writing directly ###### # test 1 filename = './test1.raw' write2disk.my_sock.send_string(filename) if write2disk.my_sock.recv_string() != 'OPENED': logging.error("Huston, here's a problem, file cannot be created.") text = b'Hallo, heute ist ein guter Tag.' write2disk.my_sock.send(text) write2disk.my_sock.send(b'EOF') assert write2disk.my_sock.recv_string() == "CLOSED" logging.debug("File closed") with open(filename, 'rb') as f: file_content = f.read() assert file_content == text ###### # test 2 filename = './test2.raw' write2disk.my_sock.send_string(filename) if write2disk.my_sock.recv_string() != 'OPENED': logging.error("Huston, here's a problem, file cannot be created.") text = b'Hallo, heute ist immer noch ein guter Tag.' write2disk.my_sock.send(text) if write2disk.close(): logging.debug('file closed') else: logging.error('something went wrong closing the file') with open(filename, 'rb') as f: file_content = f.read() assert file_content == text """####### test 3print("test 3")filename="./test3.raw"ifwrite2disk.open_file(z_sock,filename):print(f"file {filename} opened")else:print("Huston, here's a problem, file cannot be created.")text=b"What a nice day!"write2disk.my_sock.send(text,copy=False)write2disk.write(text)ifwrite2disk.close(z_sock):logging.info(f"File {filename} closed.")else:logging.error(f"File {filename} could not be closed.")# check actual file contentwithopen(filename,"rb")asf:file_content=f.read()assertfile_content==text+textz_sock.send_string("SHUTDOWN")# print(write2disk.my_sock.recv())write2disk.write_thr.join()
[docs]defmain():# from multiprocessing import Process# Process(target=main_process).start()main_process()