import sqlite3
from pathlib import Path
from time import sleep
from typing import Iterable, Optional, Sequence, Union
import pandas as pd
from ._database import Database, require_write_access
from ._dataframe import iter_to_dataframe
from ._sql import (
QueryIterable,
create_new_database,
insert_from_dict,
query_conditions,
read_sql_generator,
update_from_dict,
)
from .datatypes import FeatureRecord
from .types import SizedIterable
[docs]
class TrfDatabase(Database):
    """IO Wrapper for trfdb (transient feature) database file."""

    def __init__(self, filename: str, mode: str = "ro"):
        """
        Open trfdb database file.

        Args:
            filename: Path to trfdb database file
            mode: Define database access:
                **"ro"** (read-only),
                **"rw"** (read-write),
                **"rwc"** (read-write and create empty database if it does not exist)
        """
        # The base class is configured with the "trf" table prefix and the
        # required ".trfdb" file extension.
        super().__init__(
            filename,
            mode=mode,
            table_prefix="trf",
            required_file_ext=".trfdb",
        )

    @staticmethod
    def create(filename: str):
        """
        Create empty trfdb.

        The schema is read from the ``schema_templates/trfdb.sql`` template
        located next to this module and handed to `create_new_database`.

        Args:
            filename: Path to new trfdb database file
        """
        schema_path = Path(__file__).parent / "schema_templates/trfdb.sql"
        schema = schema_path.read_text("utf-8")
        create_new_database(filename, schema)

    def read(self, **kwargs) -> pd.DataFrame:
        """
        Read features to Pandas DataFrame.

        Every record is flattened to one row holding its ``trai`` plus all
        entries of its feature dict; ``trai`` is used as index column.

        Args:
            **kwargs: Arguments passed to `iread`

        Returns:
            Pandas DataFrame with features
        """

        def record_to_dict(record: FeatureRecord):
            return {"trai": record.trai, **record.features}

        return iter_to_dataframe(
            [record_to_dict(r) for r in self.iread(**kwargs)],
            desc="Trf",
            index_column="trai",
        )

    def iread(
        self,
        *,
        trai: Union[None, int, Sequence[int]] = None,
        query_filter: Optional[str] = None,
    ) -> SizedIterable[FeatureRecord]:
        """
        Stream features with returned iterable.

        Args:
            trai: Read data by TRAI (transient recorder index)
            query_filter: Optional query filter provided as SQL clause,
                e.g. "FFT_CoG >= 150 AND CTP < 20"

        Returns:
            Sized iterable to sequential read features
        """
        # The conditions are appended to the outer SELECT, i.e. they filter
        # the TRAI-ordered sub-select.
        query = """
        SELECT * FROM (
            SELECT * FROM trf_data
            ORDER BY TRAI ASC
        )
        """ + query_conditions(isin={"TRAI": trai}, custom_filter=query_filter)
        return QueryIterable(
            self._connection_wrapper.get_readonly_connection(),
            query,
            FeatureRecord.from_sql,
        )

    def listen(
        self,
        existing: bool = False,
        wait: bool = False,
        query_filter: Optional[str] = None,
    ) -> Iterable[FeatureRecord]:
        """
        Listen to database changes and return new records.

        The table is polled in batches of at most 1000 rows; when a poll
        returns no rows the generator sleeps 100 ms before polling again.

        Args:
            existing: Return already existing records
            wait: Wait for new records even if no acquisition (writer) is active.
                Otherwise the function returns after all records are read.
            query_filter: Optional query filter provided as SQL clause,
                e.g. "TRAI >= 100"

        Yields:
            New feature records
        """
        max_buffer_size = 1000
        query = f"""
        SELECT * FROM (
            SELECT rowid, * FROM trf_data
            WHERE rowid > ?
        ) {query_conditions(custom_filter=query_filter)} LIMIT {max_buffer_size}
        """
        # Start before the first row, or behind the rows already present
        # (presumably `_main_index_range()[1]` is the highest existing
        # index -- TODO confirm against the base class).
        last_rowid = 0 if existing else self._main_index_range()[1]
        while True:
            # buffer rows to allow in-between write transactions
            rows = list(read_sql_generator(self.connection(), query, last_rowid))
            # NOTE(review): the resume position is taken from the last row of
            # each batch, so this relies on rows arriving in ascending rowid
            # order although the query has no ORDER BY -- confirm.
            for row in rows:
                last_rowid = row.pop("rowid")
                yield FeatureRecord.from_sql(row)

            if not rows:
                if not wait and self._file_status() == 0:  # no writer active
                    break
                sleep(0.1)  # wait 100 ms until next read

    @require_write_access
    def write(self, feature_set: FeatureRecord) -> int:
        """
        Write feature record to trfdb.

        Feature values are converted to ``float``; values that cannot be
        converted are written as ``None``. If a row with the same TRAI already
        exists, it is updated instead of inserted. If the statement fails with
        an `sqlite3.OperationalError`, the feature names are added as ``REAL``
        columns and the write is retried exactly once.

        Args:
            feature_set: Feature set

        Returns:
            Index (trai) of inserted row

        Raises:
            sqlite3.OperationalError: If the write still fails after the
                columns have been added
        """

        def convert(value):
            try:
                return float(value)
            except (ValueError, TypeError):
                return None

        row_dict = {
            key: convert(value)
            for key, value in feature_set.features.items()
        }
        row_dict["TRAI"] = feature_set.trai

        def store() -> int:
            with self.connection() as con:  # commit/rollback transaction
                try:
                    return insert_from_dict(con, self._table_main, row_dict)
                except sqlite3.IntegrityError:  # UNIQUE constraint, TRAI already exists
                    # update instead
                    return update_from_dict(con, self._table_main, row_dict, "TRAI")

        try:
            return store()
        except sqlite3.OperationalError:  # missing column(s)
            # Retry once only: an error that is not cured by adding columns
            # (e.g. a locked database) must surface as the original sqlite
            # error instead of recursing until RecursionError.
            # Presumably `_add_columns` only adds the columns that do not
            # exist yet -- verify against the base class.
            self._add_columns(self._table_main, list(row_dict), "REAL")
            return store()  # try again