Vallen AE Python Tools¶
Extract and analyze Acoustic Emission measurement data.
The IO module vallenae.io
enables reading (and writing) of Vallen Systeme SQLite database files:
*.pridb: Primary database
*.tradb: Transient data
*.trfdb: Transient features
The remaining modules are system-independent and try to comprise the most common state-of-the-art algorithms in Acoustic Emission:
vallenae.features
: Extraction of Acoustic Emission featuresvallenae.timepicker
: Timepicking algorithms for arrival time estimations
IO¶
Read/write Vallen Systeme database and setup files.
Database classes¶
Classes to read/write pridb, tradb and trfdb database files.
Warning
Writing is still experimental
|
IO Wrapper for pridb database file. |
|
IO Wrapper for tradb database file. |
|
IO Wrapper for trfdb (transient feature) database file. |
All database classes implement two different interfaces to access data:
Standard read_*
Read data to
pandas.DataFrame
, e.g. withPriDatabase.read_hits
>>> pridb = vae.io.PriDatabase("./examples/steel_plate/sample.pridb") >>> df = pridb.read_hits() # save all hits to pandas dataframe >>> df[["time", "channel"]] # output columns hit and channel time channel set_id 10 3.992771 3 11 3.992775 2 12 3.992813 4 13 3.992814 1
Streaming iread_*
Iterate through the data row by row. This is a memory-efficient solution ideal for batch processing. The return types are specific
typing.NamedTuple
, see Data types.Example with
PriDatabase.iread_hits
:>>> pridb = vae.io.PriDatabase("./examples/steel_plate/sample.pridb") >>> for hit in pridb.iread_hits(): ... print(f"time: {hit.time:0.4f}, channel: {hit.channel}") ... time: 3.9928, channel: 3 time: 3.9928, channel: 2 time: 3.9928, channel: 4 time: 3.9928, channel: 1 >>> type(hit) <class 'vallenae.io.datatypes.HitRecord'>
Data types¶
Records of the database are represented as typing.NamedTuple
.
Each record implements a class method from_sql
to init from a SQLite
row dictionary (column name: value).
|
Hit record in pridb (SetType = 2). |
|
Marker record in pridb (SetType = 4, 5, 6). |
|
Status data record in pridb (SetType = 3). |
|
Parametric data record in pridb (SetType = 1). |
|
Transient data record in tradb. |
|
Transient feature record in trfdb. |
Compression¶
Transient signals in the tradb are stored as BLOBs of 16-bit ADC values – either uncompressed or compressed (FLAC). Following functions convert between BLOBs and arrays of voltage values.
|
Decodes (compressed) 16-bit ADC values from BLOB to array of voltage values. |
|
Encodes array of voltage values to BLOB of 16-bit ADC values for memory-efficient storage. |
Features¶
Acoustic Emission¶
|
Compute maximum absolute amplitude. |
|
Compute index of peak amplitude. |
|
Checks if absolute amplitudes are above threshold. |
|
Compute index of first threshold crossing. |
|
Compute the rise time. |
|
Compute the energy of a hit. |
|
Compute the signal strength of a hit. |
|
Compute the number of positive threshold crossings of a hit (counts). |
|
Compute the root mean square (RMS) of an array. |
Conversion¶
|
Convert amplitude from volts to decibel (dB). |
|
Convert amplitude from decibel (dB) to volts. |
Timerpicker¶
The determination of signal arrival times has a major influence on the localization accuracy. Usually, arrival times are determined by the first threshold crossing (either fixed or adaptive). Following popular methods have been proposed in the past to automatically pick time of arrivals:
|
Hinkley criterion for arrival time estimation. |
|
Akaike Information Criterion (AIC) for arrival time estimation. |
|
Energy ratio for arrival time estimation. |
|
Modified energy ratio method for arrival time estimation. |
Examples¶
A collection of examples how to read and analyse Acoustic Emission data.
Note
Click here to download the full example code
Read pridb¶
import os
import matplotlib.pyplot as plt
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
PRIDB = os.path.join(HERE, "steel_plate/sample.pridb")
Open pridb¶
pridb = vae.io.PriDatabase(PRIDB)
print("Tables in database: ", pridb.tables())
print("Number of rows in data table (ae_data): ", pridb.rows())
print("Set of all channels: ", pridb.channel())
Out:
Tables in database: {'ae_params', 'ae_markers', 'data_integrity', 'ae_fieldinfo', 'acq_setup', 'ae_data', 'ae_globalinfo'}
Number of rows in data table (ae_data): 18
Set of all channels: {1, 2, 3, 4}
Read hits to Pandas DataFrame¶
df_hits = pridb.read_hits()
# Print a few columns
print(df_hits[["time", "channel", "amplitude", "counts", "energy"]])
Out:
Hits: 0%| | 0/4 [00:00<?, ?it/s]
Hits: 100%|##########| 4/4 [00:00<00:00, 10761.52it/s]
time channel amplitude counts energy
set_id
10 3.992771 3 0.046539 2180 2.799510e+07
11 3.992775 2 0.059621 2047 2.276279e+07
12 3.992813 4 0.034119 1854 1.286700e+07
13 3.992814 1 0.029115 1985 1.265275e+07
Query Pandas DataFrame¶
DataFrames offer powerful features to query and aggregate data, e.g. plot summed energy per channel
ax = df_hits.groupby("channel").sum()["energy"].plot.bar(figsize=(8, 3))
ax.set_xlabel("Channel")
ax.set_ylabel("Summed Energy [eu = 1e-14 V²s]")
plt.tight_layout()
plt.show()

Read markers¶
df_markers = pridb.read_markers()
print(df_markers)
Out:
Marker: 0%| | 0/5 [00:00<?, ?it/s]
Marker: 100%|##########| 5/5 [00:00<00:00, 17697.49it/s]
time set_type data number
set_id
1 0.00 6 1
2 0.00 4 10:52 Resume 1
3 0.00 5 2019-09-20 10:54:52 <NA>
4 0.00 4 TimeZone: +02:00 (W. Europe Standard Time) 2
18 100.07 4 10:56 Suspend 3
Read parametric data¶
df_parametric = pridb.read_parametric()
print(df_parametric)
Out:
Parametric: 0%| | 0/9 [00:00<?, ?it/s]
Parametric: 100%|##########| 9/9 [00:00<00:00, 25488.68it/s]
time param_id pctd pcta
set_id
5 0.00 1 0 0
6 1.00 1 0 0
7 2.00 1 0 0
8 3.00 1 0 0
9 3.99 1 0 0
14 4.00 1 0 0
15 5.00 1 0 0
16 6.00 1 0 0
17 6.45 1 0 0
Total running time of the script: ( 0 minutes 0.233 seconds)
Note
Click here to download the full example code
Read and plot transient data¶

import os
import matplotlib.pyplot as plt
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb") # uncompressed
TRAI = 4 # just an example, no magic here
def main():
# Read waveform from tradb
with vae.io.TraDatabase(TRADB) as tradb:
y, t = tradb.read_wave(TRAI)
y *= 1e3 # in mV
t *= 1e6 # for µs
# Plot waveforms
plt.figure(figsize=(8, 4), tight_layout=True)
plt.plot(t, y)
plt.xlabel("Time [µs]")
plt.ylabel("Amplitude [mV]")
plt.title(f"Transient Wave Plot; trai = {TRAI}")
plt.show()
if __name__ == "__main__":
main()
Total running time of the script: ( 0 minutes 0.194 seconds)
Note
Click here to download the full example code
Timepicker¶
Following example showcases the results of different timepicking methods.
For more informations, please refer to the functions documentation (vallenae.timepicker
).
import os
import time
import matplotlib.pyplot as plt
import numpy as np
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb")
TRAI = 4
SAMPLES = 2000
plt.ioff() # Turn interactive mode off; plt.show() is blocking
Read waveform from tradb¶
tradb = vae.io.TraDatabase(TRADB)
y, t = tradb.read_wave(TRAI)
# crop first samples
t = t[:SAMPLES]
y = y[:SAMPLES]
# unit conversion
t *= 1e6 # convert to µs
y *= 1e3 # convert to mV
Prepare plotting with time-picker results¶
def plot(t_wave, y_wave, y_picker, index_picker, name_picker):
_, ax1 = plt.subplots(figsize=(8, 4), tight_layout=True)
ax1.set_xlabel("Time [µs]")
ax1.set_ylabel("Amplitude [mV]", color="g")
ax1.plot(t_wave, y_wave, color="g")
ax1.tick_params(axis="y", labelcolor="g")
ax2 = ax1.twinx()
ax2.set_ylabel(f"{name_picker}", color="r")
ax2.plot(t_wave, y_picker, color="r")
ax2.tick_params(axis="y", labelcolor="r")
plt.axvline(t_wave[index_picker], color="k", linestyle=":")
plt.show()
Hinkley Criterion¶
hc_arr, hc_index = vae.timepicker.hinkley(y, alpha=5)
plot(t, y, hc_arr, hc_index, "Hinkley Criterion")

The negative trend correlates to the chosen alpha value and can influence the results strongly. Results with alpha = 50 (less negative trend):
hc_arr, hc_index = vae.timepicker.hinkley(y, alpha=50)
plot(t, y, hc_arr, hc_index, "Hinkley Criterion")

Akaike Information Criterion (AIC)¶
aic_arr, aic_index = vae.timepicker.aic(y)
plot(t, y, aic_arr, aic_index, "Akaike Information Criterion")

Energy Ratio¶
er_arr, er_index = vae.timepicker.energy_ratio(y)
plot(t, y, er_arr, er_index, "Energy Ratio")

Modified Energy Ratio¶
mer_arr, mer_index = vae.timepicker.modified_energy_ratio(y)
plot(t, y, mer_arr, mer_index, "Modified Energy Ratio")

Performance comparison¶
All timepicker implementations are using Numba for just-in-time (JIT) compilations. Usually the first function call is slow, because it will trigger the JIT compiler. To compare the performance to a native or numpy implementation, the average of multiple executions should be compared.
def timeit(callable, loops=100):
time_start = time.perf_counter()
for _ in range(loops):
callable()
return 1e6 * (time.perf_counter() - time_start) / loops # elapsed time in µs
timer_results = {
"Hinkley": timeit(lambda: vae.timepicker.hinkley(y, 5)),
"AIC": timeit(lambda: vae.timepicker.aic(y)),
"Energy Ratio": timeit(lambda: vae.timepicker.energy_ratio(y)),
"Modified Energy Ratio": timeit(lambda: vae.timepicker.modified_energy_ratio(y)),
}
for name, time in timer_results.items():
print(f"{name}: {time:0.3f} µs")
plt.figure(figsize=(8, 3), tight_layout=True)
plt.bar(timer_results.keys(), timer_results.values())
plt.ylabel("Time [µs]")
plt.show()

Out:
Hinkley: 10.045 µs
AIC: 133.962 µs
Energy Ratio: 11.592 µs
Modified Energy Ratio: 15.917 µs
Total running time of the script: ( 0 minutes 3.110 seconds)
Note
Click here to download the full example code
Timepicker batch processing¶
Following examples shows how to stream transient data row by row, compute timepicker results and save the results to a feature database (trfdb).
import os
from shutil import copyfile
from tempfile import gettempdir
import matplotlib.pyplot as plt
import pandas as pd
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb")
TRFDB = os.path.join(HERE, "steel_plate/sample.trfdb")
TRFDB_TMP = os.path.join(gettempdir(), "sample.trfdb")
Open tradb (readonly) and trfdb (readwrite)¶
copyfile(TRFDB, TRFDB_TMP) # copy trfdb, so we don't overwrite it
tradb = vae.io.TraDatabase(TRADB)
trfdb = vae.io.TrfDatabase(TRFDB_TMP, mode="rw") # allow writing
Read current trfdb¶
print(trfdb.read())
Out:
Trf: 0%| | 0/4 [00:00<?, ?it/s]
Trf: 100%|##########| 4/4 [00:00<00:00, 86480.49it/s]
FFT_CoG FFT_FoM PA ... CTP FI FR
trai ...
1 147.705078 134.277344 46.483864 ... 11 222.672058 110.182449
2 144.042969 139.160156 59.450512 ... 35 182.291672 98.019981
3 155.029297 164.794922 33.995209 ... 55 155.191879 95.493233
4 159.912109 139.160156 29.114828 ... 29 181.023727 101.906227
[4 rows x 8 columns]
Compute arrival time offsets with different timepickers¶
To improve localisation, time of arrival estimates using the first threshold crossing can be refined with timepickers. Therefore, arrival time offsets between the first threshold crossings and the timepicker results are computed.
def dt_from_timepicker(timepicker_func, tra: vae.io.TraRecord):
# Index of the first threshold crossing is equal to the pretrigger samples
index_ref = tra.pretrigger
# Only analyse signal until peak amplitude
index_peak = vae.features.peak_amplitude_index(tra.data)
data = tra.data[:index_peak]
# Get timepicker result
_, index_timepicker = timepicker_func(data)
# Compute offset in µs
dt_us = (index_timepicker - index_ref) * 1e6 / tra.samplerate
return dt_us
Transient data is streamed from the database row by row using vallenae.io.TraDatabase.iread
.
Only one transient data set is loaded into memory at a time.
That makes the streaming interface ideal for batch processing.
The timepicker results are saved to the trfdb using vallenae.io.TrfDatabase.write
.
for tra in tradb.iread():
# Calculate arrival time offsets with different timepickers
feature_set = vae.io.FeatureRecord(
trai=tra.trai,
features={
"ATO_Hinkley": dt_from_timepicker(vae.timepicker.hinkley, tra),
"ATO_AIC": dt_from_timepicker(vae.timepicker.aic, tra),
"ATO_ER": dt_from_timepicker(vae.timepicker.energy_ratio, tra),
"ATO_MER": dt_from_timepicker(vae.timepicker.modified_energy_ratio, tra),
}
)
# Save results to trfdb
trfdb.write(feature_set)
Read results from trfdb¶
print(trfdb.read().filter(regex="ATO"))
Out:
Trf: 0%| | 0/4 [00:00<?, ?it/s]
Trf: 100%|##########| 4/4 [00:00<00:00, 85598.04it/s]
ATO_Hinkley ATO_AIC ATO_ER ATO_MER
trai
1 30.4 -1.8 -4.0 29.2
2 36.4 -0.8 -1.0 -0.4
3 67.0 -1.8 -3.2 60.4
4 65.8 -1.0 -2.2 64.2
Plot results¶
ax = trfdb.read()[["ATO_Hinkley", "ATO_AIC", "ATO_ER", "ATO_MER"]].plot.barh()
ax.invert_yaxis()
ax.set_xlabel("Arrival time offset [µs]")
plt.show()

Out:
Trf: 0%| | 0/4 [00:00<?, ?it/s]
Trf: 100%|##########| 4/4 [00:00<00:00, 102927.71it/s]
Plot waveforms and arrival times¶
_, axes = plt.subplots(4, 1, tight_layout=True, figsize=(8, 8))
for row, ax in zip(trfdb.read().itertuples(), axes):
trai = row.Index
# read waveform from tradb
y, t = tradb.read_wave(trai)
# plot waveform
ax.plot(t[400:1000] * 1e6, y[400:1000] * 1e3, "k") # crop and convert to µs/mV
ax.set_title(f"trai = {trai}")
ax.set_xlabel("Time [µs]")
ax.set_ylabel("Amplitude [mV]")
ax.label_outer()
# plot arrival time offsets
ax.axvline(row.ATO_Hinkley, color="C0")
ax.axvline(row.ATO_AIC, color="C1")
ax.axvline(row.ATO_ER, color="C2")
ax.axvline(row.ATO_MER, color="C3")
axes[0].legend(["Waveform", "Hinkley", "AIC", "ER", "MER"])
plt.show()

Out:
Trf: 0%| | 0/4 [00:00<?, ?it/s]
Trf: 100%|##########| 4/4 [00:00<00:00, 80273.76it/s]
Use results in VisualAE¶
The computed arrival time offsets can be directly used in VisualAE.
We only need to specify the unit. VisualAE requires them to be in µs.
Units and other column-related meta data is saved in the trf_fieldinfo table.
Field infos can be retrieved with vallenae.io.TrfDatabase.fieldinfo
:
print(trfdb.fieldinfo())
Out:
{'FFT_CoG': {'SetTypes': 2, 'Unit': '[kHz]', 'LongName': 'F(C.o.Gravity)', 'Description': 'Center of gravity of spectrum', 'ShortName': None, 'FormatStr': None}, 'FFT_FoM': {'SetTypes': 2, 'Unit': '[kHz]', 'LongName': 'F(max. Amp.)', 'Description': 'Frequency of maximum of spectrum', 'ShortName': None, 'FormatStr': None}, 'PA': {'SetTypes': 8, 'Unit': '[mV]', 'LongName': 'Peak Amplitude', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'RT': {'SetTypes': 8, 'Unit': '[µs]', 'LongName': 'Rise Time', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'Dur': {'SetTypes': 8, 'Unit': '[µs]', 'LongName': 'Duration (available)', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'CTP': {'SetTypes': 8, 'Unit': None, 'LongName': 'Cnts to peak', 'Description': None, 'ShortName': None, 'FormatStr': '#'}, 'FI': {'SetTypes': 8, 'Unit': '[kHz]', 'LongName': 'Initiation Freq.', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'FR': {'SetTypes': 8, 'Unit': '[kHz]', 'LongName': 'Reverberation Freq.', 'Description': None, 'ShortName': None, 'FormatStr': None}}
Show results as table:
print(pd.DataFrame(trfdb.fieldinfo()))
Out:
FFT_CoG ... FR
SetTypes 2 ... 8
Unit [kHz] ... [kHz]
LongName F(C.o.Gravity) ... Reverberation Freq.
Description Center of gravity of spectrum ... None
ShortName None ... None
FormatStr None ... None
[6 rows x 8 columns]
Write units to trfdb¶
Field infos can be written with vallenae.io.TrfDatabase.write_fieldinfo
:
trfdb.write_fieldinfo("ATO_Hinkley", {"Unit": "[µs]", "LongName": "Arrival Time Offset (Hinkley)"})
trfdb.write_fieldinfo("ATO_AIC", {"Unit": "[µs]", "LongName": "Arrival Time Offset (AIC)"})
trfdb.write_fieldinfo("ATO_ER", {"Unit": "[µs]", "LongName": "Arrival Time Offset (ER)"})
trfdb.write_fieldinfo("ATO_MER", {"Unit": "[µs]", "LongName": "Arrival Time Offset (MER)"})
print(pd.DataFrame(trfdb.fieldinfo()).filter(regex="ATO"))
Out:
ATO_Hinkley ... ATO_MER
SetTypes None ... None
Unit [µs] ... [µs]
LongName Arrival Time Offset (Hinkley) ... Arrival Time Offset (MER)
Description None ... None
ShortName None ... None
FormatStr None ... None
[6 rows x 4 columns]
Load results in VisualAE¶
Time arrival offsets can be specified in the settings of Location Processors - Channel Positions - Arrival Time Offset. (Make sure to rename the generated trfdb to match the filename of the pridb.)

Total running time of the script: ( 0 minutes 0.889 seconds)
Note
Click here to download the full example code
Localisation¶

Out:
Hits: 0%| | 0/4 [00:00<?, ?it/s]
Hits: 100%|##########| 4/4 [00:00<00:00, 13400.33it/s]
/home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/checkouts/0.5.1/examples/ex5_location.py:122: MatplotlibDeprecationWarning: shading='flat' when X and Y have the same dimensions as C is deprecated since 3.3. Either specify the corners of the quadrilaterals with X and Y, or pass shading='auto', 'nearest' or 'gouraud', or set rcParams['pcolor.shading']. This will become an error two minor releases later.
plt.pcolormesh(x_grid, y_grid, z_grid, cmap="cool")
Runtime for 1 call to differential_evolution(): 0.608 s
fun: 0.0011158885067467825
jac: array([0.00072356, 0.00151068])
message: 'Optimization terminated successfully.'
nfev: 8169
nit: 101
success: True
x: array([0.22165341, 0.36565977])
import math
import os
import time
import xml.etree.ElementTree as ElementTree
from typing import Dict, Optional, Tuple
import matplotlib.pyplot as plt
import numpy as np
from numba import f8, njit
from numpy.linalg import norm
from scipy.optimize import differential_evolution
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
SETUP = os.path.join(HERE, "steel_plate/sample.vaex")
PRIDB = os.path.join(HERE, "steel_plate/sample.pridb")
NUMBER_SENSORS = 4
@njit(f8(f8[:], f8, f8[:, :], f8[:]))
def lucy_error_fun(
test_pos: np.ndarray,
speed: float,
sens_poss: np.ndarray,
measured_delta_ts: np.ndarray,
) -> float:
"""
Implementation of the LUCY computation in 2D as documented in
the Vallen online help.
Args:
test_pos: Emitter position to test.
speed: Assumed speed of sound in a plate-like structure.
sens_poss: Sensor positions, often a 4x2 array, has to match
the sorting of the delta-ts.
measured_delta_ts: The measured time differences in seconds, has to
match the order of the sensor positions.
Returns:
The LUCY value as a float. Ideally 0, in practice never 0, always positive.
"""
m = len(measured_delta_ts)
n = m + 1
measured_delta_dists = speed * measured_delta_ts
theo_dists = np.zeros(n)
theo_delta_dists = np.zeros(m)
for i in range(n):
theo_dists[i] = norm(test_pos - sens_poss[i, :])
for i in range(m):
theo_delta_dists[i] = theo_dists[i + 1] - theo_dists[0]
# LUCY definition taken from the vallen online help:
lucy_val = norm(theo_delta_dists - measured_delta_dists) / math.sqrt(n - 1)
return lucy_val
def get_channel_positions(setup_file: str) -> Dict[int, Tuple[float, float]]:
tree = ElementTree.parse(setup_file)
nodes = tree.getroot().findall(".//ChannelPos")
if nodes is None:
raise RuntimeError("Can not retrieve channel positions from %s", setup_file)
channel_positions = {
int(elem.get("Chan")): (float(elem.get("X")), float(elem.get("Y"))) # type: ignore
for elem in nodes if elem is not None
}
return channel_positions
def get_velocity(setup_file: str) -> Optional[float]:
tree = ElementTree.parse(setup_file)
node = tree.getroot().find(".//Location")
if node is not None:
velocity_str = node.get("Velocity")
if velocity_str is not None:
return float(velocity_str) * 1e3 # convert to m/s
raise RuntimeError("Can not retrieve velocity from %s", setup_file)
def main():
# Consts plotting
text_delta_y = 0.03
text_delta_x = -0.12
# Consts LUCY grid
grid_delta = 0.01
location_search_bounds = [(0.0, 0.80), (0.0, 0.80)]
# Read from pridb
pridb = vae.io.PriDatabase(PRIDB)
hits = pridb.read_hits()
pridb.close()
channel_order = hits["channel"].to_numpy()
arrival_times = hits["time"].to_numpy()
delta_ts = (arrival_times - arrival_times[0])[1:]
# Get localisation parameters from .vaex file
velocity = get_velocity(SETUP)
pos_dict = get_channel_positions(SETUP)
# Order sensor positions by hit occurence
pos_ordered = np.array([pos_dict[ch] for ch in channel_order])
# Compute heatmap
lucy_instance_2args = lambda x, y: lucy_error_fun(
np.array([x, y]), velocity, pos_ordered, delta_ts
)
x_range = np.arange(location_search_bounds[0][0], location_search_bounds[0][1], grid_delta)
y_range = x_range
x_grid, y_grid = np.meshgrid(x_range, y_range)
z_grid = np.vectorize(lucy_instance_2args)(x_grid, y_grid)
# Plot heatmap
plt.figure(tight_layout=True)
plt.pcolormesh(x_grid, y_grid, z_grid, cmap="cool")
plt.colorbar()
plt.title("Location Result and LUCY-Heatmap")
plt.xlabel("x [m]")
plt.ylabel("y [m]")
# Compute location
lucy_instance_single_arg = lambda pos: lucy_error_fun(
pos, velocity, pos_ordered, delta_ts
)
start = time.perf_counter()
# These are excessive search / overkill parameters:
location_result = differential_evolution(
lucy_instance_single_arg,
location_search_bounds,
popsize=40,
polish=True,
strategy="rand1bin",
recombination=0.1,
mutation=1.3,
)
end = time.perf_counter()
print(f"Runtime for 1 call to differential_evolution(): {(end - start):0.4} s")
print(location_result)
# Plot location result
x_res = location_result.x[0]
y_res = location_result.x[1]
plt.plot([x_res], [y_res], "bo")
plt.text(
x_res + text_delta_x,
y_res + text_delta_y,
"location result",
fontsize=9,
color="b",
)
# Plot sensor positions
for channel, (x, y) in pos_dict.items():
text = f"S{channel} (x={x:0.2f}m | y={y:0.2f}m)"
plt.scatter(x, y, marker="o", color="w")
plt.text(x + text_delta_x, y + text_delta_y, text, fontsize=9, color="w")
plt.show()
if __name__ == "__main__":
main()
Total running time of the script: ( 0 minutes 1.521 seconds)
Note
Click here to download the full example code
Go fast with multiprocessing¶
The streaming interfaces with iterables allow efficient batch processing as shown here. But still only one core/thread will be utilized. We will change that will multiprocessing.
Following example shows a batch feature extraction procedure using multiple CPU cores.
import os
import time
import multiprocessing
from typing import Dict, Iterable
from itertools import cycle
import __main__
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb")
Prepare streaming reads¶
tradb = vae.io.TraDatabase(TRADB)
Our sample tradb only contains four data sets. That is not enough data for demonstrating batch processing. Therefore, we will simulate more data by looping over the data sets with following generator/iterable:
def tra_generator(loops: int = 1000) -> Iterable[vae.io.TraRecord]:
for loop, tra in enumerate(cycle(tradb.iread())):
if loop > loops:
break
yield tra
Define feature extraction function¶
Following function will be applied to all data sets and returns computed features:
def feature_extraction(tra: vae.io.TraRecord) -> Dict[str, float]:
# compute random statistical features
return {
"Std": np.std(tra.data),
"Skew": stats.skew(tra.data),
}
# Fix to use pickle serialization in sphinx gallery
setattr(__main__, feature_extraction.__name__, feature_extraction)
Compute with single thread/core¶
Note
The examples are executed on the CI / readthedocs server with limited resources. Therefore, the shown computation times and speedups are below the capability of modern machines.
Run computation in a single thread and get the time:
time_elapsed_ms = lambda t0: 1e3 * (time.perf_counter() - t0)
time_start = time.perf_counter()
for tra in tra_generator():
results = feature_extraction(tra)
# do something with the results
time_single_thread = time_elapsed_ms(time_start)
print(f"Time single thread: {time_single_thread:.2f} ms")
Out:
Time single thread: 773.09 ms
Compute with multiple processes/cores¶
First get number of available cores in your machine:
print(f"Available CPU cores: {os.cpu_count()}")
Out:
Available CPU cores: 2
But how can we utilize those cores? The common answer for most programming languages is multithreading. Threads run in the same process and heap, so data can be shared between them (with care). Sadly, Python uses a global interpreter lock (GIL) that locks heap memory, because Python objects are not thread-safe. Therefore, threads are blocking each other and no speedups are gained by using multiple threads.
The solution for Python is multiprocessing to work around the GIL. Every process has its own heap and GIL. Multiprocessing will introduce overhead for interprocess communication and data serialization/deserialization. To reduce the overhead, data is sent in bigger chunks.
Run computation on 4 cores with chunks of 128 data sets and get the time / speedup:
with multiprocessing.Pool(4) as pool:
time_start = time.perf_counter()
for results in pool.imap(feature_extraction, tra_generator(), chunksize=128):
pass # do something with the results
time_multiprocessing = time_elapsed_ms(time_start)
print(f"Time multiprocessing: {time_multiprocessing:.2f} ms")
print(f"Speedup: {(time_single_thread / time_multiprocessing):.2f}")
Out:
Time multiprocessing: 780.51 ms
Speedup: 0.99
Variation of the chunksize¶
Following results show how the chunksize impacts the overall performance. The speedup is measured for different chunksizes and plotted against the chunksize:
chunksizes = (10, 40, 60, 80, 100, 120, 140, 160, 200)
speedup_chunksizes = []
with multiprocessing.Pool(4) as pool:
for chunksize in chunksizes:
time_start = time.perf_counter()
for results in pool.imap(feature_extraction, tra_generator(), chunksize=chunksize):
pass # do something with the results
speedup_chunksizes.append(time_single_thread / time_elapsed_ms(time_start))
plt.figure(tight_layout=True, figsize=(6, 3))
plt.plot(chunksizes, speedup_chunksizes)
plt.xlabel("Chunksize")
plt.ylabel("Speedup")
plt.show()

Total running time of the script: ( 0 minutes 9.105 seconds)
Changelog¶
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.
0.5.1 - 2020-03-25¶
Fixed¶
Buffering of SQL results in
listen
methods to allow SQL queries in between
0.5.0 - 2020-03-18¶
Added¶
Query filter parameter to
TrfDatabase.read
andTrfDatabase.iread
listen
method forPriDatabase
,TraDatabase
andTrfDatabase
to retrieve new records live
Changed¶
Order feature records by TRAI for
TrfDatabase.read
andTrfDatabase.iread
0.4.0 - 2021-02-14¶
Added¶
CI with GitHub actions on Linux, MacOS and Windows
Workflow with GitHub actions to publish to PyPI on new releases
pyproject.toml
as the main config file for pylint, pytest, tox, coverage, …
Changed¶
Return exact time range with
TraDatabase.read_continuous_wave
Return “absolute” time axis with
TraDatabase.read_continuous_wave
(instead of starting at t = 0 s)
Fixed¶
Fix database close if exception raised in
__init__
(e.g. file not found)Example
ex6_multiprocessing
for MacOSFind lower/upper bounds for same values (times) in binary search (used by
TraDatabase.iread
)Stop condition for
time_stop
inTraDatabase.iread
Use TRAI for
TraDatabase.iread
as a time sorted index for binary search (SetID is not!)Check for empty time ranges in
TraDatabase.iread
0.2.3 - 2020-09-01¶
Fixed¶
AIC timepicker
Add threshold for monotonic time check (1 ns) to ignore rounding issues
Suppress exception chaining
0.2.2 - 2020-07-10¶
Added¶
Database classes are now pickable and can be used in multiprocessing
SQLite transactions for all writes
Faster blob encoding (
vallenae.io.encode_data_blob
)Faster RMS computation with Numba (
vallenae.features.rms
)
Fixed¶
Catch possible global_info table parsing errors
0.2.1 - 2020-02-10¶
Fixed¶
Examples outputs if not run as notebook
Out-of-bound time_start, time_stop with SQL binary search
Optional signal strength for e.g. spotWave data acquisition
0.2.0 - 2020-02-06¶
Added¶
Database creation with
mode="rwc"
, e.g.vallenae.io.PriDatabase.__init__
Fixed¶
Number field in
vallenae.io.MarkerRecord
optionalScaling of parametric inputs optional
Keep column order of query if new columns are added to the database
Return array with float32 from
vallenae.io.TraDatabase.read_continuous_wave
(instead of float64)
ToDos¶
Todo
Status flag
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.1/lib/python3.7/site-packages/vallenae/io/pridb.py:docstring of vallenae.io.pridb.PriDatabase.write_hit, line 10.)
Todo
Status flag
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.1/lib/python3.7/site-packages/vallenae/io/pridb.py:docstring of vallenae.io.pridb.PriDatabase.write_parametric, line 10.)
Todo
Status flag
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.1/lib/python3.7/site-packages/vallenae/io/pridb.py:docstring of vallenae.io.pridb.PriDatabase.write_status, line 10.)
Todo
Status flag
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.1/lib/python3.7/site-packages/vallenae/io/tradb.py:docstring of vallenae.io.tradb.TraDatabase.write, line 9.)
Todo
Remove RMS
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.1/lib/python3.7/site-packages/vallenae/io/datatypes.py:docstring of vallenae.io.datatypes.TraRecord, line 3.)
Todo
Weak performance, if used with default parameter alpha
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.1/lib/python3.7/site-packages/vallenae/timepicker/timepicker.py:docstring of vallenae.timepicker.timepicker.hinkley, line 22.)