4from typing
import Collection, List, Optional, Dict, Union
# Collection of strings; presumably the names of the tables to collect.
# NOTE(review): the code that iterates this is not visible in this excerpt — confirm.
tables: Collection[str]
# Maps a string key to a list whose entries are each a DataFrame or None.
# `default_factory=dict` builds a new empty dict per instance, which avoids a
# shared mutable default.
# NOTE(review): keys are presumably table names — confirm against the code that fills it.
dataframes: Dict[str, List[Optional[pd.DataFrame]]] = field(default_factory=dict)
# Optional list of integer batch ids; None when not supplied.
batch_ids: Optional[List[int]] = None
15 def __call__(self, db):
16 logger = logging.getLogger(
"PandasCollector")
17 existing_tables = pd.read_sql_query(f
"SELECT name FROM sqlite_master WHERE type == 'table'", db)[
'name'].values.tolist()
22 if table
in existing_tables:
23 df = pd.read_sql_query(f
"SELECT * FROM {table}", db)
25 logger.debug(f
"Table {table}, requested for collection, contains {len(df)} rows.")
28 logger.debug(f
"Table {table}, requested for collection, NOT FOUND.")
36 dataframes = [df.assign(batch_id=bid)
for bid, df
in zip(batch_ids, dfs)
if df
is not None and len(df) > 0]
39 ret[table] = pd.concat(dataframes, ignore_index=
True)
41 elif any([len(df) > 0
for df
in dfs]):
42 ret[table] = [df
for df
in dfs
if len(df) > 0][0]
43 ret[table][
'batch_id'] = []