Source code for vallenae.io.compression

import io

import numpy as np

try:
    import soundfile as sf
    _FLAC_CODEC = True
except OSError:
    _FLAC_CODEC = False


def _check_flac_codec():
    if not _FLAC_CODEC:
        raise ValueError("FLAC library not found. Please install libsndfile.")


[docs] def decode_data_blob( data_blob: bytes, data_format: int, factor_millivolts: float, *, raw: bool = False ) -> np.ndarray: """ Decodes (compressed) 16-bit ADC values from BLOB to array of voltage values. Args: data_blob: Blob from tradb data_format: - 0: uncompressed - 2: FLAC compression factor_millivolts: Factor from int16 representation to millivolts. Stored in tradb -> tr_params as 'TR_mV' raw: Return data as ADC values (`np.int16`), `factor_millivolts` will be ignored Returns: Array of voltage values or ADC values if `raw` is `True` """ def get_data_int16(): if data_format == 0: # uncompressed return np.frombuffer(data_blob, dtype=np.int16) if data_format == 2: # flac _check_flac_codec() return sf.read(io.BytesIO(data_blob), dtype=np.int16)[0] raise ValueError("Data format not supported") data_int16 = get_data_int16() if raw: return data_int16 return np.multiply(data_int16, 1e-3 * factor_millivolts, dtype=np.float32)
[docs] def encode_data_blob( data: np.ndarray, data_format: int, factor_millivolts: float, *, raw: bool = False ) -> bytes: """ Encodes array of voltage values to BLOB of 16-bit ADC values for memory-efficient storage. Args: data: Array with voltage values data_format: - 0: uncompressed - 2: FLAC compression factor_millivolts: Factor from int16 representation to millivolts. Stored in tradb -> tr_params as 'TR_mV' raw: Provide `data` as ADC values (int16), `factor_millivolts` will be ignored Returns: Data blob """ ii16_min = np.iinfo(np.int16).min ii16_max = np.iinfo(np.int16).max def get_data_int16(): if raw: return data.astype(np.int16, casting="safe", copy=False) # only safe casts! temp = np.empty_like(data, dtype=np.float32) np.multiply(data, 1e3 / factor_millivolts, out=temp) # faster than np.clip(temp, a_min=ii16_min, a_max=ii16_max, out=temp) np.minimum(temp, ii16_max, out=temp) np.maximum(temp, ii16_min, out=temp) # faster than np.rint(temp, out=temp) np.floor(temp + 0.5, out=temp) return temp.astype(np.int16) data_int16 = get_data_int16() if data_format == 0: # uncompressed return data_int16.tobytes() if data_format == 2: # flac _check_flac_codec() buffer = io.BytesIO() sf.write(buffer, data_int16, samplerate=1, format="FLAC") return buffer.getvalue() raise ValueError("Data format not supported")