Vallen AE Python Tools
Extract and analyze Acoustic Emission measurement data.
The IO module vallenae.io
enables reading (and writing) of Vallen Systeme SQLite database files:
*.pridb: Primary database
*.tradb: Transient data
*.trfdb: Transient features
The remaining modules are system-independent and try to comprise the most common state-of-the-art algorithms in Acoustic Emission:
vallenae.features
: Extraction of Acoustic Emission featuresvallenae.timepicker
: Timepicking algorithms for arrival time estimations
IO
Read/write Vallen Systeme database and setup files.
Database classes
Classes to read/write pridb, tradb and trfdb database files.
Warning
Writing is still experimental
|
IO Wrapper for pridb database file. |
|
IO Wrapper for tradb database file. |
|
IO Wrapper for trfdb (transient feature) database file. |
All database classes implement two different interfaces to access data:
Standard read_*
Read data to
pandas.DataFrame
, e.g. withPriDatabase.read_hits
>>> pridb = vae.io.PriDatabase("./examples/steel_plate/sample.pridb") >>> df = pridb.read_hits() # save all hits to pandas dataframe >>> df[["time", "channel"]] # output columns hit and channel time channel set_id 10 3.992771 3 11 3.992775 2 12 3.992813 4 13 3.992814 1
Streaming iread_*
Iterate through the data row by row. This is a memory-efficient solution ideal for batch processing. The return types are specific
typing.NamedTuple
, see Data types.Example with
PriDatabase.iread_hits
:>>> pridb = vae.io.PriDatabase("./examples/steel_plate/sample.pridb") >>> for hit in pridb.iread_hits(): ... print(f"time: {hit.time:0.4f}, channel: {hit.channel}") ... time: 3.9928, channel: 3 time: 3.9928, channel: 2 time: 3.9928, channel: 4 time: 3.9928, channel: 1 >>> type(hit) <class 'vallenae.io.datatypes.HitRecord'>
Data types
Records of the database are represented as typing.NamedTuple
.
Each record implements a class method from_sql
to init from a SQLite
row dictionary (column name: value).
|
Hit record in pridb (SetType = 2). |
|
Marker record in pridb (SetType = 4, 5, 6). |
|
Status data record in pridb (SetType = 3). |
|
Parametric data record in pridb (SetType = 1). |
|
Transient data record in tradb. |
|
Transient feature record in trfdb. |
Compression
Transient signals in the tradb are stored as BLOBs of 16-bit ADC values – either uncompressed or compressed (FLAC). Following functions convert between BLOBs and arrays of voltage values.
|
Decodes (compressed) 16-bit ADC values from BLOB to array of voltage values. |
|
Encodes array of voltage values to BLOB of 16-bit ADC values for memory-efficient storage. |
Features
Acoustic Emission
|
Compute maximum absolute amplitude. |
|
Compute index of peak amplitude. |
|
Checks if absolute amplitudes are above threshold. |
|
Compute index of first threshold crossing. |
|
Compute the rise time. |
|
Compute the energy of a hit. |
|
Compute the signal strength of a hit. |
|
Compute the number of positive threshold crossings of a hit (counts). |
|
Compute the root mean square (RMS) of an array. |
Conversion
|
Convert amplitude from volts to decibel (dB). |
|
Convert amplitude from decibel (dB) to volts. |
Timerpicker
The determination of signal arrival times has a major influence on the localization accuracy. Usually, arrival times are determined by the first threshold crossing (either fixed or adaptive). Following popular methods have been proposed in the past to automatically pick time of arrivals:
|
Hinkley criterion for arrival time estimation. |
|
Akaike Information Criterion (AIC) for arrival time estimation. |
|
Energy ratio for arrival time estimation. |
|
Modified energy ratio method for arrival time estimation. |
Examples
A collection of examples how to read and analyse Acoustic Emission data.
Note
Go to the end to download the full example code
Export to WAV (incremental)
Generate WAV files from tradb. We use the vallenae.io.TraDatabase.read_continuous_wave
method to
read the transient data as a continuous array.
This example reads and writes the data incrementally to handle big file sizes that don’t fit into memory at once.
Export channel 1 to /tmp/tmpixw1v5gd/bearing_plain_ch1.wav
Generated WAV files:
/tmp/tmpixw1v5gd/bearing_plain_ch1.wav
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional
import vallenae as vae
from soundfile import SoundFile
HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
def export_wav_incremental(
filename_wav: Path,
tradb: vae.io.TraDatabase,
channel: int,
time_start: Optional[float] = None,
time_stop: Optional[float] = None,
time_block: int = 1,
):
"""
Export data from tradb to a WAV file.
Args:
filename_wav: Path to the generated WAV file
tradb: `TraDatabase` instance
channel: Channel number
time_start: Start reading at relative time (in seconds). Start at beginning if `None`
time_start: Stop reading at relative time (in seconds). Read until end if `None`
time_block: Block size in seconds
"""
con = tradb.connection()
if time_start is None:
time_start = 0
if time_stop is None:
time_stop = con.execute("SELECT MAX(Time) FROM view_tr_data").fetchone()[0]
samplerates = con.execute("SELECT DISTINCT(SampleRate) FROM tr_data").fetchone()
assert len(samplerates) == 1
samplerate = samplerates[0]
blocks = int((time_stop - time_start) // time_block + 1)
with SoundFile(filename_wav, "w", samplerate=samplerate, channels=1, subtype="PCM_16") as f:
for block in range(blocks):
time_block_start = time_start + block * time_block
time_block_stop = min(time_start + (block + 1) * time_block, time_stop)
y, _ = tradb.read_continuous_wave(
channel=channel,
time_start=time_block_start,
time_stop=time_block_stop,
time_axis=False,
show_progress=False,
raw=True, # read as ADC values (int16)
)
f.write(y)
f.flush()
def main():
filename_tradb = HERE / "bearing" / "bearing_plain.tradb"
# use a temporary file for this example
with TemporaryDirectory() as tmpdir:
with vae.io.TraDatabase(filename_tradb) as tradb:
for channel in tradb.channel():
filename_wav = Path(tmpdir) / f"{filename_tradb.stem}_ch{channel}.wav"
print(f"Export channel {channel} to {filename_wav}")
export_wav_incremental(
filename_wav=filename_wav,
tradb=tradb,
channel=channel,
time_start=0, # read from t = 0 s
time_stop=None, # read until end if None
time_block=1, # read/write in block sizes of 1 s
)
# list all generated wav files
print("Generated WAV files:")
for file in Path(tmpdir).iterdir():
print(file)
if __name__ == "__main__":
main()
Total running time of the script: (0 minutes 0.064 seconds)
Note
Go to the end to download the full example code
Read pridb
from pathlib import Path
import matplotlib.pyplot as plt
import vallenae as vae
HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
PRIDB = HERE / "steel_plate" / "sample.pridb"
Open pridb
pridb = vae.io.PriDatabase(PRIDB)
print("Tables in database: ", pridb.tables())
print("Number of rows in data table (ae_data): ", pridb.rows())
print("Set of all channels: ", pridb.channel())
Tables in database: {'ae_data', 'acq_setup', 'ae_globalinfo', 'ae_params', 'ae_fieldinfo', 'data_integrity', 'ae_markers'}
Number of rows in data table (ae_data): 18
Set of all channels: {1, 2, 3, 4}
Read hits to Pandas DataFrame
df_hits = pridb.read_hits()
# Print a few columns
print(df_hits[["time", "channel", "amplitude", "counts", "energy"]])
Hits: 0%| | 0/4 [00:00<?, ?it/s]
Hits: 100%|██████████| 4/4 [00:00<00:00, 7509.94it/s]
time channel amplitude counts energy
set_id
10 3.992771 3 0.046539 2180 2.799510e+07
11 3.992775 2 0.059621 2047 2.276279e+07
12 3.992813 4 0.034119 1854 1.286700e+07
13 3.992814 1 0.029115 1985 1.265275e+07
Query Pandas DataFrame
DataFrames offer powerful features to query and aggregate data, e.g. plot summed energy per channel
ax = df_hits.groupby("channel").sum()["energy"].plot.bar(figsize=(8, 3))
ax.set_xlabel("Channel")
ax.set_ylabel("Summed Energy [eu = 1e-14 V²s]")
plt.tight_layout()
plt.show()

Read markers
df_markers = pridb.read_markers()
print(df_markers)
Marker: 0%| | 0/5 [00:00<?, ?it/s]
Marker: 100%|██████████| 5/5 [00:00<00:00, 14324.81it/s]
time set_type data number
set_id
1 0.00 6 1
2 0.00 4 10:52 Resume 1
3 0.00 5 2019-09-20 10:54:52 <NA>
4 0.00 4 TimeZone: +02:00 (W. Europe Standard Time) 2
18 100.07 4 10:56 Suspend 3
Read parametric data
df_parametric = pridb.read_parametric()
print(df_parametric)
Parametric: 0%| | 0/9 [00:00<?, ?it/s]
Parametric: 100%|██████████| 9/9 [00:00<00:00, 13657.29it/s]
time param_id pctd pcta
set_id
5 0.00 1 0 0
6 1.00 1 0 0
7 2.00 1 0 0
8 3.00 1 0 0
9 3.99 1 0 0
14 4.00 1 0 0
15 5.00 1 0 0
16 6.00 1 0 0
17 6.45 1 0 0
Total running time of the script: (0 minutes 0.219 seconds)
Note
Go to the end to download the full example code
Read and plot transient data

from pathlib import Path
import matplotlib.pyplot as plt
import vallenae as vae
HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
TRADB = HERE / "steel_plate" / "sample_plain.tradb" # uncompressed
TRAI = 4 # just an example, no magic here
def main():
# Read waveform from tradb
with vae.io.TraDatabase(TRADB) as tradb:
y, t = tradb.read_wave(TRAI)
y *= 1e3 # in mV
t *= 1e6 # for µs
# Plot waveforms
plt.figure(figsize=(8, 4), tight_layout=True)
plt.plot(t, y)
plt.xlabel("Time [µs]")
plt.ylabel("Amplitude [mV]")
plt.title(f"Transient Wave Plot; trai = {TRAI}")
plt.show()
if __name__ == "__main__":
main()
Total running time of the script: (0 minutes 0.266 seconds)
Note
Go to the end to download the full example code
Timepicker
Following example showcases the results of different timepicking methods.
For more informations, please refer to the functions documentation (vallenae.timepicker
).
import time
from pathlib import Path
import matplotlib.pyplot as plt
import vallenae as vae
HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
TRADB = HERE / "steel_plate" / "sample_plain.tradb"
TRAI = 4
SAMPLES = 2000
Read waveform from tradb
tradb = vae.io.TraDatabase(TRADB)
y, t = tradb.read_wave(TRAI)
# crop first samples
t = t[:SAMPLES]
y = y[:SAMPLES]
# unit conversion
t *= 1e6 # convert to µs
y *= 1e3 # convert to mV
Prepare plotting with time-picker results
def plot(t_wave, y_wave, y_picker, index_picker, name_picker):
_, ax1 = plt.subplots(figsize=(8, 4), tight_layout=True)
ax1.set_xlabel("Time [µs]")
ax1.set_ylabel("Amplitude [mV]", color="g")
ax1.plot(t_wave, y_wave, color="g")
ax1.tick_params(axis="y", labelcolor="g")
ax2 = ax1.twinx()
ax2.set_ylabel(f"{name_picker}", color="r")
ax2.plot(t_wave, y_picker, color="r")
ax2.tick_params(axis="y", labelcolor="r")
plt.axvline(t_wave[index_picker], color="k", linestyle=":")
plt.show()
Hinkley Criterion
hc_arr, hc_index = vae.timepicker.hinkley(y, alpha=5)
plot(t, y, hc_arr, hc_index, "Hinkley Criterion")

The negative trend correlates to the chosen alpha value and can influence the results strongly. Results with alpha = 50 (less negative trend):
hc_arr, hc_index = vae.timepicker.hinkley(y, alpha=50)
plot(t, y, hc_arr, hc_index, "Hinkley Criterion")

Akaike Information Criterion (AIC)
aic_arr, aic_index = vae.timepicker.aic(y)
plot(t, y, aic_arr, aic_index, "Akaike Information Criterion")

Energy Ratio
er_arr, er_index = vae.timepicker.energy_ratio(y)
plot(t, y, er_arr, er_index, "Energy Ratio")

Modified Energy Ratio
mer_arr, mer_index = vae.timepicker.modified_energy_ratio(y)
plot(t, y, mer_arr, mer_index, "Modified Energy Ratio")

Performance comparison
All timepicker implementations are using Numba for just-in-time (JIT) compilations. Usually the first function call is slow, because it will trigger the JIT compiler. To compare the performance to a native or numpy implementation, the average of multiple executions should be compared.
def timeit(func, loops=100):
time_start = time.perf_counter()
for _ in range(loops):
func()
return 1e6 * (time.perf_counter() - time_start) / loops # elapsed time in µs
timer_results = {
"Hinkley": timeit(lambda: vae.timepicker.hinkley(y, 5)),
"AIC": timeit(lambda: vae.timepicker.aic(y)),
"Energy Ratio": timeit(lambda: vae.timepicker.energy_ratio(y)),
"Modified Energy Ratio": timeit(lambda: vae.timepicker.modified_energy_ratio(y)),
}
for name, execution_time in timer_results.items():
print(f"{name}: {execution_time:0.3f} µs")
plt.figure(figsize=(8, 3), tight_layout=True)
plt.bar(timer_results.keys(), timer_results.values()) # noqa: SIM911
plt.ylabel("Time [µs]")
plt.show()

Hinkley: 10.544 µs
AIC: 65.591 µs
Energy Ratio: 12.616 µs
Modified Energy Ratio: 20.098 µs
Total running time of the script: (0 minutes 3.200 seconds)
Note
Go to the end to download the full example code
Timepicker batch processing
Following examples shows how to stream transient data row by row, compute timepicker results and save the results to a feature database (trfdb).
from pathlib import Path
from shutil import copyfile
from tempfile import gettempdir
import matplotlib.pyplot as plt
import pandas as pd
import vallenae as vae
HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
TRADB = HERE / "steel_plate" / "sample_plain.tradb"
TRFDB = HERE / "steel_plate" / "sample.trfdb"
TRFDB_TMP = Path(gettempdir()) / "sample.trfdb"
Open tradb (readonly) and trfdb (readwrite)
copyfile(TRFDB, TRFDB_TMP) # copy trfdb, so we don't overwrite it
tradb = vae.io.TraDatabase(TRADB)
trfdb = vae.io.TrfDatabase(TRFDB_TMP, mode="rw") # allow writing
Read current trfdb
print(trfdb.read())
Trf: 0%| | 0/4 [00:00<?, ?it/s]
Trf: 100%|██████████| 4/4 [00:00<00:00, 26462.49it/s]
FFT_CoG FFT_FoM PA ... CTP FI FR
trai ...
1 147.705078 134.277344 46.483864 ... 11 222.672058 110.182449
2 144.042969 139.160156 59.450512 ... 35 182.291672 98.019981
3 155.029297 164.794922 33.995209 ... 55 155.191879 95.493233
4 159.912109 139.160156 29.114828 ... 29 181.023727 101.906227
[4 rows x 8 columns]
Compute arrival time offsets with different timepickers
To improve localisation, time of arrival estimates using the first threshold crossing can be refined with timepickers. Therefore, arrival time offsets between the first threshold crossings and the timepicker results are computed.
def dt_from_timepicker(timepicker_func, tra: vae.io.TraRecord):
# Index of the first threshold crossing is equal to the pretrigger samples
index_ref = tra.pretrigger
# Only analyse signal until peak amplitude
index_peak = vae.features.peak_amplitude_index(tra.data)
data = tra.data[:index_peak]
# Get timepicker result
_, index_timepicker = timepicker_func(data)
# Compute offset in µs
return (index_timepicker - index_ref) * 1e6 / tra.samplerate
Transient data is streamed from the database row by row using vallenae.io.TraDatabase.iread
.
Only one transient data set is loaded into memory at a time.
That makes the streaming interface ideal for batch processing.
The timepicker results are saved to the trfdb using vallenae.io.TrfDatabase.write
.
for tra in tradb.iread():
# Calculate arrival time offsets with different timepickers
feature_set = vae.io.FeatureRecord(
trai=tra.trai,
features={
"ATO_Hinkley": dt_from_timepicker(vae.timepicker.hinkley, tra),
"ATO_AIC": dt_from_timepicker(vae.timepicker.aic, tra),
"ATO_ER": dt_from_timepicker(vae.timepicker.energy_ratio, tra),
"ATO_MER": dt_from_timepicker(vae.timepicker.modified_energy_ratio, tra),
}
)
# Save results to trfdb
trfdb.write(feature_set)
Read results from trfdb
print(trfdb.read().filter(regex="ATO"))
Trf: 0%| | 0/4 [00:00<?, ?it/s]
Trf: 100%|██████████| 4/4 [00:00<00:00, 85598.04it/s]
ATO_Hinkley ATO_AIC ATO_ER ATO_MER
trai
1 30.4 -1.8 -4.0 29.2
2 36.4 -0.8 -1.0 -0.4
3 67.0 -1.8 -3.2 60.4
4 65.8 -1.0 -2.2 64.2
Plot results
ax = trfdb.read()[["ATO_Hinkley", "ATO_AIC", "ATO_ER", "ATO_MER"]].plot.barh()
ax.invert_yaxis()
ax.set_xlabel("Arrival time offset [µs]")
plt.show()

Trf: 0%| | 0/4 [00:00<?, ?it/s]
Trf: 100%|██████████| 4/4 [00:00<00:00, 94254.02it/s]
Plot waveforms and arrival times
_, axes = plt.subplots(4, 1, tight_layout=True, figsize=(8, 8))
for row, ax in zip(trfdb.read().itertuples(), axes):
trai = row.Index
# read waveform from tradb
y, t = tradb.read_wave(trai)
# plot waveform
ax.plot(t[400:1000] * 1e6, y[400:1000] * 1e3, "k") # crop and convert to µs/mV
ax.set_title(f"trai = {trai}")
ax.set_xlabel("Time [µs]")
ax.set_ylabel("Amplitude [mV]")
ax.label_outer()
# plot arrival time offsets
ax.axvline(row.ATO_Hinkley, color="C0")
ax.axvline(row.ATO_AIC, color="C1")
ax.axvline(row.ATO_ER, color="C2")
ax.axvline(row.ATO_MER, color="C3")
axes[0].legend(["Waveform", "Hinkley", "AIC", "ER", "MER"])
plt.show()

Trf: 0%| | 0/4 [00:00<?, ?it/s]
Trf: 100%|██████████| 4/4 [00:00<00:00, 68478.43it/s]
Use results in VisualAE
The computed arrival time offsets can be directly used in VisualAE.
We only need to specify the unit. VisualAE requires them to be in µs.
Units and other column-related meta data is saved in the trf_fieldinfo table.
Field infos can be retrieved with vallenae.io.TrfDatabase.fieldinfo
:
print(trfdb.fieldinfo())
{'FFT_CoG': {'SetTypes': 2, 'Unit': '[kHz]', 'LongName': 'F(C.o.Gravity)', 'Description': 'Center of gravity of spectrum', 'ShortName': None, 'FormatStr': None}, 'FFT_FoM': {'SetTypes': 2, 'Unit': '[kHz]', 'LongName': 'F(max. Amp.)', 'Description': 'Frequency of maximum of spectrum', 'ShortName': None, 'FormatStr': None}, 'PA': {'SetTypes': 8, 'Unit': '[mV]', 'LongName': 'Peak Amplitude', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'RT': {'SetTypes': 8, 'Unit': '[µs]', 'LongName': 'Rise Time', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'Dur': {'SetTypes': 8, 'Unit': '[µs]', 'LongName': 'Duration (available)', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'CTP': {'SetTypes': 8, 'Unit': None, 'LongName': 'Cnts to peak', 'Description': None, 'ShortName': None, 'FormatStr': '#'}, 'FI': {'SetTypes': 8, 'Unit': '[kHz]', 'LongName': 'Initiation Freq.', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'FR': {'SetTypes': 8, 'Unit': '[kHz]', 'LongName': 'Reverberation Freq.', 'Description': None, 'ShortName': None, 'FormatStr': None}}
Show results as table:
print(pd.DataFrame(trfdb.fieldinfo()))
FFT_CoG ... FR
SetTypes 2 ... 8
Unit [kHz] ... [kHz]
LongName F(C.o.Gravity) ... Reverberation Freq.
Description Center of gravity of spectrum ... None
ShortName None ... None
FormatStr None ... None
[6 rows x 8 columns]
Write units to trfdb
Field infos can be written with vallenae.io.TrfDatabase.write_fieldinfo
:
trfdb.write_fieldinfo("ATO_Hinkley", {"Unit": "[µs]", "LongName": "Arrival Time Offset (Hinkley)"})
trfdb.write_fieldinfo("ATO_AIC", {"Unit": "[µs]", "LongName": "Arrival Time Offset (AIC)"})
trfdb.write_fieldinfo("ATO_ER", {"Unit": "[µs]", "LongName": "Arrival Time Offset (ER)"})
trfdb.write_fieldinfo("ATO_MER", {"Unit": "[µs]", "LongName": "Arrival Time Offset (MER)"})
print(pd.DataFrame(trfdb.fieldinfo()).filter(regex="ATO"))
ATO_Hinkley ... ATO_MER
SetTypes None ... None
Unit [µs] ... [µs]
LongName Arrival Time Offset (Hinkley) ... Arrival Time Offset (MER)
Description None ... None
ShortName None ... None
FormatStr None ... None
[6 rows x 4 columns]
Load results in VisualAE
Time arrival offsets can be specified in the settings of Location Processors - Channel Positions - Arrival Time Offset. (Make sure to rename the generated trfdb to match the filename of the pridb.)

Total running time of the script: (0 minutes 0.832 seconds)
Note
Go to the end to download the full example code
Localisation

Hits: 0%| | 0/4 [00:00<?, ?it/s]
Hits: 100%|██████████| 4/4 [00:00<00:00, 9664.29it/s]
Runtime for 1 call to differential_evolution(): 0.6849 s
message: Optimization terminated successfully.
success: True
fun: 0.001115888406323039
x: [ 2.217e-01 3.657e-01]
nit: 102
nfev: 8249
population: [[ 2.217e-01 3.657e-01]
[ 2.218e-01 3.656e-01]
...
[ 2.215e-01 3.657e-01]
[ 2.216e-01 3.656e-01]]
population_energies: [ 1.116e-03 1.125e-03 ... 1.125e-03 1.124e-03]
jac: [ 6.477e-04 1.395e-03]
import math
import time
from pathlib import Path
from typing import Dict, Optional, Tuple
from xml.etree import ElementTree
import matplotlib.pyplot as plt
import numpy as np
import vallenae as vae
from numba import f8, njit
from numpy.linalg import norm
from scipy.optimize import differential_evolution
HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
PRIDB = HERE / "steel_plate" / "sample.pridb"
SETUP = HERE / "steel_plate" / "sample.vaex"
NUMBER_SENSORS = 4
@njit(f8(f8[:], f8, f8[:, :], f8[:]))
def lucy_error_fun(
test_pos: np.ndarray,
speed: float,
sens_poss: np.ndarray,
measured_delta_ts: np.ndarray,
) -> float:
"""
Implementation of the LUCY computation in 2D as documented in
the Vallen online help.
Args:
test_pos: Emitter position to test.
speed: Assumed speed of sound in a plate-like structure.
sens_poss: Sensor positions, often a 4x2 array, has to match
the sorting of the delta-ts.
measured_delta_ts: The measured time differences in seconds, has to
match the order of the sensor positions.
Returns:
The LUCY value as a float. Ideally 0, in practice never 0, always positive.
"""
m = len(measured_delta_ts)
n = m + 1
measured_delta_dists = speed * measured_delta_ts
theo_dists = np.zeros(n)
theo_delta_dists = np.zeros(m)
for i in range(n):
theo_dists[i] = norm(test_pos - sens_poss[i, :])
for i in range(m):
theo_delta_dists[i] = theo_dists[i + 1] - theo_dists[0]
# LUCY definition taken from the vallen online help:
return norm(theo_delta_dists - measured_delta_dists) / math.sqrt(n - 1)
def get_channel_positions(setup_file: str) -> Dict[int, Tuple[float, float]]:
tree = ElementTree.parse(setup_file)
nodes = tree.getroot().findall(".//ChannelPos")
if nodes is None:
raise RuntimeError("Can not retrieve channel positions from %s", setup_file)
return {
int(elem.get("Chan")): (float(elem.get("X")), float(elem.get("Y"))) # type: ignore
for elem in nodes if elem is not None
}
def get_velocity(setup_file: str) -> Optional[float]:
tree = ElementTree.parse(setup_file)
node = tree.getroot().find(".//Location")
if node is not None:
velocity_str = node.get("Velocity")
if velocity_str is not None:
return float(velocity_str) * 1e3 # convert to m/s
raise RuntimeError("Can not retrieve velocity from %s", setup_file)
def main():
# Consts plotting
text_delta_y = 0.03
text_delta_x = -0.12
# Consts LUCY grid
grid_delta = 0.01
location_search_bounds = [(0.0, 0.80), (0.0, 0.80)]
# Read from pridb
pridb = vae.io.PriDatabase(PRIDB)
hits = pridb.read_hits()
pridb.close()
channel_order = hits["channel"].to_numpy()
arrival_times = hits["time"].to_numpy()
delta_ts = (arrival_times - arrival_times[0])[1:]
# Get localisation parameters from .vaex file
velocity = get_velocity(SETUP)
pos_dict = get_channel_positions(SETUP)
# Order sensor positions by hit occurence
pos_ordered = np.array([pos_dict[ch] for ch in channel_order])
# Compute heatmap
def lucy_instance_2args(x, y):
return lucy_error_fun(np.array([x, y]), velocity, pos_ordered, delta_ts)
x_range = np.arange(location_search_bounds[0][0], location_search_bounds[0][1], grid_delta)
y_range = x_range
x_grid, y_grid = np.meshgrid(x_range, y_range)
z_grid = np.vectorize(lucy_instance_2args)(x_grid, y_grid)
# Plot heatmap
plt.figure(tight_layout=True)
plt.pcolormesh(x_grid, y_grid, z_grid, cmap="cool")
plt.colorbar()
plt.title("Location Result and LUCY-Heatmap")
plt.xlabel("x [m]")
plt.ylabel("y [m]")
# Compute location
def lucy_instance_single_arg(pos):
return lucy_error_fun(pos, velocity, pos_ordered, delta_ts)
start = time.perf_counter()
# These are excessive search / overkill parameters:
location_result = differential_evolution(
lucy_instance_single_arg,
location_search_bounds,
popsize=40,
polish=True,
strategy="rand1bin",
recombination=0.1,
mutation=1.3,
)
end = time.perf_counter()
print(f"Runtime for 1 call to differential_evolution(): {(end - start):0.4} s")
print(location_result)
# Plot location result
x_res = location_result.x[0]
y_res = location_result.x[1]
plt.plot([x_res], [y_res], "bo")
plt.text(
x_res + text_delta_x,
y_res + text_delta_y,
"location result",
fontsize=9,
color="b",
)
# Plot sensor positions
for channel, (x, y) in pos_dict.items():
text = f"S{channel} (x={x:0.2f}m | y={y:0.2f}m)"
plt.scatter(x, y, marker="o", color="w")
plt.text(x + text_delta_x, y + text_delta_y, text, fontsize=9, color="w")
plt.show()
if __name__ == "__main__":
main()
Total running time of the script: (0 minutes 1.861 seconds)
Note
Go to the end to download the full example code
Go fast with multiprocessing
The streaming interfaces with iterables allow efficient batch processing as shown here. But still only one core/thread will be utilized. We will change that will multiprocessing.
Following example shows a batch feature extraction procedure using multiple CPU cores.
import multiprocessing
import os
import time
from itertools import cycle
from pathlib import Path
from typing import Iterable
import matplotlib.pyplot as plt
import vallenae as vae
HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
TRADB = HERE / "steel_plate" / "sample_plain.tradb"
Prepare streaming reads
Our sample tradb only contains four data sets. That is not enough data for demonstrating batch processing. Therefore, we will simulate more data by looping over the data sets with following generator/iterable:
def tra_generator(loops: int = 1000) -> Iterable[vae.io.TraRecord]:
with vae.io.TraDatabase(TRADB) as tradb:
for loop, tra in enumerate(cycle(tradb.iread())):
if loop > loops:
break
yield tra
Define feature extraction function
A simple function from the module _feature_extraction
is applied to all data sets and returns computed features.
The function is defined in another module to work with multiprocessing.Pool
: https://bugs.python.org/issue25053
from _feature_extraction import feature_extraction # noqa
Compute with single thread/core
Note
The examples are executed on the CI / readthedocs server with limited resources. Therefore, the shown computation times and speedups are below the capability of modern machines.
Run computation in a single thread and get the time:
def time_elapsed_ms(t0):
return 1000.0 * (time.perf_counter() - t0)
if __name__ == "__main__": # guard needed for multiprocessing on Windows
time_start = time.perf_counter()
for tra in tra_generator():
results = feature_extraction(tra)
# do something with the results
time_single_thread = time_elapsed_ms(time_start)
print(f"Time single thread: {time_single_thread:.2f} ms")
Time single thread: 1215.43 ms
Compute with multiple processes/cores
First get number of available cores in your machine:
print(f"Available CPU cores: {os.cpu_count()}")
Available CPU cores: 2
But how can we utilize those cores? The common answer for most programming languages is multithreading. Threads run in the same process and heap, so data can be shared between them (with care). Sadly, Python uses a global interpreter lock (GIL) that locks heap memory, because Python objects are not thread-safe. Therefore, threads are blocking each other and no speedups are gained by using multiple threads.
The solution for Python is multiprocessing to work around the GIL. Every process has its own heap and GIL. Multiprocessing will introduce overhead for interprocess communication and data serialization/deserialization. To reduce the overhead, data is sent in bigger chunks.
Run computation on 4 cores with chunks of 128 data sets and get the time / speedup:
if __name__ == "__main__": # guard needed for multiprocessing on Windows
with multiprocessing.Pool(4) as pool:
time_start = time.perf_counter()
for _results in pool.imap(feature_extraction, tra_generator(), chunksize=128):
pass # do something with the results
time_multiprocessing = time_elapsed_ms(time_start)
print(f"Time multiprocessing: {time_multiprocessing:.2f} ms")
print(f"Speedup: {(time_single_thread / time_multiprocessing):.2f}")
Time multiprocessing: 1190.77 ms
Speedup: 1.02
Variation of the chunksize
Following results show how the chunksize impacts the overall performance. The speedup is measured for different chunksizes and plotted against the chunksize:
if __name__ == "__main__": # guard needed for multiprocessing on Windows
chunksizes = (10, 40, 60, 80, 100, 120, 140, 160, 200)
speedup_chunksizes = []
with multiprocessing.Pool(4) as pool:
for chunksize in chunksizes:
time_start = time.perf_counter()
for _results in pool.imap(feature_extraction, tra_generator(), chunksize=chunksize):
pass # do something with the results
speedup_chunksizes.append(time_single_thread / time_elapsed_ms(time_start))
plt.figure(tight_layout=True, figsize=(6, 3))
plt.plot(chunksizes, speedup_chunksizes)
plt.xlabel("Chunksize")
plt.ylabel("Speedup")
plt.show()

Total running time of the script: (0 minutes 13.960 seconds)
Note
Go to the end to download the full example code
Custom feature extraction
Following examples shows how to compute custom features and save them in the transient feature database (trfdb) to visualize them in VisualAE.
The feature extraction can be live during acquisition. VisualAE will be notified, that a writer to the trfdb is active and waits for the features to be computed. Therefore, the computed features can be visualized in real time.
from pathlib import Path
from tempfile import gettempdir
import matplotlib.pyplot as plt
import numpy as np
import vallenae as vae
HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
PRIDB = HERE / "bearing" / "bearing.pridb"
TRADB = HERE / "bearing" / "bearing_plain.tradb"
# TRFDB = HERE / "bearing" / "bearing.trfdb"
TRFDB_TMP = Path(gettempdir()) / "bearing_custom.trfdb" # use a temp file for demo
Custom feature extraction algorithms
def rms(data: np.ndarray) -> float:
"""Root mean square (RMS)."""
return np.sqrt(np.mean(data ** 2))
def crest_factor(data: np.ndarray) -> float:
"""Crest factor (ratio of peak amplitude and RMS)."""
return np.max(np.abs(data)) / rms(data)
def spectral_peak_frequency(spectrum_: np.ndarray, samplerate: int) -> float:
"""
Peak frequency in a spectrum.
Args:
spectrum: FFT amplitudes
samplerate: Sample rate of the spectrum in Hz
Returns:
Peak frequency in Hz
"""
def bin_to_hz(samplerate: int, samples: int, index: int):
return 0.5 * samplerate * index / (samples - 1)
peak_index = np.argmax(spectrum_)
return bin_to_hz(samplerate, len(spectrum_), peak_index)
Open tradb and trfdb
tradb = vae.io.TraDatabase(TRADB)
trfdb = vae.io.TrfDatabase(TRFDB_TMP, mode="rwc")
Helper function to notify VisualAE, that the transient feature database is active/closed
def set_file_status(trfdb_: vae.io.TrfDatabase, status: int):
"""Notify VisualAE that trfdb is active/closed."""
trfdb_.connection().execute(
f"UPDATE trf_globalinfo SET Value = {status} WHERE Key == 'FileStatus'"
)
Read tra records, compute features and save to trfdb
The vallenae.io.TraDatabase.listen
method will read the tradb row by row and can be used during
acquisition. Only if the acquisition is closed and no new records are available, the function
returns.
set_file_status(trfdb, 2) # 2 = active
for tra in tradb.listen(existing=True, wait=False):
spectrum = np.fft.rfft(tra.data)
features = vae.io.FeatureRecord(
trai=tra.trai,
features={
"RMS": rms(tra.data),
"CrestFactor": crest_factor(tra.data),
"SpectralPeakFreq": spectral_peak_frequency(spectrum, tra.samplerate),
}
)
trfdb.write(features)
set_file_status(trfdb, 0) # 0 = closed
Write field infos to trfdb
Field infos can be written with vallenae.io.TrfDatabase.write_fieldinfo
:
trfdb.write_fieldinfo("RMS", {"Unit": "[V]", "LongName": "Root mean square"})
trfdb.write_fieldinfo("CrestFactor", {"Unit": "[]", "LongName": "Crest factor"})
trfdb.write_fieldinfo("SpectralPeakFreq", {"Unit": "[Hz]", "LongName": "Spectral peak frequency"})
Read results from trfdb
df_trfdb = trfdb.read()
print(df_trfdb)
Trf: 0%| | 0/490 [00:00<?, ?it/s]
Trf: 100%|██████████| 490/490 [00:00<00:00, 1892457.61it/s]
RMS CrestFactor SpectralPeakFreq
trai
1 0.000129 3.684550 63800.0
2 0.000134 3.843248 65500.0
3 0.000108 3.906766 58500.0
4 0.000115 3.876217 58000.0
5 0.000106 3.548410 72000.0
... ... ... ...
486 0.000006 3.624268 162900.0
487 0.000006 3.486689 250000.0
488 0.000006 3.530853 164100.0
489 0.000006 4.059815 174000.0
490 0.000006 3.545406 164700.0
[490 rows x 3 columns]
Plot AE features and custom features
Read pridb and join it with trfdb:
with vae.io.PriDatabase(PRIDB) as pridb:
df_pridb = pridb.read_hits()
df_combined = df_pridb.join(df_trfdb, on="trai", how="left")
print(df_combined)
Hits: 0%| | 0/490 [00:00<?, ?it/s]
Hits: 100%|██████████| 490/490 [00:00<00:00, 105568.57it/s]
time channel param_id ... RMS CrestFactor SpectralPeakFreq
set_id ...
6 0.00 1 2 ... 0.000129 3.684550 63800.0
8 0.01 1 2 ... 0.000134 3.843248 65500.0
10 0.02 1 2 ... 0.000108 3.906766 58500.0
12 0.03 1 2 ... 0.000115 3.876217 58000.0
14 0.04 1 2 ... 0.000106 3.548410 72000.0
... ... ... ... ... ... ... ...
976 4.85 1 2 ... 0.000006 3.624268 162900.0
978 4.86 1 2 ... 0.000006 3.486689 250000.0
980 4.87 1 2 ... 0.000006 3.530853 164100.0
982 4.88 1 2 ... 0.000006 4.059815 174000.0
984 4.89 1 2 ... 0.000006 3.545406 164700.0
[490 rows x 15 columns]
Plot joined features from pridb and trfdb
features = [
# from pridb
"amplitude",
"energy",
"counts",
# from trfdb - custom
"RMS",
"CrestFactor",
"SpectralPeakFreq",
]
df_combined.plot(
x="time",
y=features,
xlabel="Time [s]",
title=features,
legend=False,
subplots=True,
figsize=(8, 10),
)
plt.suptitle("AE Features from pridb and custom features from trfdb")
plt.tight_layout()
plt.show()

Display custom features in VisualAE

Total running time of the script: (0 minutes 0.922 seconds)
Note
Go to the end to download the full example code
Spectrogram
Generate spectrogram from tradb. The vallenae.io.TraDatabase.read_continuous_wave
method is used
to read the transient data as a continuous array.
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import vallenae as vae
from matplotlib.colors import LogNorm
from scipy import signal
HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
TRADB = HERE / "bearing" / "bearing_plain.tradb"
Read transient data as continous array
The signal is exactly cropped to the given time range (time_start, time_stop). Time gaps are filled with 0’s.
with vae.io.TraDatabase(TRADB) as tradb:
y, fs = tradb.read_continuous_wave(
channel=1,
time_start=None,
time_stop=None,
time_axis=False, # return samplerate instead of time axis
show_progress=False,
)
t = np.arange(0, len(y)) / fs # generate time axis
Compute Short-Time Fourier Transform (STFT)
nfft = 4096
noverlap = 2048
fz, tz, zxx = signal.stft(y, fs=fs, window="hann", nperseg=nfft, noverlap=noverlap)
Plot time data and spectrogram
fig, ax = plt.subplots(
nrows=2,
sharex=True,
figsize=(8, 6),
tight_layout=True,
gridspec_kw={"height_ratios": (1, 2)},
)
ax[0].plot(t, y * 1e3)
ax[0].set(xlabel="Time [s]", ylabel="Amplitude [mV]", title="Waveform")
ax[1].pcolormesh(tz, fz / 1e3, np.abs(zxx), norm=LogNorm())
ax[1].set(xlabel="Time [s]", ylabel="Frequency [kHz]", title="STFT")
plt.show()

Total running time of the script: (0 minutes 2.368 seconds)
Note
Go to the end to download the full example code
Export to WAV
Generate WAV files from tradb. We use the vallenae.io.TraDatabase.read_continuous_wave
method to
read the transient data as a continuous array.
The signal can optionally be decimated to reduce the size of the generated WAV files
(using the scipy.signal.decimate
function).
Export channel 1 to /tmp/tmpt96qdcmt/bearing_plain_ch1.wav
Generated WAV files:
/tmp/tmpt96qdcmt/bearing_plain_ch1.wav
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional
import numpy as np
import vallenae as vae
from scipy import signal
from scipy.io import wavfile
HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
def export_wav(
filename_wav: Path,
tradb: vae.io.TraDatabase,
channel: int,
time_start: Optional[float] = None,
time_stop: Optional[float] = None,
decimation_factor: int = 1,
):
"""
Export data from tradb to a WAV file.
Args:
filename_wav: Path to the generated WAV file
tradb: `TraDatabase` instance
channel: Channel number
time_start: Start reading at relative time (in seconds). Start at beginning if `None`
time_start: Stop reading at relative time (in seconds). Read until end if `None`
decimation_factor: Decimate signal with given factor
"""
y, fs = tradb.read_continuous_wave(
channel=channel,
time_start=time_start,
time_stop=time_stop,
time_axis=False,
show_progress=False,
raw=True, # read as ADC values (int16)
)
if decimation_factor > 1:
y = signal.decimate(y, decimation_factor).astype(np.int16)
fs //= decimation_factor
wavfile.write(filename_wav, fs, y)
def main():
filename_tradb = HERE / "bearing" / "bearing_plain.tradb"
# use a temporary file for this example
with TemporaryDirectory() as tmpdir:
with vae.io.TraDatabase(filename_tradb) as tradb:
for channel in tradb.channel():
filename_wav = Path(tmpdir) / f"{filename_tradb.stem}_ch{channel}.wav"
print(f"Export channel {channel} to {filename_wav}")
export_wav(
filename_wav=filename_wav,
tradb=tradb,
channel=channel,
time_start=0, # read from t = 0 s
time_stop=None, # read until end if None
decimation_factor=5, # custom decimation factor
)
# list all generated wav files
print("Generated WAV files:")
for file in Path(tmpdir).iterdir():
print(file)
if __name__ == "__main__":
main()
Total running time of the script: (0 minutes 0.096 seconds)
Changelog
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
Unreleased
0.9.0 - 2024-02-14
Added
Example for incremental WAV export
Python 3.12 support
0.8.0 - 2023-07-13
Added
Flag
TraRecord.raw
if data is stored as ADC values (int16)Flag
raw
forTraDatabase
read methods to read data as ADC values:TraDatabase.iread
TraDatabase.read
TraDatabase.read_wave
TraDatabase.read_continuous_wave
TraDatabase.listen
Bearing example data
Spectrogram example
WAV export example using the new
raw
flagCI for Python 3.11
Changed
Remove scipy dependency (only needed for examples)
Migrate from setuptools to hatch (replace
setup.py
withpyproject.toml
)
Fixed
Multiprocessing example for Windows
0.7.0 - 2022-11-10
Added
Example for custom feature extraction
PyInstaller hook
CI for Python 3.10
Changed
Make Numba dependency optional (fallback timepicker implementations with NumPy)
Fixed
Counts computations (first sample above threshold is not a count)
0.6.0 - 2021-09-02
Added
CI for Python 3.9
Changed
Remove superfluous
data_format
field fromTraRecord
data type
0.5.4 - 2021-05-25
Fixed
Limit number of buffered records in
listen
methodsTime axis rounding errors, e.g. for
TraDatabase.read_wave
withtime_axis=True
0.5.3 - 2021-05-04
Fixed
SQLite URI for absolute linux paths
0.5.2 - 2021-05-04
Fixed
SQLite URI for special characters (#, ?)
0.5.1 - 2021-03-25
Fixed
Buffering of SQL results in
listen
methods to allow SQL queries in between
0.5.0 - 2021-03-18
Added
Query filter parameter to
TrfDatabase.read
andTrfDatabase.iread
listen
method forPriDatabase
,TraDatabase
andTrfDatabase
to retrieve new records live
Changed
Order feature records by TRAI for
TrfDatabase.read
andTrfDatabase.iread
0.4.0 - 2021-02-14
Added
CI with GitHub actions on Linux, MacOS and Windows
Workflow with GitHub actions to publish to PyPI on new releases
pyproject.toml
as the main config file for pylint, pytest, tox, coverage, …
Changed
Return exact time range with
TraDatabase.read_continuous_wave
Return “absolute” time axis with
TraDatabase.read_continuous_wave
(instead of starting at t = 0 s)
Fixed
Fix database close if exception raised in
__init__
(e.g. file not found)Example
ex6_multiprocessing
for MacOSFind lower/upper bounds for same values (times) in binary search (used by
TraDatabase.iread
)Stop condition for
time_stop
inTraDatabase.iread
Use TRAI for
TraDatabase.iread
as a time sorted index for binary search (SetID is not!)Check for empty time ranges in
TraDatabase.iread
0.3.0 - 2020-11-05
Added
Query filter for pridb/tradb (i)read functions
0.2.4 - 2020-11-01
Fixed
SQL schemas for pridb/tradb/trfdb creation, add fieldinfos
0.2.3 - 2020-09-01
Fixed
AIC timepicker
Add threshold for monotonic time check (1 ns) to ignore rounding issues
Suppress exception chaining
0.2.2 - 2020-07-10
Added
Database classes are now pickable and can be used in multiprocessing
SQLite transactions for all writes
Faster blob encoding (
vallenae.io.encode_data_blob
)Faster RMS computation with Numba (
vallenae.features.rms
)
Fixed
Catch possible global_info table parsing errors
0.2.1 - 2020-02-10
Fixed
Examples outputs if not run as notebook
Out-of-bound time_start, time_stop with SQL binary search
Optional signal strength for e.g. spotWave data acquisition
0.2.0 - 2020-02-06
Added
Database creation with
mode="rwc"
, e.g.vallenae.io.PriDatabase.__init__
Fixed
Number field in
vallenae.io.MarkerRecord
optionalScaling of parametric inputs optional
Keep column order of query if new columns are added to the database
Return array with float32 from
vallenae.io.TraDatabase.read_continuous_wave
(instead of float64)
0.1.0 - 2020-01-24
Initial public release