11 from ctypes
import POINTER
12 from SQLamarr
import clib, c_TransformerPtr
14 from typing
import List, Any
17 clib.execute_pipeline.argtypes = (ctypes.c_int, POINTER(c_TransformerPtr))
18 clib.execute_pipeline.restype = ctypes.c_int
20 SQL_ERRORSHIFT = 10000
21 LOGIC_ERRORSHIFT = 20000
29 The `Pipeline` object defines the envelop for running C++ transformers from
32 One or multiple algorithms can be enqueed at construction time and executed
33 by calling the method `execute()`.
35 C++-bounded transformer and `PyTransformer`s can be mixed up in the pipeline,
36 however note that passing the control from a C++ algorithm to another C++
37 algorithm has a much less overhead than passing the control to or from a
40 Hence, if logically possible, one should avoid interleaving C++ and Python
45 Acquire the list of algorithms
51 def _exec_chunk (chunk):
52 """@private Execute a sequence of C++-only transformers"""
53 ArrayOfAlgos = c_TransformerPtr * len(chunk)
54 buf = ArrayOfAlgos(*[c.raw_pointer
for c
in chunk])
55 ret = clib.execute_pipeline (len(chunk), buf)
57 if ret >= SQL_ERRORSHIFT
and ret < SQL_ERRORSHIFT + len(chunk):
58 raise SQLiteError(f
"Failed executing {chunk[ret-SQL_ERRORSHIFT]}")
59 elif ret >= LOGIC_ERRORSHIFT
and ret < LOGIC_ERRORSHIFT + len(chunk):
60 raise RuntimeError(f
"Failed executing {chunk[ret-LOGIC_ERRORSHIFT]}")
62 raise Exception(
"Unknown error code from pipeline exec")
65 """Execute the list of algorithms"""
68 if hasattr(alg,
'__call__'):
72 elif hasattr(alg,
'raw_pointer'):
76 f
"Unexpected algorithm {alg} ({alg.__class__.__name__})"
The Pipeline object defines the envelop for running C++ transformers from Python.
def __init__(self, List[Any] algoritms)
Acquire the list of algorithms.
def execute(self)
Execute the list of algorithms.