Vallen AE Python Tools¶
Extract and analyze Acoustic Emission measurement data.
The IO module vallenae.io
enables reading (and writing) of Vallen Systeme SQLite database files:
*.pridb: Primary database
*.tradb: Transient data
*.trfdb: Transient features
The remaining modules are system-independent and try to comprise the most common state-of-the-art algorithms in Acoustic Emission:
vallenae.features
: Extraction of Acoustic Emission featuresvallenae.timepicker
: Timepicking algorithms for arrival time estimations
IO¶
Read/write Vallen Systeme database and setup files.
Database classes¶
Classes to read/write pridb, tradb and trfdb database files.
Warning
Writing is still experimental
|
IO Wrapper for pridb database file. |
|
IO Wrapper for tradb database file. |
|
IO Wrapper for trfdb (transient feature) database file. |
All database classes implement two different interfaces to access data:
Standard read_*
Read data to
pandas.DataFrame
, e.g. withPriDatabase.read_hits
>>> pridb = vae.io.PriDatabase("./examples/steel_plate/sample.pridb") >>> df = pridb.read_hits() # save all hits to pandas dataframe >>> df[["time", "channel"]] # output columns hit and channel time channel set_id 10 3.992771 3 11 3.992775 2 12 3.992813 4 13 3.992814 1
Streaming iread_*
Iterate through the data row by row. This is a memory-efficient solution ideal for batch processing. The return types are specific
typing.NamedTuple
, see Data types.Example with
PriDatabase.iread_hits
:>>> pridb = vae.io.PriDatabase("./examples/steel_plate/sample.pridb") >>> for hit in pridb.iread_hits(): ... print(f"time: {hit.time:0.4f}, channel: {hit.channel}") ... time: 3.9928, channel: 3 time: 3.9928, channel: 2 time: 3.9928, channel: 4 time: 3.9928, channel: 1 >>> type(hit) <class 'vallenae.io.datatypes.HitRecord'>
Data types¶
Records of the database are represented as typing.NamedTuple
.
Each record implements a class method from_sql
to init from a SQLite
row dictionary (column name: value).
Hit record in pridb (SetType = 2). |
|
Marker record in pridb (SetType = 4, 5, 6). |
|
Status data record in pridb (SetType = 3). |
|
Parametric data record in pridb (SetType = 1). |
|
Transient data record in tradb. |
|
Transient feature record in trfdb. |
Compression¶
Transient signals in the tradb are stored as BLOBs of 16-bit ADC values – either uncompressed or compressed (FLAC). Following functions convert between BLOBs and arrays of voltage values.
|
Decodes (compressed) 16-bit ADC values from BLOB to array of voltage values. |
|
Encodes array of voltage values to BLOB of 16-bit ADC values for memory-efficient storage. |
Features¶
Acoustic Emission¶
|
Compute maximum absolute amplitude. |
|
Compute index of peak amplitude. |
|
Checks if absolute amplitudes are above threshold. |
|
Compute index of first threshold crossing. |
|
Compute the rise time. |
|
Compute the energy of a hit. |
|
Compute the signal strength of a hit. |
|
Compute the number of positive threshold crossings of a hit (counts). |
|
Compute the root mean square (RMS) of an array. |
Conversion¶
|
Convert amplitude from volts to decibel (dB). |
|
Convert amplitude from decibel (dB) to volts. |
Timerpicker¶
The determination of signal arrival times has a major influence on the localization accuracy. Usually, arrival times are determined by the first threshold crossing (either fixed or adaptive). Following popular methods have been proposed in the past to automatically pick time of arrivals:
|
Hinkley criterion for arrival time estimation. |
|
Akaike Information Criterion (AIC) for arrival time estimation. |
|
Energy ratio for arrival time estimation. |
|
Modified energy ratio method for arrival time estimation. |
Examples¶
A collection of examples how to read and analyse Acoustic Emission data.
Note
Click here to download the full example code
Read pridb¶
import os
import matplotlib.pyplot as plt
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
PRIDB = os.path.join(HERE, "steel_plate/sample.pridb")
Open pridb¶
pridb = vae.io.PriDatabase(PRIDB)
print("Tables in database: ", pridb.tables())
print("Number of rows in data table (ae_data): ", pridb.rows())
print("Set of all channels: ", pridb.channel())
Out:
Tables in database: {'ae_markers', 'ae_globalinfo', 'ae_data', 'data_integrity', 'ae_params', 'acq_setup', 'ae_fieldinfo'}
Number of rows in data table (ae_data): 18
Set of all channels: {1, 2, 3, 4}
Read hits to Pandas DataFrame¶
df_hits = pridb.read_hits()
# Print a few columns
df_hits[["time", "channel", "amplitude", "counts", "energy"]]
Out:
Hits: 0%| | 0/4 [00:00<?, ?it/s]
Hits: 100%|##########| 4/4 [00:00<00:00, 7307.15it/s]
Query Pandas DataFrame¶
DataFrames offer powerful features to query and aggregate data, e.g. plot summed energy per channel
ax = df_hits.groupby("channel").sum()["energy"].plot.bar(figsize=(8, 3))
ax.set_xlabel("Channel")
ax.set_ylabel("Summed Energy [eu = 1e-14 V²s]")
plt.tight_layout()

Read markers¶
df_markers = pridb.read_markers()
df_markers
Out:
Marker: 0%| | 0/5 [00:00<?, ?it/s]
Marker: 100%|##########| 5/5 [00:00<00:00, 19134.60it/s]
Read parametric data¶
df_parametric = pridb.read_parametric()
df_parametric
Out:
Parametric: 0%| | 0/9 [00:00<?, ?it/s]
Parametric: 100%|##########| 9/9 [00:00<00:00, 19558.93it/s]
Total running time of the script: ( 0 minutes 0.235 seconds)
Note
Click here to download the full example code
Read and plot transient data¶

import os
import matplotlib.pyplot as plt
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb") # uncompressed
TRAI = 4 # just an example, no magic here
plt.ioff() # Turn interactive mode off; plt.show() is blocking
def main():
# Read waveform from tradb
with vae.io.TraDatabase(TRADB) as tradb:
y, t = tradb.read_wave(TRAI)
y *= 1e3 # in mV
t *= 1e6 # for µs
# Plot waveforms
plt.figure(figsize=(8, 4), tight_layout=True)
plt.plot(t, y)
plt.xlabel("Time [µs]")
plt.ylabel("Amplitude [mV]")
plt.title(f"Transient Wave Plot; trai = {TRAI}")
plt.show()
if __name__ == "__main__":
main()
Total running time of the script: ( 0 minutes 0.230 seconds)
Note
Click here to download the full example code
Timepicker¶
Following example showcases the results of different timepicking methods.
For more informations, please refer to the functions documentation (vallenae.timepicker
).
import os
import time
import matplotlib.pyplot as plt
import numpy as np
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb")
TRAI = 4
SAMPLES = 2000
plt.ioff() # Turn interactive mode off; plt.show() is blocking
Read waveform from tradb¶
tradb = vae.io.TraDatabase(TRADB)
y, t = tradb.read_wave(TRAI)
# crop first samples
t = t[:SAMPLES]
y = y[:SAMPLES]
# unit conversion
t *= 1e6 # convert to µs
y *= 1e3 # convert to mV
Prepare plotting with time-picker results¶
def plot(t_wave, y_wave, y_picker, index_picker, name_picker):
_, ax1 = plt.subplots(figsize=(8, 4), tight_layout=True)
ax1.set_xlabel("Time [µs]")
ax1.set_ylabel("Amplitude [mV]", color="g")
ax1.plot(t_wave, y_wave, color="g")
ax1.tick_params(axis="y", labelcolor="g")
ax2 = ax1.twinx()
ax2.set_ylabel(f"{name_picker}", color="r")
ax2.plot(t_wave, y_picker, color="r")
ax2.tick_params(axis="y", labelcolor="r")
plt.axvline(t_wave[index_picker], color="k", linestyle=":")
plt.show()
Hinkley Criterion¶
hc_arr, hc_index = vae.timepicker.hinkley(y, alpha=5)
plot(t, y, hc_arr, hc_index, "Hinkley Criterion")

The negative trend correlates to the chosen alpha value and can influence the results strongly. Results with alpha = 50 (less negative trend):
hc_arr, hc_index = vae.timepicker.hinkley(y, alpha=50)
plot(t, y, hc_arr, hc_index, "Hinkley Criterion")

Akaike Information Criterion (AIC)¶
aic_arr, aic_index = vae.timepicker.aic(y)
plot(t, y, aic_arr, aic_index, "Hinkley Criterion")

Energy Ratio¶
er_arr, er_index = vae.timepicker.energy_ratio(y)
plot(t, y, er_arr, er_index, "Energy Ratio")

Modified Energy Ratio¶
mer_arr, mer_index = vae.timepicker.modified_energy_ratio(y)
plot(t, y, mer_arr, mer_index, "Modified Energy Ratio")

Performance comparison¶
All timepicker implementations are using Numba for just-in-time (JIT) compilations. Usually the first function call is slow, because it will trigger the JIT compiler. To compare the performance to a native or numpy implementation, the average of multiple executions should be compared.
def timeit(callable, loops=100):
time_start = time.perf_counter()
for _ in range(loops):
callable()
return 1e6 * (time.perf_counter() - time_start) / loops # elapsed time in µs
timer_results = {
"Hinkley": timeit(lambda: vae.timepicker.hinkley(y, 5)),
"AIC": timeit(lambda: vae.timepicker.aic(y)),
"Energy Ratio": timeit(lambda: vae.timepicker.energy_ratio(y)),
"Modified Energy Ratio": timeit(lambda: vae.timepicker.modified_energy_ratio(y)),
}
for name, time in timer_results.items():
print(f"{name}: {time:0.3f} µs")
plt.figure(figsize=(8, 3), tight_layout=True)
plt.bar(timer_results.keys(), timer_results.values())
plt.ylabel("Time [µs]")
plt.show()

Out:
Hinkley: 10.441 µs
AIC: 139.615 µs
Energy Ratio: 13.925 µs
Modified Energy Ratio: 16.121 µs
Total running time of the script: ( 0 minutes 3.301 seconds)
Note
Click here to download the full example code
Timepicker batch processing¶
Following examples shows how to stream transient data row by row, compute timepicker results and save the results to a feature database (trfdb).
import os
from shutil import copyfile
from tempfile import gettempdir
import matplotlib.pyplot as plt
import pandas as pd
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb")
TRFDB = os.path.join(HERE, "steel_plate/sample.trfdb")
TRFDB_TMP = os.path.join(gettempdir(), "sample.trfdb")
Open tradb (readonly) and trfdb (readwrite)¶
copyfile(TRFDB, TRFDB_TMP) # copy trfdb, so we don't overwrite it
tradb = vae.io.TraDatabase(TRADB)
trfdb = vae.io.TrfDatabase(TRFDB_TMP, mode="rw") # allow writing
Read current trfdb¶
trfdb.read()
Compute arrival time offsets with different timepickers¶
To improve localisation, time of arrival estimates using the first threshold crossing can be refined with timepickers. Therefore, arrival time offsets between the first threshold crossings and the timepicker results are computed.
def dt_from_timepicker(timepicker_func, tra: vae.io.TraRecord):
# Index of the first threshold crossing is equal to the pretrigger samples
index_ref = tra.pretrigger
# Only analyse signal until peak amplitude
index_peak = vae.features.peak_amplitude_index(tra.data)
data = tra.data[:index_peak]
# Get timepicker result
_, index_timepicker = timepicker_func(data)
# Compute offset in µs
dt_us = (index_timepicker - index_ref) * 1e6 / tra.samplerate
return dt_us
Transient data is streamed from the database row by row using vallenae.io.TraDatabase.iread
.
Only one transient data set is loaded into memory at a time.
That makes the streaming interface ideal for batch processing.
The timepicker results are saved to the trfdb using vallenae.io.TrfDatabase.write
.
for tra in tradb.iread():
# Calculate arrival time offsets with different timepickers
feature_set = vae.io.FeatureRecord(
trai=tra.trai,
features={
"ATO_Hinkley": dt_from_timepicker(vae.timepicker.hinkley, tra),
"ATO_AIC": dt_from_timepicker(vae.timepicker.aic, tra),
"ATO_ER": dt_from_timepicker(vae.timepicker.energy_ratio, tra),
"ATO_MER": dt_from_timepicker(vae.timepicker.modified_energy_ratio, tra),
}
)
# Save results to trfdb
trfdb.write(feature_set)
Read results from trfdb¶
trfdb.read().filter(regex="ATO")
Plot results¶
ax = trfdb.read()[["ATO_Hinkley", "ATO_AIC", "ATO_ER", "ATO_MER"]].plot.barh()
ax.invert_yaxis()
ax.set_xlabel("Arrival time offset [µs]")

Out:
Text(0.5, 0, 'Arrival time offset [µs]')
Plot waveforms and arrival times¶
_, axes = plt.subplots(4, 1, tight_layout=True, figsize=(8, 8))
for row, ax in zip(trfdb.read().itertuples(), axes):
trai = row.Index
# read waveform from tradb
y, t = tradb.read_wave(trai)
# plot waveform
ax.plot(t[400:1000] * 1e6, y[400:1000] * 1e3, "k") # crop and convert to µs/mV
ax.set_title(f"trai = {trai}")
ax.set_xlabel("Time [µs]")
ax.set_ylabel("Amplitude [mV]")
ax.label_outer()
# plot arrival time offsets
ax.axvline(row.ATO_Hinkley, color="C0")
ax.axvline(row.ATO_AIC, color="C1")
ax.axvline(row.ATO_ER, color="C2")
ax.axvline(row.ATO_MER, color="C3")
axes[0].legend(["Waveform", "Hinkley", "AIC", "ER", "MER"])

Out:
<matplotlib.legend.Legend object at 0x7fe74b702dd8>
Use results in VisualAE¶
The computed arrival time offsets can be directly used in VisualAE.
We only need to specify the unit. VisualAE requires them to be in µs.
Units and other column-related meta data is saved in the trf_fieldinfo table.
Field infos can be retrieved with vallenae.io.TrfDatabase.fieldinfo
:
trfdb.fieldinfo()
Out:
{'FFT_CoG': {'SetTypes': 2, 'Unit': '[kHz]', 'LongName': 'F(C.o.Gravity)', 'Description': 'Center of gravity of spectrum', 'ShortName': None, 'FormatStr': None}, 'FFT_FoM': {'SetTypes': 2, 'Unit': '[kHz]', 'LongName': 'F(max. Amp.)', 'Description': 'Frequency of maximum of spectrum', 'ShortName': None, 'FormatStr': None}, 'PA': {'SetTypes': 8, 'Unit': '[mV]', 'LongName': 'Peak Amplitude', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'RT': {'SetTypes': 8, 'Unit': '[µs]', 'LongName': 'Rise Time', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'Dur': {'SetTypes': 8, 'Unit': '[µs]', 'LongName': 'Duration (available)', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'CTP': {'SetTypes': 8, 'Unit': None, 'LongName': 'Cnts to peak', 'Description': None, 'ShortName': None, 'FormatStr': '#'}, 'FI': {'SetTypes': 8, 'Unit': '[kHz]', 'LongName': 'Initiation Freq.', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'FR': {'SetTypes': 8, 'Unit': '[kHz]', 'LongName': 'Reverberation Freq.', 'Description': None, 'ShortName': None, 'FormatStr': None}}
Show results as table:
pd.DataFrame(trfdb.fieldinfo())
Write units to trfdb¶
Field infos can be written with vallenae.io.TrfDatabase.write_fieldinfo
:
trfdb.write_fieldinfo("ATO_Hinkley", {"Unit": "[µs]", "LongName": "Arrival Time Offset (Hinkley)"})
trfdb.write_fieldinfo("ATO_AIC", {"Unit": "[µs]", "LongName": "Arrival Time Offset (AIC)"})
trfdb.write_fieldinfo("ATO_ER", {"Unit": "[µs]", "LongName": "Arrival Time Offset (ER)"})
trfdb.write_fieldinfo("ATO_MER", {"Unit": "[µs]", "LongName": "Arrival Time Offset (MER)"})
pd.DataFrame(trfdb.fieldinfo()).filter(regex="ATO")
Load results in VisualAE¶
Time arrival offsets can be specified in the settings of Location Processors - Channel Positions - Arrival Time Offset. (Make sure to rename the generated trfdb to match the filename of the pridb.)

Total running time of the script: ( 0 minutes 3.426 seconds)
Note
Click here to download the full example code
Localisation¶

Out:
Hits: 0%| | 0/4 [00:00<?, ?it/s]
Hits: 100%|##########| 4/4 [00:00<00:00, 9300.01it/s]
Runtime for 1 call to differential_evolution(): 0.7701 s
fun: 0.0011158878962079443
jac: array([-0.00014075, 0.00012794])
message: 'Optimization terminated successfully.'
nfev: 7775
nit: 96
success: True
x: array([0.22165281, 0.36565923])
import math
import os
import time
import xml.etree.ElementTree as ElementTree
from typing import Dict, Optional, Tuple
import matplotlib.pyplot as plt
import numpy as np
from numba import f8, njit
from numpy.linalg import norm
from scipy.optimize import differential_evolution
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
SETUP = os.path.join(HERE, "steel_plate/sample.vaex")
PRIDB = os.path.join(HERE, "steel_plate/sample.pridb")
NUMBER_SENSORS = 4
plt.ioff() # Turn interactive mode off; plt.show() is blocking
@njit(f8(f8[:], f8, f8[:, :], f8[:]))
def lucy_error_fun(
test_pos: np.ndarray,
speed: float,
sens_poss: np.ndarray,
measured_delta_ts: np.ndarray,
) -> float:
"""
Implementation of the LUCY computation in 2D as documented in
the Vallen online help.
Args:
test_pos: Emitter position to test.
speed: Assumed speed of sound in a plate-like structure.
sens_poss: Sensor positions, often a 4x2 array, has to match
the sorting of the delta-ts.
measured_delta_ts: The measured time differences in seconds, has to
match the order of the sensor positions.
Returns:
The LUCY value as a float. Ideally 0, in practice never 0, always positive.
"""
m = len(measured_delta_ts)
n = m + 1
measured_delta_dists = speed * measured_delta_ts
theo_dists = np.zeros(n)
theo_delta_dists = np.zeros(m)
for i in range(n):
theo_dists[i] = norm(test_pos - sens_poss[i, :])
for i in range(m):
theo_delta_dists[i] = theo_dists[i + 1] - theo_dists[0]
# LUCY definition taken from the vallen online help:
lucy_val = norm(theo_delta_dists - measured_delta_dists) / math.sqrt(n - 1)
return lucy_val
def get_channel_positions(setup_file: str) -> Dict[int, Tuple[float, float]]:
tree = ElementTree.parse(setup_file)
nodes = tree.getroot().findall(".//ChannelPos")
if nodes is None:
raise RuntimeError("Can not retrieve channel positions from %s", setup_file)
channel_positions = {
int(elem.get("Chan")): (float(elem.get("X")), float(elem.get("Y"))) # type: ignore
for elem in nodes if elem is not None
}
return channel_positions
def get_velocity(setup_file: str) -> Optional[float]:
tree = ElementTree.parse(setup_file)
node = tree.getroot().find(".//Location")
if node is not None:
velocity_str = node.get("Velocity")
if velocity_str is not None:
return float(velocity_str) * 1e3 # convert to m/s
raise RuntimeError("Can not retrieve velocity from %s", setup_file)
def main():
# Consts plotting
text_delta_y = 0.03
text_delta_x = -0.12
# Consts LUCY grid
grid_delta = 0.01
location_search_bounds = [(0.0, 0.80), (0.0, 0.80)]
# Read from pridb
pridb = vae.io.PriDatabase(PRIDB)
hits = pridb.read_hits()
pridb.close()
channel_order = hits["channel"].to_numpy()
arrival_times = hits["time"].to_numpy()
delta_ts = (arrival_times - arrival_times[0])[1:]
# Get localisation parameters from .vaex file
velocity = get_velocity(SETUP)
pos_dict = get_channel_positions(SETUP)
# Order sensor positions by hit occurence
pos_ordered = np.array([pos_dict[ch] for ch in channel_order])
# Compute heatmap
lucy_instance_2args = lambda x, y: lucy_error_fun(
np.array([x, y]), velocity, pos_ordered, delta_ts
)
x_range = np.arange(location_search_bounds[0][0], location_search_bounds[0][1], grid_delta)
y_range = x_range
x_grid, y_grid = np.meshgrid(x_range, y_range)
z_grid = np.vectorize(lucy_instance_2args)(x_grid, y_grid)
# Plot heatmap
plt.figure(tight_layout=True)
plt.pcolormesh(x_grid, y_grid, z_grid, cmap="cool")
plt.colorbar()
plt.title("Location Result and LUCY-Heatmap")
plt.xlabel("x [m]")
plt.ylabel("y [m]")
# Compute location
lucy_instance_single_arg = lambda pos: lucy_error_fun(
pos, velocity, pos_ordered, delta_ts
)
start = time.perf_counter()
# These are excessive search / overkill parameters:
location_result = differential_evolution(
lucy_instance_single_arg,
location_search_bounds,
popsize=40,
polish=True,
strategy="rand1bin",
recombination=0.1,
mutation=1.3,
)
end = time.perf_counter()
print(f"Runtime for 1 call to differential_evolution(): {(end - start):0.4} s")
print(location_result)
# Plot location result
x_res = location_result.x[0]
y_res = location_result.x[1]
plt.plot([x_res], [y_res], "bo")
plt.text(
x_res + text_delta_x,
y_res + text_delta_y,
"location result",
fontsize=9,
color="b",
)
# Plot sensor positions
for channel, (x, y) in pos_dict.items():
text = f"S{channel} (x={x:0.2f}m | y={y:0.2f}m)"
plt.scatter(x, y, marker="o", color="w")
plt.text(x + text_delta_x, y + text_delta_y, text, fontsize=9, color="w")
plt.show()
if __name__ == "__main__":
main()
Total running time of the script: ( 0 minutes 2.137 seconds)
Note
Click here to download the full example code
Go fast with multiprocessing¶
The streaming interfaces with iterables allow efficient batch processing as shown here. But still only one core/thread will be utilized. We will change that will multiprocessing.
Following example shows a batch feature extraction procedure using multiple CPU cores.
import os
import time
import multiprocessing
from typing import Dict, Iterable
from itertools import cycle
import __main__
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import vallenae as vae
HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb")
Prepare streaming reads¶
tradb = vae.io.TraDatabase(TRADB)
Our sample tradb only contains four data sets. That is not enough data for demonstrating batch processing. Therefore, we will simulate more data by looping over the data sets with following generator/iterable:
def tra_generator(loops: int = 1000) -> Iterable[vae.io.TraRecord]:
for loop, tra in enumerate(cycle(tradb.iread())):
if loop > loops:
break
yield tra
Define feature extraction function¶
Following function will be applied to all data sets and returns computed features:
def feature_extraction(tra: vae.io.TraRecord) -> Dict[str, float]:
# compute random statistical features
return {
"Std": np.std(tra.data),
"Skew": stats.skew(tra.data),
}
# Fix to use pickle serialization in sphinx gallery
setattr(__main__, feature_extraction.__name__, feature_extraction)
Compute with single thread/core¶
Note
The examples are executed on the CI / readthedocs server with limited resources. Therefore, the shown computation times and speedups are below the capability of modern machines.
Run computation in a single thread and get the time:
time_elapsed_ms = lambda t0: 1e3 * (time.perf_counter() - t0)
time_start = time.perf_counter()
for tra in tra_generator():
results = feature_extraction(tra)
# do something with the results
time_single_thread = time_elapsed_ms(time_start)
print(f"Time single thread: {time_single_thread:.2f} ms")
Out:
Time single thread: 1288.00 ms
Compute with multiple processes/cores¶
First get number of available cores in your machine:
cores_total = os.cpu_count()
cores_available = len(os.sched_getaffinity(0)) # https://docs.python.org/3/library/os.html#os.cpu_count
print(f"Available / total CPU cores: {cores_available} / {cores_total}")
Out:
Available / total CPU cores: 4 / 4
But how can we utilize those cores? The common answer for most programming languages is multithreading. Threads run in the same process and heap, so data can be shared between them (with care). Sadly, Python uses a global interpreter lock (GIL) that locks heap memory, because Python objects are not thread-safe. Therefore, threads are blocking each other and no speedups are gained by using multiple threads.
The solution for Python is multiprocessing to work around the GIL. Every process has its own heap and GIL. Multiprocessing will introduce overhead for interprocess communication and data serialization/deserialization. To reduce the overhead, data is sent in bigger chunks.
Run computation on 4 cores with chunks of 128 data sets and get the time / speedup:
with multiprocessing.Pool(4) as pool:
time_start = time.perf_counter()
for results in pool.imap(feature_extraction, tra_generator(), chunksize=128):
pass # do something with the results
time_multiprocessing = time_elapsed_ms(time_start)
print(f"Time multiprocessing: {time_multiprocessing:.2f} ms")
print(f"Speedup: {(time_single_thread / time_multiprocessing):.2f}")
Out:
Time multiprocessing: 457.20 ms
Speedup: 2.82
Variation of the chunksize¶
Following results show how the chunksize impacts the overall performance. The speedup is measured for different chunksizes and plotted against the chunksize:
chunksizes = (10, 40, 60, 80, 100, 120, 140, 160, 200)
speedup_chunksizes = []
with multiprocessing.Pool(4) as pool:
for chunksize in chunksizes:
time_start = time.perf_counter()
for results in pool.imap(feature_extraction, tra_generator(), chunksize=chunksize):
pass # do something with the results
speedup_chunksizes.append(time_single_thread / time_elapsed_ms(time_start))
plt.figure(tight_layout=True, figsize=(6, 3))
plt.plot(chunksizes, speedup_chunksizes)
plt.xlabel("Chunksize")
plt.ylabel("Speedup")

Out:
Text(0, 0.5, 'Speedup')
Total running time of the script: ( 0 minutes 10.336 seconds)
ToDos¶
Todo
Add exception
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.1.0/lib/python3.7/site-packages/vallenae/features/acoustic_emission.py:docstring of vallenae.features.rms, line 11.)
Todo
Status flag
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.1.0/lib/python3.7/site-packages/vallenae/io/pridb.py:docstring of vallenae.io.PriDatabase.write_hit, line 10.)
Todo
Status flag
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.1.0/lib/python3.7/site-packages/vallenae/io/pridb.py:docstring of vallenae.io.PriDatabase.write_parametric, line 10.)
Todo
Status flag
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.1.0/lib/python3.7/site-packages/vallenae/io/pridb.py:docstring of vallenae.io.PriDatabase.write_status, line 10.)
Todo
Status flag
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.1.0/lib/python3.7/site-packages/vallenae/io/tradb.py:docstring of vallenae.io.TraDatabase.write, line 9.)
Todo
Remove RMS
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.1.0/lib/python3.7/site-packages/vallenae/io/datatypes.py:docstring of vallenae.io.TraRecord, line 3.)
Todo
Weak performance, if used with default parameter alpha
(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.1.0/lib/python3.7/site-packages/vallenae/__init__.py:docstring of vallenae.timepicker.hinkley, line 22.)