PyLamarr
Pythonizations for the ultra-fast simulation option of the LHCb experiment
 
Loading...
Searching...
No Matches
BasePipeline.py
1import xml.etree.ElementTree as e3
2import os
3import itertools
4import PyLamarr
5import logging
6import sys
7import threading
8import sqlite3
9from PyLamarr import GenericWrapper
10from PyLamarr.RemoteResource import RemoteResource as RemoteRes
11
12from typing import List, Tuple, Any, Union, Dict
13
14
def __init__(self,
             sequence: Union[List[Tuple[Any]], None] = None,
             loader: str = "HepMC2DataLoader",
             batch=1,
             dbfile_fmt="file:/tmp/lamarr.{thread:016x}.db",
             clean_before_loading=True,
             clean_after_finishing=True,
             ):
    """
    Configure a Lamarr pipeline.

    :param sequence: ordered list of ``(name, algorithm_factory)`` pairs;
        each factory is called with the database handle in ``execute``.
        Defaults to ``self.default_sequence``.
    :param loader: name of a SQLamarr data-loader class (resolved lazily
        by the ``loader`` property), or a ready-made loader factory.
    :param batch: number of load arguments processed per cycle.
    :param dbfile_fmt: format string for the per-thread SQLite URI;
        ``{thread}`` is replaced with the thread identifier.
    :param clean_before_loading: wipe the event store before each batch.
    :param clean_after_finishing: remove the database file once done.
    """
    self.logger = logging.getLogger(self.__class__.__name__)
    PyLamarr.configure_logger()
    self.logger.info(f"Python {sys.version}".replace("\n", " "))

    # Probe the SQLite library version with a throw-away in-memory DB.
    # NOTE: sqlite3.version (bindings version) is deprecated since
    # Python 3.12; kept for compatibility with the existing log format.
    with sqlite3.connect(":memory:") as c:
        sqlite_version = c.execute("SELECT sqlite_version()").fetchall()[0][0]
        self.logger.info(f"Running with SQLite version {sqlite_version} "
                         f"(bindings: {sqlite3.version})")

    # SQLamarr is optional at configuration time: without it one can still
    # build and serialize a pipeline, but execute() will refuse to run.
    try:
        import SQLamarr
    except (ImportError, OSError):
        self._sqlamarr_available = False
        # Fixed message: it is SQLamarr (not SQLite) that failed to import.
        self.logger.warning("SQLamarr not found. "
                            "You can still build a configuration, but not run it.")
    else:
        self._sqlamarr_available = True
        self.logger.info(f"Running with SQLamarr version {SQLamarr.version}")

    self._sequence = sequence if sequence is not None else self.default_sequence
    self._loader = loader
    self._batch = batch
    self._dbfile_fmt = dbfile_fmt
    self._clean_before_loading = clean_before_loading
    self._clean_after_finishing = clean_after_finishing
49
@property
def default_sequence(self):
    """Fallback sequence used when none is given at construction: empty."""
    return []

@property
def sequence(self):
    """The configured list of ``(name, algorithm_factory)`` pairs."""
    return self._sequence

@property
def loader(self):
    """
    The data-loader factory.

    A string is resolved lazily against the SQLamarr module (only when
    SQLamarr is available); any other value is returned unchanged.
    """
    if isinstance(self._loader, str) and self._sqlamarr_available:
        import SQLamarr
        return getattr(SQLamarr, self._loader)
    return self._loader

@property
def batch(self):
    """Number of load arguments grouped per processing cycle."""
    return self._batch

@loader.setter
def loader(self, new_loader):
    self._loader = new_loader

@batch.setter
def batch(self, new_batch):
    self._batch = new_batch
76
@staticmethod
def _batched(iterable, batch_size):
    """
    Yield successive tuples of at most ``batch_size`` items from *iterable*.

    The last chunk may be shorter than ``batch_size``.

    :param iterable: any iterable to be chunked.
    :param batch_size: maximum chunk length; must be at least 1.
    :raises ValueError: if ``batch_size`` is smaller than 1.
    """
    # Validate eagerly so the error surfaces at call time rather than on
    # the first next() of a lazily-evaluated generator.
    if batch_size < 1:
        # Message fixed: the check allows 1, so "at least 1" (not "larger").
        raise ValueError("Batch size must be at least 1")

    def _chunks():
        it = iter(iterable)
        while True:
            chunk = tuple(itertools.islice(it, batch_size))
            if not chunk:
                return
            yield chunk

    return _chunks()
89
def execute(self,
            load_args: List[Tuple[Dict]],
            thread_id: Union[int, None] = None
            ):
    """
    Run the configured pipeline over a list of loader arguments.

    Each entry of *load_args* is dispatched to the loader as positional
    arguments (list/tuple), keyword arguments (dict), or a single
    positional argument otherwise.

    :param load_args: load requests, grouped into batches of ``self.batch``.
    :param thread_id: identifier used to seed the DB and to format the DB
        file name; defaults to the current thread ident.
    :raises ImportError: if SQLamarr is not installed.
    """
    if not self._sqlamarr_available:
        raise ImportError("SQLite is needed for pipeline.execute(). "
                          "Please reinstall as `pip install PyLamarr[SQLamarr]`")

    import SQLamarr

    tid = thread_id if thread_id is not None else threading.get_ident()
    # FIXME: in-memory DB is not working here...
    # db = SQLamarr.SQLite3DB(f"file:memdb{tid}?mode=memory&cache=shared")
    parsed_fmt = self._dbfile_fmt.format(thread=tid)
    self.logger.info(f"Connecting to SQLite db: {parsed_fmt}")
    db = SQLamarr.SQLite3DB(parsed_fmt)
    db.seed(tid)
    loader = self.loader(db)

    # Pipeline that wipes the transient event store between sub-batches.
    # NOTE(review): this entry was lost in the source mangling;
    # CleanEventStore is SQLamarr's event-store reset algorithm — confirm.
    clean = SQLamarr.Pipeline([
        SQLamarr.CleanEventStore(db),
    ])

    pipeline = SQLamarr.Pipeline(
        [make_algo(db) for _, make_algo in self.sequence],
    )

    self.logger.info("Algorithms:")
    for i_alg, (name, _) in enumerate(self.sequence, 1):
        self.logger.info(f"  {i_alg:>2d}. {name}")

    for batch in self._batched(load_args, self.batch):
        for load_arg in batch:
            self.logger.info(f"Loading {load_arg}")
            # Dispatch on the argument shape: positional, keyword or scalar.
            if isinstance(load_arg, (list, tuple)):
                sub_batch_generator = loader.load(*load_arg)
            elif isinstance(load_arg, (dict,)):
                sub_batch_generator = loader.load(**load_arg)
            else:
                sub_batch_generator = loader.load(load_arg)

            for sub_batch in sub_batch_generator:
                self.logger.info(f"Processing {sub_batch}")
                if self._clean_before_loading:
                    self.logger.debug("Cleaning database for processing a new batch")
                    clean.execute()
                else:
                    self.logger.warning("Cleaning database was DISABLED")

                self.logger.debug(f"Executing pipeline on a batch of {len(sub_batch)} events")
                sub_batch.load()

                self.logger.debug("Executing the pipeline")
                pipeline.execute()

    self.logger.debug("Completed processing of batch")

    # Drop the on-disk database unless the caller asked to keep it.
    # NOTE(review): the guard line was lost in the source mangling; the
    # otherwise-unused _clean_after_finishing flag strongly suggests it.
    if self._clean_after_finishing:
        if parsed_fmt.startswith("file:"):
            # Plain file URI: strip the scheme (and query string, if any)
            # to recover the filesystem path; in-memory DBs need no removal.
            if "mode=memory" not in parsed_fmt:
                if '?' in parsed_fmt:
                    os.remove(parsed_fmt[len('file:'):parsed_fmt.index('?')])
                else:
                    os.remove(parsed_fmt[len('file:'):])
        else:
            os.remove(parsed_fmt)
156
157
def to_xml(self, file_like) -> None:
    """
    Serialize the pipeline configuration as XML.

    Writes a single ``<pipeline>`` element (with the batch size as an
    attribute) whose children are produced by each wrapper's ``to_xml``
    method; wrappers without one are skipped with a warning.

    :param file_like: writable text stream receiving the XML string.
    """
    root = e3.Element("pipeline", batch=str(self.batch))

    for k, w in self.sequence:
        if hasattr(w, 'to_xml'):
            # Tag the serialized algorithm with its step name.
            w.to_xml(root).attrib['step'] = k
        else:
            self.logger.warning(f"XML serialization unavailable for {k}. Skipped.")

    file_like.write(e3.tostring(root, encoding='unicode'))
168
@classmethod
def read_xml(cls, file_like):
    """
    Reconstruct a pipeline from the XML produced by ``to_xml``.

    :param file_like: readable text stream containing the XML document.
    :return: a new pipeline instance with the deserialized sequence.
    :raises IOError: if the root tag is not ``<pipeline>``.
    :raises NotImplementedError: on an unknown config-entry type.
    """
    root = e3.fromstring(file_like.read())
    if root.tag.lower() not in ['pipeline']:
        raise IOError(f"Unexpected ROOT tag {root.tag}")

    batch_size = int(root.attrib.get("batch", 1))

    algs = []
    for child in root:
        alg_type = child.tag
        alg_name = child.attrib.get("name", alg_type)
        step_name = child.attrib.get('step', alg_name)
        config = dict()
        for cfg_node in child:
            if cfg_node.tag.lower() == "config":
                # Decode each typed config entry back to its Python value.
                if cfg_node.attrib['type'] == 'str':
                    config[cfg_node.attrib['key']] = cfg_node.text
                elif cfg_node.attrib['type'] == 'seq':
                    # Guard against an empty node: .text is None there, and
                    # None.split would raise AttributeError.
                    config[cfg_node.attrib['key']] = (cfg_node.text or "").split(";")
                elif cfg_node.attrib['type'] == 'url':
                    config[cfg_node.attrib['key']] = RemoteRes(cfg_node.text)
                else:
                    raise NotImplementedError(
                        f"Unexpected type {cfg_node.attrib['type']} for {cfg_node.attrib['key']}"
                    )

        algs.append((step_name, GenericWrapper(implements=alg_type, config=config)))

    return cls(algs, batch=batch_size)
Resource on the Internet, locally cached.
std::unique_ptr< sqlite3, void(*)(sqlite3 *)> SQLite3DB