PyLamarr
Pythonizations for the ultra-fast simulation option for the LHCb experiment
 
Loading...
Searching...
No Matches
Wrapper.py
1import xml.etree.ElementTree as e3
2from functools import wraps
3import logging
4
5from dataclasses import dataclass, field, KW_ONLY, fields
6from typing import Dict, Any, List, Optional, Tuple, Union
7from pydantic import validate_arguments
8import ctypes
9
10from PyLamarr.RemoteResource import RemoteResource
11
12
13@dataclass(frozen=True)
14class Wrapper:
15 def __call__(self, db):
16 import SQLamarr
17 try:
18 config = {k: v.file if isinstance(v, RemoteResource) else v
19 for k, v in self.get_config().items()}
20
21 return getattr(SQLamarr, self.get_imp())(db, **config)
22 except (TypeError, AttributeError) as e:
23 logging.getLogger("Wrapper").error(
24 f"Error while configuring {self.__class__.__name__} "
25 f"as {self.get_imp()}"
26 )
27 raise e
28
29 def get_imp(self):
30 if hasattr(self, "implements"):
31 return getattr(self, "implements")
32 raise AttributeError("Class derived from Wrapper does not implement `implements` property")
33
34 def get_config(self):
35 if not hasattr(self, "config"):
36 raise AttributeError("Class derived from Wrapper does not implement `config` property")
37
38 if 'query' in self.config.keys() and len(self.config['query']) > 16384:
39 raise ValueError("Length of the query [{self.__class__.__name__}] exceeds maximum length of 16384 bytes")
40
41 return getattr(self, "config")
42
43
44
45 def to_xml(self, parent_node):
46 node = e3.SubElement(
47 parent_node,
48 self.implements,
49 name=self.__class__.__name__
50 )
51
52 for k, v in self.config.items():
53 print ("#=#=#=#=")
54 print (k, v, type(v))
55 if isinstance(v, RemoteResource):
56 e3.SubElement(node, "CONFIG", key=k, type='url').text = v.remote_url
57 elif isinstance(v, (list, tuple, set)):
58 e3.SubElement(node, "CONFIG", key=k, type='seq').text = ";".join(v)
59 else:
60 e3.SubElement(node, "CONFIG", key=k, type='str').text = str(v)
61
62 return node
63
64
65@validate_arguments
66@dataclass(frozen=True)
68 implements: str
69 config: Dict[str, Any] = field(default_factory=dict)
70
71
72
73
75 def __init__(self,
76 output_columns: List[str],
77 make_persistent: Optional[bool] = True
78 ):
79 self.output_columns = output_columns
80 self.make_persistent = make_persistent
81
82 def __call__(self, query_func):
83 @wraps(query_func)
84 @validate_arguments
85 @dataclass(frozen=True)
86 class _Wrapped(Wrapper):
87 output_table: Optional[str] = query_func.__name__
88 output_columns: Optional[Tuple[str, ...]] = self.output_columns
89 make_persistent: Optional[bool] = self.make_persistent
90
91 def query(s):
92 return query_func()
93
94 implements: str = "TemporaryTable"
95
96 @property
97 def config(s):
98 return dict(
99 output_table=s.output_table,
100 outputs=s.output_columns,
101 query=s.query(),
102 make_persistent=s.make_persistent,
103 )
104
105 return _Wrapped
106
107
108
109
110@validate_arguments
111def custom_query(query: Union[str,List[str]]):
112 @validate_arguments
113 @dataclass(frozen=True)
114 class _Wrapped(Wrapper):
115 replacements: Optional[Dict[str,str]] = field(default_factory=dict)
116 implements: str = "EditEventStore"
117
118 @property
119 def config(s):
120 return dict(
121 queries=(query.format(**s.replacements) if isinstance(query,str)
122 else [q.format(**s.replacements) for q in query])
123 )
124
125 return _Wrapped
126
127
128@validate_arguments
129def pragma(**kwargs):
130 return custom_query([
131 f"PRAGMA {key}={value};"
132 for key, value in kwargs.items()
133 ])
Resource on the Internet, locally cached.