SQLamarr
The stand-alone ultra-fast simulation option for the LHCb experiment
Pipeline.py
1 # (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration.
2 #
3 # This software is distributed under the terms of the GNU General Public
4 # Licence version 3 (GPL Version 3), copied verbatim in the file "LICENCE".
5 #
6 # In applying this licence, CERN does not waive the privileges and immunities
7 # granted to it by virtue of its status as an Intergovernmental Organization
8 # or submit itself to any jurisdiction.
9 
10 import ctypes
11 from ctypes import POINTER
12 from SQLamarr import clib, c_TransformerPtr
13 
14 from typing import List, Any
15 
16 
17 clib.execute_pipeline.argtypes = (ctypes.c_int, POINTER(c_TransformerPtr))
18 clib.execute_pipeline.restype = ctypes.c_int
19 
20 SQL_ERRORSHIFT = 10000
21 LOGIC_ERRORSHIFT = 20000
22 
23 class SQLiteError (RuntimeError):
24  pass
25 
26 
27 class Pipeline:
28  """
29  The `Pipeline` object defines the envelop for running C++ transformers from
30  Python.
31 
32  One or multiple algorithms can be enqueed at construction time and executed
33  by calling the method `execute()`.
34 
35  C++-bounded transformer and `PyTransformer`s can be mixed up in the pipeline,
36  however note that passing the control from a C++ algorithm to another C++
37  algorithm has a much less overhead than passing the control to or from a
38  Python algorithm.
39 
40  Hence, if logically possible, one should avoid interleaving C++ and Python
41  algorithms.
42  """
43  def __init__(self, algoritms: List[Any]):
44  """
45  Acquire the list of algorithms
46  """
47  self._algorithms_algorithms = algoritms
48 
49 
50  @staticmethod
51  def _exec_chunk (chunk):
52  """@private Execute a sequence of C++-only transformers"""
53  ArrayOfAlgos = c_TransformerPtr * len(chunk)
54  buf = ArrayOfAlgos(*[c.raw_pointer for c in chunk])
55  ret = clib.execute_pipeline (len(chunk), buf)
56 
57  if ret >= SQL_ERRORSHIFT and ret < SQL_ERRORSHIFT + len(chunk):
58  raise SQLiteError(f"Failed executing {chunk[ret-SQL_ERRORSHIFT]}")
59  elif ret >= LOGIC_ERRORSHIFT and ret < LOGIC_ERRORSHIFT + len(chunk):
60  raise RuntimeError(f"Failed executing {chunk[ret-LOGIC_ERRORSHIFT]}")
61  elif ret != 0:
62  raise Exception("Unknown error code from pipeline exec")
63 
64  def execute(self):
65  """Execute the list of algorithms"""
66  chunk = []
67  for alg in self._algorithms_algorithms:
68  if hasattr(alg, '__call__'):
69  self._exec_chunk_exec_chunk(chunk)
70  chunk = []
71  alg()
72  elif hasattr(alg, 'raw_pointer'):
73  chunk.append(alg)
74  else:
75  raise TypeError(
76  f"Unexpected algorithm {alg} ({alg.__class__.__name__})"
77  )
78 
79  self._exec_chunk_exec_chunk(chunk)
80 
81 
The Pipeline object defines the envelop for running C++ transformers from Python.
Definition: Pipeline.py:27
def __init__(self, List[Any] algoritms)
Acquire the list of algorithms.
Definition: Pipeline.py:43
def execute(self)
Execute the list of algorithms.
Definition: Pipeline.py:64