PyLamarr
Pythonizations for the ultra-fast simulation option for the LHCb experiment
 
Loading...
Searching...
No Matches
CompressedHepMCLoader.py
1from typing import List
2from contextlib import contextmanager
3from math import ceil
4import shutil
5import random
6import os.path
7from glob import glob
8from dataclasses import dataclass
9import tarfile
10import logging
11import re
12
13from typing import Optional, Collection, Any
14
15from PyLamarr import EventBatch
16
17@dataclass
19 _hepmcloader: Any = None # Expected SQLamarr.HepMC2DataLoader not explicit for optional SQLamarr dep
20 input_files: Collection[str] = None
21 run_numbers: Collection[int] = None
22 event_numbers: Collection[int] = None
23
24 def load(self):
25 files_runs_events = zip(self.input_files, self.run_numbers, self.event_numbers)
26 for hepmc_file, run_number, event_number in files_runs_events:
27 logging.getLogger('HepMC2EventBatch').debug(f"Loading file {hepmc_file}")
28 self._hepmcloader.load(hepmc_file, run_number, event_number)
29 logging.getLogger("HepMC2EventBatch").debug("Loaded.")
30
32 """
33 Adapter to read HepMC2 files compressed in a tar file. Requires SQLamarr.
34 """
35 def __init__(self,
36 regexp_runNumber: str = "Run([0-9]+)",
37 regexp_evtNumber: str = "evt([0-9]+)",
38 regexp_totEvents: str = "([0-9]+)ev[^\w]",
39 tmpdir: str = "/tmp",
40 max_event: Optional[int] = None,
41 events_per_batch: Optional[int] = None,
42 ):
43 self.tmpdir = tmpdir
44 self._db = None
45 self._hepmcloader = None
46 self._batch_counter = 0
47 self._regexp_runNumber = regexp_runNumber
48 self._regexp_evtNumber = regexp_evtNumber
49 self._regexp_totEvents = regexp_totEvents
50 self._max_event = max_event
51 self._events_per_batch = events_per_batch
53 self.logger = logging.getLogger("CompressedHepMCLoad")
54
55 def __call__(self, database):
56 import SQLamarr
57 self._db = database
59 return self
60
61 def _get_run_number(self, filename) -> int:
62 matches = re.findall(self._regexp_runNumber, filename)
63 if len(matches) > 0:
64 self._batch_counter = matches[-1]
65 else:
66 self._batch_counter += 1
67
68 return int(self._batch_counter)
69
70 def _get_evt_number(self, filename: str, default: int) -> int:
71 matches = re.findall(self._regexp_evtNumber, filename)
72 return int(matches[-1]) if len(matches) else default
73
74 def _get_number_of_events(self, filename: str, default: int) -> int:
75 matches = re.findall(self._regexp_totEvents, filename)
76 return int(matches[-1]) if len(matches) else default
77
78
79 @contextmanager
80 def archive_mirror(self, filename: str):
81 tmp_dir = os.path.join(
82 self.tmpdir,
83 f"pylamarr.tmp.{random.randint(0, 0xFFFFFF):06x}"
84 )
85 self.logger.info(f"Creating temporary directory {tmp_dir}")
86 os.mkdir(tmp_dir)
87
88 try:
89 yield self.files_in_archive(filename, tmp_dir=tmp_dir)
90 finally:
91 self.logger.info(f"Removing directory {tmp_dir}")
92 shutil.rmtree(tmp_dir)
93
94 def copy_and_maybe_patch_hepmc(self, input_file_data: str):
95 """
96 Apply patches to the HepMC2 file to avoid segmentation fault in HepMC3 ascii reader
97 """
98 requires_particle_gun_patch = False
99 src_lines = input_file_data.split('\n')
100 if len([li for li in src_lines if li.replace("\n", "").replace(" ", "") != ""]) == 0:
101 self.logger.warning(f"No valid line found in input file")
102 dst_lines = []
103
104 for line in src_lines:
105 line = line[:-1] if len(line) > 0 and line[-1] == '\n' else line
106 if len(line) > 0 and line[0] == 'E':
107 tokens = line.split(" ")
108 # Documentation at https://hepmc.web.cern.ch/hepmc/releases/HepMC2_user_manual.pdf
109 # Section 6.2
110 if int(tokens[6]) == 1: # For Particle Gun process
112 n_vertices = int(tokens[8])
113 tokens[8] = str(n_vertices + 1)
114 tokens[12 + int(tokens[11])] = str(1)
115 tokens += ["1.0"]
116 requires_particle_gun_patch = True
117 dst_lines += [" ".join(tokens), 'N 1 "0"']
118 else:
119 dst_lines.append(line)
120 elif len(line) > 0 and line[0] == 'V' and requires_particle_gun_patch: # First vertex
121 # PGUN Patch:
122 # HepMC3::HepMC2Reader does not tolerate a PV with no incoming particles,
123 # so we create a fake vertex and a fake beam particle.
124 vertex_id = line.split(" ")[1]
125 dst_lines += ["V -99999 0 0 0 0 0 0 0 0", "P 0 0 0. 0. 0. 0. 0. 3 0 0 %s 0" % vertex_id, line]
126 requires_particle_gun_patch = False
127 else:
128 dst_lines.append(line)
129
130 return "\n".join(dst_lines)
131
132 def files_in_archive(self, filename: str, tmp_dir: str):
133 ret = []
134 with tarfile.open(filename, mode='r:*') as tar:
135 for member in tar.getmembers():
136 if member.isfile() and member.name.endswith("mc2"):
137 key = os.path.basename(member.name)
138 file_content = tar.extractfile(member).read().decode('utf-8')
139 patched_filename = os.path.join(tmp_dir, os.path.basename(key))
140 with open(patched_filename, 'w') as file_copy:
141 file_copy.write(self.copy_and_maybe_patch_hepmc(file_content))
142 ret.append(patched_filename)
143
144 return ret
145
146 def load(self, filename: str):
147 """
148 Internal.
149 """
150 if self._db is None:
151 raise ValueError("CompressedHepMCLoader tried loading with uninitialized db.\n"
152 "Missed ()?")
153
154 event_counter = 0
155 batch_counter = 0
156 tot_events = min(
157 self._get_number_of_events(filename, -1),
158 self._max_event if self._max_event is not None else 0x7FFFFFFF
159 )
160
161 batches = {k: [] for k in ('input_files', 'run_numbers', 'event_numbers')}
162 batch_info = dict()
163 with self.archive_mirror(filename) as files_in_archive:
164 for i_file, hepmc_file in enumerate(files_in_archive):
165 run_number = self._get_run_number(filename)
166 event_number = self._get_evt_number(hepmc_file, i_file)
167 n_events = len(batches['event_numbers'])
168 batch_info.update(dict(
169 n_events=n_events,
170 batch_id=batch_counter,
171 description=f"Run {run_number}",
172 _hepmcloader=self._hepmcloader,
173 ))
174
175 if tot_events > 0 and self._events_per_batch is not None:
176 batch_info['n_batches'] = ceil(tot_events/self._events_per_batch)
177
178 if self._max_event is not None and event_counter >= self._max_event:
179 break
180
181 if self._events_per_batch is not None and n_events >= self._events_per_batch:
182 yield HepMC2EventBatch(
183 **batch_info,
184 **batches
185 )
186 batches = {k: [] for k in batches.keys()}
187 batch_counter += 1
188
189
190 batches['input_files'].append(hepmc_file)
191 batches['run_numbers'].append(run_number)
192 batches['event_numbers'].append(event_number)
193 event_counter += 1
194
196 self.logger.warning(
197 f"{self._particle_gun_patched_events} / {event_counter} events were identified as generated with a "
198 "Particle Gun and patched."
199 )
200
201
Adapter to read HepMC2 files compressed in a tar file.
copy_and_maybe_patch_hepmc(self, str input_file_data)
Apply patches to the HepMC2 file to avoid segmentation fault in HepMC3 ascii reader.