Vallen AE Python Tools

Extract and analyze Acoustic Emission measurement data.

The IO module vallenae.io enables reading (and writing) of Vallen Systeme SQLite database files:

  • *.pridb: Primary database

  • *.tradb: Transient data

  • *.trfdb: Transient features

The remaining modules are system-independent and try to comprise the most common state-of-the-art algorithms in Acoustic Emission:

IO

Read/write Vallen Systeme database and setup files.

Database classes

Classes to read/write pridb, tradb and trfdb database files.

Warning

Writing is still experimental

PriDatabase(filename[, mode])

IO Wrapper for pridb database file.

TraDatabase(filename[, mode, compression])

IO Wrapper for tradb database file.

TrfDatabase(filename[, mode])

IO Wrapper for trfdb (transient feature) database file.

All database classes implement two different interfaces to access data:

Standard read_*

Read data to pandas.DataFrame, e.g. with PriDatabase.read_hits

>>> pridb = vae.io.PriDatabase("./examples/steel_plate/sample.pridb")
>>> df = pridb.read_hits()  # save all hits to pandas dataframe
>>> df[["time", "channel"]]  # output columns hit and channel
            time  channel
set_id
10      3.992771        3
11      3.992775        2
12      3.992813        4
13      3.992814        1

Streaming iread_*

Iterate through the data row by row. This is a memory-efficient solution ideal for batch processing. The return types are specific typing.NamedTuple, see Data types.

Example with PriDatabase.iread_hits:

>>> pridb = vae.io.PriDatabase("./examples/steel_plate/sample.pridb")
>>> for hit in pridb.iread_hits():
...     print(f"time: {hit.time:0.4f}, channel: {hit.channel}")
...
time: 3.9928,   channel: 3
time: 3.9928,   channel: 2
time: 3.9928,   channel: 4
time: 3.9928,   channel: 1
>>> type(hit)
<class 'vallenae.io.datatypes.HitRecord'>

Data types

Records of the database are represented as typing.NamedTuple. Each record implements a class method from_sql to init from a SQLite row dictionary (column name: value).

HitRecord(time, channel, param_id, …)

Hit record in pridb (SetType = 2).

MarkerRecord(time, set_type, data, number, …)

Marker record in pridb (SetType = 4, 5, 6).

StatusRecord(time, channel, param_id, …)

Status data record in pridb (SetType = 3).

ParametricRecord(time, param_id, set_id, …)

Parametric data record in pridb (SetType = 1).

TraRecord(time, channel, param_id, …)

Transient data record in tradb.

FeatureRecord(trai, features, float])

Transient feature record in trfdb.

Compression

Transient signals in the tradb are stored as BLOBs of 16-bit ADC values – either uncompressed or compressed (FLAC). Following functions convert between BLOBs and arrays of voltage values.

decode_data_blob(data_blob, data_format, …)

Decodes (compressed) 16-bit ADC values from BLOB to array of voltage values.

encode_data_blob(data, data_format, …)

Encodes array of voltage values to BLOB of 16-bit ADC values for memory-efficient storage.

Features

Acoustic Emission

peak_amplitude(data)

Compute maximum absolute amplitude.

peak_amplitude_index(data)

Compute index of peak amplitude.

is_above_threshold(data, threshold)

Checks if absolute amplitudes are above threshold.

first_threshold_crossing(data, threshold)

Compute index of first threshold crossing.

rise_time(data, threshold, samplerate[, …])

Compute the rise time.

energy(data, samplerate)

Compute the energy of a hit.

signal_strength(data, samplerate)

Compute the signal strength of a hit.

counts(data, threshold)

Compute the number of positive threshold crossings of a hit (counts).

rms(data)

Compute the root mean square (RMS) of an array.

Conversion

amplitude_to_db(amplitude[, reference])

Convert amplitude from volts to decibel (dB).

db_to_amplitude(amplitude_db[, reference])

Convert amplitude from decibel (dB) to volts.

Timerpicker

The determination of signal arrival times has a major influence on the localization accuracy. Usually, arrival times are determined by the first threshold crossing (either fixed or adaptive). Following popular methods have been proposed in the past to automatically pick time of arrivals:

hinkley(arr[, alpha])

Hinkley criterion for arrival time estimation.

aic(arr)

Akaike Information Criterion (AIC) for arrival time estimation.

energy_ratio(arr[, win_len])

Energy ratio for arrival time estimation.

modified_energy_ratio(arr[, win_len])

Modified energy ratio method for arrival time estimation.

Examples

A collection of examples how to read and analyse Acoustic Emission data.

Read pridb

import os

import matplotlib.pyplot as plt

import vallenae as vae

HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
PRIDB = os.path.join(HERE, "steel_plate/sample.pridb")

Open pridb

pridb = vae.io.PriDatabase(PRIDB)

print("Tables in database: ", pridb.tables())
print("Number of rows in data table (ae_data): ", pridb.rows())
print("Set of all channels: ", pridb.channel())

Out:

Tables in database:  {'ae_params', 'acq_setup', 'ae_fieldinfo', 'data_integrity', 'ae_globalinfo', 'ae_data', 'ae_markers'}
Number of rows in data table (ae_data):  18
Set of all channels:  {1, 2, 3, 4}

Read hits to Pandas DataFrame

df_hits = pridb.read_hits()
# Print a few columns
print(df_hits[["time", "channel", "amplitude", "counts", "energy"]])

Out:

Hits:   0%|          | 0/4 [00:00<?, ?it/s]
Hits: 100%|##########| 4/4 [00:00<00:00, 10699.76it/s]
            time  channel  amplitude  counts        energy
set_id
10      3.992771        3   0.046539    2180  2.799510e+07
11      3.992775        2   0.059621    2047  2.276279e+07
12      3.992813        4   0.034119    1854  1.286700e+07
13      3.992814        1   0.029115    1985  1.265275e+07

Query Pandas DataFrame

DataFrames offer powerful features to query and aggregate data, e.g. plot summed energy per channel

ax = df_hits.groupby("channel").sum()["energy"].plot.bar(figsize=(8, 3))
ax.set_xlabel("Channel")
ax.set_ylabel("Summed Energy [eu = 1e-14 V²s]")
plt.tight_layout()
plt.show()
ex1 read pridb

Read markers

df_markers = pridb.read_markers()
print(df_markers)

Out:

Marker:   0%|          | 0/5 [00:00<?, ?it/s]
Marker: 100%|##########| 5/5 [00:00<00:00, 17246.32it/s]
          time  set_type                                        data  number
set_id
1         0.00         6                                                   1
2         0.00         4                                10:52 Resume       1
3         0.00         5                         2019-09-20 10:54:52    <NA>
4         0.00         4  TimeZone: +02:00 (W. Europe Standard Time)       2
18      100.07         4                               10:56 Suspend       3

Read parametric data

df_parametric = pridb.read_parametric()
print(df_parametric)

Out:

Parametric:   0%|          | 0/9 [00:00<?, ?it/s]
Parametric: 100%|##########| 9/9 [00:00<00:00, 14932.25it/s]
        time  param_id  pctd  pcta
set_id
5       0.00         1     0     0
6       1.00         1     0     0
7       2.00         1     0     0
8       3.00         1     0     0
9       3.99         1     0     0
14      4.00         1     0     0
15      5.00         1     0     0
16      6.00         1     0     0
17      6.45         1     0     0

Total running time of the script: ( 0 minutes 0.218 seconds)

Gallery generated by Sphinx-Gallery

Read and plot transient data

Transient Wave Plot; trai = 4
import os

import matplotlib.pyplot as plt

import vallenae as vae

HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb")  # uncompressed
TRAI = 4  # just an example, no magic here


def main():
    # Read waveform from tradb
    with vae.io.TraDatabase(TRADB) as tradb:
        y, t = tradb.read_wave(TRAI)

    y *= 1e3  # in mV
    t *= 1e6  # for µs

    # Plot waveforms
    plt.figure(figsize=(8, 4), tight_layout=True)
    plt.plot(t, y)
    plt.xlabel("Time [µs]")
    plt.ylabel("Amplitude [mV]")
    plt.title(f"Transient Wave Plot; trai = {TRAI}")
    plt.show()


if __name__ == "__main__":
    main()

Total running time of the script: ( 0 minutes 0.191 seconds)

Gallery generated by Sphinx-Gallery

Timepicker

Following example showcases the results of different timepicking methods. For more informations, please refer to the functions documentation (vallenae.timepicker).

import os
import time

import matplotlib.pyplot as plt
import numpy as np

import vallenae as vae

HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb")

TRAI = 4
SAMPLES = 2000

plt.ioff()  # Turn interactive mode off; plt.show() is blocking

Out:

<matplotlib.pyplot._IoffContext object at 0x7fe5ed22e750>

Read waveform from tradb

tradb = vae.io.TraDatabase(TRADB)

y, t = tradb.read_wave(TRAI)
# crop first samples
t = t[:SAMPLES]
y = y[:SAMPLES]
# unit conversion
t *= 1e6  # convert to µs
y *= 1e3  # convert to mV

Prepare plotting with time-picker results

def plot(t_wave, y_wave, y_picker, index_picker, name_picker):
    _, ax1 = plt.subplots(figsize=(8, 4), tight_layout=True)
    ax1.set_xlabel("Time [µs]")
    ax1.set_ylabel("Amplitude [mV]", color="g")
    ax1.plot(t_wave, y_wave, color="g")
    ax1.tick_params(axis="y", labelcolor="g")

    ax2 = ax1.twinx()
    ax2.set_ylabel(f"{name_picker}", color="r")
    ax2.plot(t_wave, y_picker, color="r")
    ax2.tick_params(axis="y", labelcolor="r")

    plt.axvline(t_wave[index_picker], color="k", linestyle=":")
    plt.show()

Hinkley Criterion

hc_arr, hc_index = vae.timepicker.hinkley(y, alpha=5)
plot(t, y, hc_arr, hc_index, "Hinkley Criterion")
ex3 timepicker

The negative trend correlates to the chosen alpha value and can influence the results strongly. Results with alpha = 50 (less negative trend):

hc_arr, hc_index = vae.timepicker.hinkley(y, alpha=50)
plot(t, y, hc_arr, hc_index, "Hinkley Criterion")
ex3 timepicker

Akaike Information Criterion (AIC)

aic_arr, aic_index = vae.timepicker.aic(y)
plot(t, y, aic_arr, aic_index, "Akaike Information Criterion")
ex3 timepicker

Energy Ratio

er_arr, er_index = vae.timepicker.energy_ratio(y)
plot(t, y, er_arr, er_index, "Energy Ratio")
ex3 timepicker

Modified Energy Ratio

mer_arr, mer_index = vae.timepicker.modified_energy_ratio(y)
plot(t, y, mer_arr, mer_index, "Modified Energy Ratio")
ex3 timepicker

Performance comparison

All timepicker implementations are using Numba for just-in-time (JIT) compilations. Usually the first function call is slow, because it will trigger the JIT compiler. To compare the performance to a native or numpy implementation, the average of multiple executions should be compared.

def timeit(callable, loops=100):
    time_start = time.perf_counter()
    for _ in range(loops):
        callable()
    return 1e6 * (time.perf_counter() - time_start) / loops  # elapsed time in µs

timer_results = {
    "Hinkley": timeit(lambda: vae.timepicker.hinkley(y, 5)),
    "AIC": timeit(lambda: vae.timepicker.aic(y)),
    "Energy Ratio": timeit(lambda: vae.timepicker.energy_ratio(y)),
    "Modified Energy Ratio": timeit(lambda: vae.timepicker.modified_energy_ratio(y)),
}

for name, time in timer_results.items():
    print(f"{name}: {time:0.3f} µs")

plt.figure(figsize=(8, 3), tight_layout=True)
plt.bar(timer_results.keys(), timer_results.values())
plt.ylabel("Time [µs]")
plt.show()
ex3 timepicker

Out:

Hinkley: 9.851 µs
AIC: 130.018 µs
Energy Ratio: 12.306 µs
Modified Energy Ratio: 16.051 µs

Total running time of the script: ( 0 minutes 3.054 seconds)

Gallery generated by Sphinx-Gallery

Timepicker batch processing

Following examples shows how to stream transient data row by row, compute timepicker results and save the results to a feature database (trfdb).

import os
from shutil import copyfile
from tempfile import gettempdir

import matplotlib.pyplot as plt
import pandas as pd

import vallenae as vae

HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb")
TRFDB = os.path.join(HERE, "steel_plate/sample.trfdb")
TRFDB_TMP = os.path.join(gettempdir(), "sample.trfdb")

Open tradb (readonly) and trfdb (readwrite)

copyfile(TRFDB, TRFDB_TMP)  # copy trfdb, so we don't overwrite it

tradb = vae.io.TraDatabase(TRADB)
trfdb = vae.io.TrfDatabase(TRFDB_TMP, mode="rw")  # allow writing

Read current trfdb

print(trfdb.read())

Out:

Trf:   0%|          | 0/4 [00:00<?, ?it/s]
Trf: 100%|##########| 4/4 [00:00<00:00, 83886.08it/s]
         FFT_CoG     FFT_FoM         PA  ...  CTP          FI          FR
trai                                     ...
1     147.705078  134.277344  46.483864  ...   11  222.672058  110.182449
2     144.042969  139.160156  59.450512  ...   35  182.291672   98.019981
3     155.029297  164.794922  33.995209  ...   55  155.191879   95.493233
4     159.912109  139.160156  29.114828  ...   29  181.023727  101.906227

[4 rows x 8 columns]

Compute arrival time offsets with different timepickers

To improve localisation, time of arrival estimates using the first threshold crossing can be refined with timepickers. Therefore, arrival time offsets between the first threshold crossings and the timepicker results are computed.

def dt_from_timepicker(timepicker_func, tra: vae.io.TraRecord):
    # Index of the first threshold crossing is equal to the pretrigger samples
    index_ref = tra.pretrigger
    # Only analyse signal until peak amplitude
    index_peak = vae.features.peak_amplitude_index(tra.data)
    data = tra.data[:index_peak]
    # Get timepicker result
    _, index_timepicker = timepicker_func(data)
    # Compute offset in µs
    dt_us = (index_timepicker - index_ref) * 1e6 / tra.samplerate
    return dt_us

Transient data is streamed from the database row by row using vallenae.io.TraDatabase.iread. Only one transient data set is loaded into memory at a time. That makes the streaming interface ideal for batch processing. The timepicker results are saved to the trfdb using vallenae.io.TrfDatabase.write.

for tra in tradb.iread():
    # Calculate arrival time offsets with different timepickers
    feature_set = vae.io.FeatureRecord(
        trai=tra.trai,
        features={
            "ATO_Hinkley": dt_from_timepicker(vae.timepicker.hinkley, tra),
            "ATO_AIC": dt_from_timepicker(vae.timepicker.aic, tra),
            "ATO_ER": dt_from_timepicker(vae.timepicker.energy_ratio, tra),
            "ATO_MER": dt_from_timepicker(vae.timepicker.modified_energy_ratio, tra),
        }
    )
    # Save results to trfdb
    trfdb.write(feature_set)

Read results from trfdb

print(trfdb.read().filter(regex="ATO"))

Out:

Trf:   0%|          | 0/4 [00:00<?, ?it/s]
Trf: 100%|##########| 4/4 [00:00<00:00, 78766.27it/s]
      ATO_Hinkley  ATO_AIC  ATO_ER  ATO_MER
trai
1            30.4     -1.8    -4.0     29.2
2            36.4     -0.8    -1.0     -0.4
3            67.0     -1.8    -3.2     60.4
4            65.8     -1.0    -2.2     64.2

Plot results

ax = trfdb.read()[["ATO_Hinkley", "ATO_AIC", "ATO_ER", "ATO_MER"]].plot.barh()
ax.invert_yaxis()
ax.set_xlabel("Arrival time offset [µs]")
plt.show()
ex4 timepicker batch

Out:

Trf:   0%|          | 0/4 [00:00<?, ?it/s]
Trf: 100%|##########| 4/4 [00:00<00:00, 98689.51it/s]

Plot waveforms and arrival times

_, axes = plt.subplots(4, 1, tight_layout=True, figsize=(8, 8))
for row, ax in zip(trfdb.read().itertuples(), axes):
    trai = row.Index

    # read waveform from tradb
    y, t = tradb.read_wave(trai)

    # plot waveform
    ax.plot(t[400:1000] * 1e6, y[400:1000] * 1e3, "k")  # crop and convert to µs/mV
    ax.set_title(f"trai = {trai}")
    ax.set_xlabel("Time [µs]")
    ax.set_ylabel("Amplitude [mV]")
    ax.label_outer()
    # plot arrival time offsets
    ax.axvline(row.ATO_Hinkley, color="C0")
    ax.axvline(row.ATO_AIC, color="C1")
    ax.axvline(row.ATO_ER, color="C2")
    ax.axvline(row.ATO_MER, color="C3")

axes[0].legend(["Waveform", "Hinkley", "AIC", "ER", "MER"])
plt.show()
trai = 1, trai = 2, trai = 3, trai = 4

Out:

Trf:   0%|          | 0/4 [00:00<?, ?it/s]
Trf: 100%|##########| 4/4 [00:00<00:00, 77314.36it/s]

Use results in VisualAE

The computed arrival time offsets can be directly used in VisualAE. We only need to specify the unit. VisualAE requires them to be in µs. Units and other column-related meta data is saved in the trf_fieldinfo table. Field infos can be retrieved with vallenae.io.TrfDatabase.fieldinfo:

print(trfdb.fieldinfo())

Out:

{'FFT_CoG': {'SetTypes': 2, 'Unit': '[kHz]', 'LongName': 'F(C.o.Gravity)', 'Description': 'Center of gravity of spectrum', 'ShortName': None, 'FormatStr': None}, 'FFT_FoM': {'SetTypes': 2, 'Unit': '[kHz]', 'LongName': 'F(max. Amp.)', 'Description': 'Frequency of maximum of spectrum', 'ShortName': None, 'FormatStr': None}, 'PA': {'SetTypes': 8, 'Unit': '[mV]', 'LongName': 'Peak Amplitude', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'RT': {'SetTypes': 8, 'Unit': '[µs]', 'LongName': 'Rise Time', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'Dur': {'SetTypes': 8, 'Unit': '[µs]', 'LongName': 'Duration (available)', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'CTP': {'SetTypes': 8, 'Unit': None, 'LongName': 'Cnts to peak', 'Description': None, 'ShortName': None, 'FormatStr': '#'}, 'FI': {'SetTypes': 8, 'Unit': '[kHz]', 'LongName': 'Initiation Freq.', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'FR': {'SetTypes': 8, 'Unit': '[kHz]', 'LongName': 'Reverberation Freq.', 'Description': None, 'ShortName': None, 'FormatStr': None}}

Show results as table:

print(pd.DataFrame(trfdb.fieldinfo()))

Out:

                                   FFT_CoG  ...                   FR
SetTypes                                 2  ...                    8
Unit                                 [kHz]  ...                [kHz]
LongName                    F(C.o.Gravity)  ...  Reverberation Freq.
Description  Center of gravity of spectrum  ...                 None
ShortName                             None  ...                 None
FormatStr                             None  ...                 None

[6 rows x 8 columns]
Write units to trfdb

Field infos can be written with vallenae.io.TrfDatabase.write_fieldinfo:

trfdb.write_fieldinfo("ATO_Hinkley", {"Unit": "[µs]", "LongName": "Arrival Time Offset (Hinkley)"})
trfdb.write_fieldinfo("ATO_AIC", {"Unit": "[µs]", "LongName": "Arrival Time Offset (AIC)"})
trfdb.write_fieldinfo("ATO_ER", {"Unit": "[µs]", "LongName": "Arrival Time Offset (ER)"})
trfdb.write_fieldinfo("ATO_MER", {"Unit": "[µs]", "LongName": "Arrival Time Offset (MER)"})

print(pd.DataFrame(trfdb.fieldinfo()).filter(regex="ATO"))

Out:

                               ATO_Hinkley  ...                    ATO_MER
SetTypes                              None  ...                       None
Unit                                  [µs]  ...                       [µs]
LongName     Arrival Time Offset (Hinkley)  ...  Arrival Time Offset (MER)
Description                           None  ...                       None
ShortName                             None  ...                       None
FormatStr                             None  ...                       None

[6 rows x 4 columns]
Load results in VisualAE

Time arrival offsets can be specified in the settings of Location Processors - Channel Positions - Arrival Time Offset. (Make sure to rename the generated trfdb to match the filename of the pridb.)

_images/vae_arrival_time_offset.png

Total running time of the script: ( 0 minutes 0.874 seconds)

Gallery generated by Sphinx-Gallery

Localisation

Location Result and LUCY-Heatmap

Out:

Hits:   0%|          | 0/4 [00:00<?, ?it/s]
Hits: 100%|##########| 4/4 [00:00<00:00, 13662.23it/s]
/home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/checkouts/0.5.3/examples/ex5_location.py:122: MatplotlibDeprecationWarning: shading='flat' when X and Y have the same dimensions as C is deprecated since 3.3.  Either specify the corners of the quadrilaterals with X and Y, or pass shading='auto', 'nearest' or 'gouraud', or set rcParams['pcolor.shading'].  This will become an error two minor releases later.
  plt.pcolormesh(x_grid, y_grid, z_grid, cmap="cool")
Runtime for 1 call to differential_evolution(): 0.5579 s
     fun: 0.0011158878601174756
     jac: array([4.56232274e-08, 5.05888734e-08])
 message: 'Optimization terminated successfully.'
    nfev: 7618
     nit: 94
 success: True
       x: array([0.22165315, 0.36565903])

import math
import os
import time
import xml.etree.ElementTree as ElementTree
from typing import Dict, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
from numba import f8, njit
from numpy.linalg import norm
from scipy.optimize import differential_evolution

import vallenae as vae

HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
SETUP = os.path.join(HERE, "steel_plate/sample.vaex")
PRIDB = os.path.join(HERE, "steel_plate/sample.pridb")
NUMBER_SENSORS = 4


@njit(f8(f8[:], f8, f8[:, :], f8[:]))
def lucy_error_fun(
    test_pos: np.ndarray,
    speed: float,
    sens_poss: np.ndarray,
    measured_delta_ts: np.ndarray,
) -> float:
    """
    Implementation of the LUCY computation in 2D as documented in
    the Vallen online help.

    Args:
        test_pos: Emitter position to test.
        speed: Assumed speed of sound in a plate-like structure.
        sens_poss: Sensor positions, often a 4x2 array, has to match
            the sorting of the delta-ts.
        measured_delta_ts: The measured time differences in seconds, has to
            match the order of the sensor positions.

    Returns:
        The LUCY value as a float. Ideally 0, in practice never 0, always positive.
    """
    m = len(measured_delta_ts)
    n = m + 1
    measured_delta_dists = speed * measured_delta_ts
    theo_dists = np.zeros(n)
    theo_delta_dists = np.zeros(m)
    for i in range(n):
        theo_dists[i] = norm(test_pos - sens_poss[i, :])
    for i in range(m):
        theo_delta_dists[i] = theo_dists[i + 1] - theo_dists[0]

    # LUCY definition taken from the vallen online help:
    lucy_val = norm(theo_delta_dists - measured_delta_dists) / math.sqrt(n - 1)
    return lucy_val


def get_channel_positions(setup_file: str) -> Dict[int, Tuple[float, float]]:
    tree = ElementTree.parse(setup_file)
    nodes = tree.getroot().findall(".//ChannelPos")
    if nodes is None:
        raise RuntimeError("Can not retrieve channel positions from %s", setup_file)
    channel_positions = {
        int(elem.get("Chan")): (float(elem.get("X")), float(elem.get("Y")))  # type: ignore
        for elem in nodes if elem is not None
    }
    return channel_positions


def get_velocity(setup_file: str) -> Optional[float]:
    tree = ElementTree.parse(setup_file)
    node = tree.getroot().find(".//Location")
    if node is not None:
        velocity_str = node.get("Velocity")
        if velocity_str is not None:
            return float(velocity_str) * 1e3  # convert to m/s
    raise RuntimeError("Can not retrieve velocity from %s", setup_file)


def main():
    # Consts plotting
    text_delta_y = 0.03
    text_delta_x = -0.12

    # Consts LUCY grid
    grid_delta = 0.01
    location_search_bounds = [(0.0, 0.80), (0.0, 0.80)]

    # Read from pridb
    pridb = vae.io.PriDatabase(PRIDB)
    hits = pridb.read_hits()
    pridb.close()

    channel_order = hits["channel"].to_numpy()
    arrival_times = hits["time"].to_numpy()
    delta_ts = (arrival_times - arrival_times[0])[1:]

    # Get localisation parameters from .vaex file
    velocity = get_velocity(SETUP)
    pos_dict = get_channel_positions(SETUP)

    # Order sensor positions by hit occurence
    pos_ordered = np.array([pos_dict[ch] for ch in channel_order])

    # Compute heatmap
    lucy_instance_2args = lambda x, y: lucy_error_fun(
        np.array([x, y]), velocity, pos_ordered, delta_ts
    )

    x_range = np.arange(location_search_bounds[0][0], location_search_bounds[0][1], grid_delta)
    y_range = x_range
    x_grid, y_grid = np.meshgrid(x_range, y_range)
    z_grid = np.vectorize(lucy_instance_2args)(x_grid, y_grid)

    # Plot heatmap
    plt.figure(tight_layout=True)
    plt.pcolormesh(x_grid, y_grid, z_grid, cmap="cool")
    plt.colorbar()
    plt.title("Location Result and LUCY-Heatmap")
    plt.xlabel("x [m]")
    plt.ylabel("y [m]")

    # Compute location
    lucy_instance_single_arg = lambda pos: lucy_error_fun(
        pos, velocity, pos_ordered, delta_ts
    )

    start = time.perf_counter()
    # These are excessive search / overkill parameters:
    location_result = differential_evolution(
        lucy_instance_single_arg,
        location_search_bounds,
        popsize=40,
        polish=True,
        strategy="rand1bin",
        recombination=0.1,
        mutation=1.3,
    )
    end = time.perf_counter()
    print(f"Runtime for 1 call to differential_evolution(): {(end - start):0.4} s")
    print(location_result)

    # Plot location result
    x_res = location_result.x[0]
    y_res = location_result.x[1]
    plt.plot([x_res], [y_res], "bo")
    plt.text(
        x_res + text_delta_x,
        y_res + text_delta_y,
        "location result",
        fontsize=9,
        color="b",
    )

    # Plot sensor positions
    for channel, (x, y) in pos_dict.items():
        text = f"S{channel} (x={x:0.2f}m | y={y:0.2f}m)"
        plt.scatter(x, y, marker="o", color="w")
        plt.text(x + text_delta_x, y + text_delta_y, text, fontsize=9, color="w")

    plt.show()


if __name__ == "__main__":
    main()

Total running time of the script: ( 0 minutes 1.474 seconds)

Gallery generated by Sphinx-Gallery

Go fast with multiprocessing

The streaming interfaces with iterables allow efficient batch processing as shown here. But still only one core/thread will be utilized. We will change that will multiprocessing.

Following example shows a batch feature extraction procedure using multiple CPU cores.

import os
import time
import multiprocessing
from typing import Dict, Iterable
from itertools import cycle
import __main__

import numpy as np
from scipy import stats
import matplotlib.pyplot as plt

import vallenae as vae

HERE = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
TRADB = os.path.join(HERE, "steel_plate/sample_plain.tradb")

Prepare streaming reads

tradb = vae.io.TraDatabase(TRADB)

Our sample tradb only contains four data sets. That is not enough data for demonstrating batch processing. Therefore, we will simulate more data by looping over the data sets with following generator/iterable:

def tra_generator(loops: int = 1000) -> Iterable[vae.io.TraRecord]:
    for loop, tra in enumerate(cycle(tradb.iread())):
        if loop > loops:
            break
        yield tra

Define feature extraction function

Following function will be applied to all data sets and returns computed features:

def feature_extraction(tra: vae.io.TraRecord) -> Dict[str, float]:
    # compute random statistical features
    return {
        "Std": np.std(tra.data),
        "Skew": stats.skew(tra.data),
    }

# Fix to use pickle serialization in sphinx gallery
setattr(__main__, feature_extraction.__name__, feature_extraction)

Compute with single thread/core

Note

The examples are executed on the CI / readthedocs server with limited resources. Therefore, the shown computation times and speedups are below the capability of modern machines.

Run computation in a single thread and get the time:

time_elapsed_ms = lambda t0: 1e3 * (time.perf_counter() - t0)

time_start = time.perf_counter()
for tra in tra_generator():
    results = feature_extraction(tra)
    # do something with the results
time_single_thread = time_elapsed_ms(time_start)

print(f"Time single thread: {time_single_thread:.2f} ms")

Out:

Time single thread: 781.92 ms

Compute with multiple processes/cores

First get number of available cores in your machine:

print(f"Available CPU cores: {os.cpu_count()}")

Out:

Available CPU cores: 2

But how can we utilize those cores? The common answer for most programming languages is multithreading. Threads run in the same process and heap, so data can be shared between them (with care). Sadly, Python uses a global interpreter lock (GIL) that locks heap memory, because Python objects are not thread-safe. Therefore, threads are blocking each other and no speedups are gained by using multiple threads.

The solution for Python is multiprocessing to work around the GIL. Every process has its own heap and GIL. Multiprocessing will introduce overhead for interprocess communication and data serialization/deserialization. To reduce the overhead, data is sent in bigger chunks.

Run computation on 4 cores with chunks of 128 data sets and get the time / speedup:

with multiprocessing.Pool(4) as pool:
    time_start = time.perf_counter()
    for results in pool.imap(feature_extraction, tra_generator(), chunksize=128):
        pass  # do something with the results
    time_multiprocessing = time_elapsed_ms(time_start)

print(f"Time multiprocessing: {time_multiprocessing:.2f} ms")
print(f"Speedup: {(time_single_thread / time_multiprocessing):.2f}")

Out:

Time multiprocessing: 790.07 ms
Speedup: 0.99
Variation of the chunksize

Following results show how the chunksize impacts the overall performance. The speedup is measured for different chunksizes and plotted against the chunksize:

chunksizes = (10, 40, 60, 80, 100, 120, 140, 160, 200)
speedup_chunksizes = []
with multiprocessing.Pool(4) as pool:
    for chunksize in chunksizes:
        time_start = time.perf_counter()
        for results in pool.imap(feature_extraction, tra_generator(), chunksize=chunksize):
            pass  # do something with the results
        speedup_chunksizes.append(time_single_thread / time_elapsed_ms(time_start))

plt.figure(tight_layout=True, figsize=(6, 3))
plt.plot(chunksizes, speedup_chunksizes)
plt.xlabel("Chunksize")
plt.ylabel("Speedup")
plt.show()
ex6 multiprocessing

Total running time of the script: ( 0 minutes 9.293 seconds)

Gallery generated by Sphinx-Gallery

Gallery generated by Sphinx-Gallery

Changelog

All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.

0.5.3 - 2020-05-04

Fixed

  • SQLite URI for absolute linux paths

0.5.2 - 2020-05-04

Fixed

  • SQLite URI for special characters (#, ?)

0.5.1 - 2020-03-25

Fixed

  • Buffering of SQL results in listen methods to allow SQL queries in between

0.5.0 - 2020-03-18

Added

  • Query filter parameter to TrfDatabase.read and TrfDatabase.iread

  • listen method for PriDatabase, TraDatabase and TrfDatabase to retrieve new records live

Changed

  • Order feature records by TRAI for TrfDatabase.read and TrfDatabase.iread

0.4.0 - 2021-02-14

Added

  • CI with GitHub actions on Linux, MacOS and Windows

  • Workflow with GitHub actions to publish to PyPI on new releases

  • pyproject.toml as the main config file for pylint, pytest, tox, coverage, …

Changed

  • Return exact time range with TraDatabase.read_continuous_wave

  • Return “absolute” time axis with TraDatabase.read_continuous_wave (instead of starting at t = 0 s)

Fixed

  • Fix database close if exception raised in __init__ (e.g. file not found)

  • Example ex6_multiprocessing for MacOS

  • Find lower/upper bounds for same values (times) in binary search (used by TraDatabase.iread)

  • Stop condition for time_stop in TraDatabase.iread

  • Use TRAI for TraDatabase.iread as a time sorted index for binary search (SetID is not!)

  • Check for empty time ranges in TraDatabase.iread

0.3.0 - 2020-11-05

Added

  • Query filter for pridb/tradb (i)read functions

0.2.4 - 2020-11-01

Fixed

  • SQL schemas for pridb/tradb/trfdb creation, add fieldinfos

0.2.3 - 2020-09-01

Fixed

  • AIC timepicker

  • Add threshold for monotonic time check (1 ns) to ignore rounding issues

  • Suppress exception chaining

0.2.2 - 2020-07-10

Added

  • Database classes are now pickable and can be used in multiprocessing

  • SQLite transactions for all writes

  • Faster blob encoding (vallenae.io.encode_data_blob)

  • Faster RMS computation with Numba (vallenae.features.rms)

Fixed

  • Catch possible global_info table parsing errors

0.2.1 - 2020-02-10

Fixed

  • Examples outputs if not run as notebook

  • Out-of-bound time_start, time_stop with SQL binary search

  • Optional signal strength for e.g. spotWave data acquisition

0.2.0 - 2020-02-06

Added

  • Database creation with mode="rwc", e.g. vallenae.io.PriDatabase.__init__

Fixed

  • Number field in vallenae.io.MarkerRecord optional

  • Scaling of parametric inputs optional

  • Keep column order of query if new columns are added to the database

  • Return array with float32 from vallenae.io.TraDatabase.read_continuous_wave (instead of float64)

0.1.0 - 2020-01-24

Initial public release

ToDos

Todo

Status flag

(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.3/lib/python3.7/site-packages/vallenae/io/pridb.py:docstring of vallenae.io.pridb.PriDatabase.write_hit, line 10.)

Todo

Status flag

(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.3/lib/python3.7/site-packages/vallenae/io/pridb.py:docstring of vallenae.io.pridb.PriDatabase.write_parametric, line 10.)

Todo

Status flag

(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.3/lib/python3.7/site-packages/vallenae/io/pridb.py:docstring of vallenae.io.pridb.PriDatabase.write_status, line 10.)

Todo

Status flag

(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.3/lib/python3.7/site-packages/vallenae/io/tradb.py:docstring of vallenae.io.tradb.TraDatabase.write, line 9.)

Todo

Remove RMS

(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.3/lib/python3.7/site-packages/vallenae/io/datatypes.py:docstring of vallenae.io.datatypes.TraRecord, line 3.)

Todo

Weak performance, if used with default parameter alpha

(The original entry is located in /home/docs/checkouts/readthedocs.org/user_builds/pyvallenae/envs/0.5.3/lib/python3.7/site-packages/vallenae/timepicker/timepicker.py:docstring of vallenae.timepicker.timepicker.hinkley, line 22.)

Indices and tables