PyLamarr
Pythonizations for the ultra-fast simulation option for the LHCb experiment
 
Loading...
Searching...
No Matches
PandasCollector.py
1import PyLamarr
2
3from dataclasses import dataclass, field
4from typing import Collection, List, Optional, Dict, Union
5import pandas as pd
6import logging
7
8@dataclass
10 tables: Collection[str]
11 dataframes: Dict[str,List[Union[pd.DataFrame, None]]] = field(default_factory=lambda: {})
12 batch_ids: Optional[List[int]] = None
13
@PyLamarr.method
def __call__(self, db):
    """Read each requested table from ``db`` and record it as a new batch.

    For every name in ``self.tables`` exactly one entry is appended to
    ``self.dataframes[name]``: a DataFrame with the full table content
    when the table is listed in ``sqlite_master``, or ``None`` when it is
    not. Appending ``None`` keeps the per-table lists aligned, so the
    position in the list identifies the call that produced the entry.

    Args:
        db: Connection object handed to ``pandas.read_sql_query``. The
            catalogue query targets ``sqlite_master``, so this is
            presumably an SQLite connection supplied by the
            ``PyLamarr.method`` wrapper -- TODO confirm.
    """
    logger = logging.getLogger("PandasCollector")

    catalogue = pd.read_sql_query(
        "SELECT name FROM sqlite_master WHERE type == 'table'", db
    )
    existing_tables = set(catalogue["name"].tolist())

    for table in self.tables:
        collected = self.dataframes.setdefault(table, [])

        if table not in existing_tables:
            collected.append(None)
            logger.debug(
                "Table %s, requested for collection, NOT FOUND.", table
            )
            continue

        # The name is interpolated unquoted; it is only reached after the
        # membership check above matched it against the table catalogue.
        df = pd.read_sql_query(f"SELECT * FROM {table}", db)
        collected.append(df)
        logger.debug(
            "Table %s, requested for collection, contains %d rows.",
            table,
            len(df),
        )
29
@property
def dataframe(self):
    """Merge the collected batches into one DataFrame per table.

    Each non-empty frame is tagged with a ``batch_id`` column before
    concatenation. The id is taken from ``self.batch_ids`` when that is
    set, otherwise it is the position of the frame in the collected list.
    Pairing is done with ``zip``, so entries beyond the shorter of the two
    sequences are ignored.

    Returns:
        dict: Table name mapped to a DataFrame. A table whose collected
        frames are all empty is represented by an empty frame (with an
        empty ``batch_id`` column) so that it is still present in the
        result. A table that was never found (only ``None`` entries) is
        omitted.
    """
    ret = {}

    for table, dfs in self.dataframes.items():
        if self.batch_ids is not None:
            batch_ids = self.batch_ids
        else:
            batch_ids = range(len(dfs))

        # Entries are None for batches in which the table was not found.
        found = [
            (bid, df) for bid, df in zip(batch_ids, dfs) if df is not None
        ]
        filled = [df.assign(batch_id=bid) for bid, df in found if len(df) > 0]

        if filled:
            # At least one batch has rows: stack them into a single frame.
            ret[table] = pd.concat(filled, ignore_index=True)
        elif found:
            # Every frame that was found is empty: expose the first one so
            # downstream code still sees the table and its columns.
            # ``assign`` returns a copy, leaving the stored frame untouched.
            ret[table] = found[0][1].assign(batch_id=[])

    return ret
46
47
48
49