Vallen AE Python Tools

Extract and analyze Acoustic Emission measurement data.

The IO module vallenae.io enables reading (and writing) of Vallen Systeme SQLite database files:

  • *.pridb: Primary database

  • *.tradb: Transient data

  • *.trfdb: Transient features

The remaining modules are system-independent and aim to cover the most common state-of-the-art algorithms in Acoustic Emission:

IO

Read/write Vallen Systeme database and setup files.

Database classes

Classes to read/write pridb, tradb and trfdb database files.

Warning

Writing is still experimental.

PriDatabase(filename[, mode])

IO Wrapper for pridb database file.

TraDatabase(filename[, mode, compression])

IO Wrapper for tradb database file.

TrfDatabase(filename[, mode])

IO Wrapper for trfdb (transient feature) database file.
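
All database classes accept a mode argument. A minimal sketch of the different modes (the read-only default is an assumption; database creation with mode="rwc" is listed in the changelog below):

>>> pridb = vae.io.PriDatabase("sample.pridb")             # read-only (assumed default)
>>> trfdb = vae.io.TrfDatabase("sample.trfdb", mode="rw")  # read and write
>>> newdb = vae.io.PriDatabase("new.pridb", mode="rwc")    # read/write, create if missing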

All database classes implement two different interfaces to access data:

Standard read_*

Read data into a pandas.DataFrame, e.g. with PriDatabase.read_hits:

>>> pridb = vae.io.PriDatabase("./examples/steel_plate/sample.pridb")
>>> df = pridb.read_hits()  # save all hits to pandas dataframe
>>> df[["time", "channel"]]  # output columns hit and channel
            time  channel
set_id
10      3.992771        3
11      3.992775        2
12      3.992813        4
13      3.992814        1

Streaming iread_*

Iterate through the data row by row. This is a memory-efficient solution, ideal for batch processing. The return types are specific typing.NamedTuple instances; see Data types.

Example with PriDatabase.iread_hits:

>>> pridb = vae.io.PriDatabase("./examples/steel_plate/sample.pridb")
>>> for hit in pridb.iread_hits():
...     print(f"time: {hit.time:0.4f}, channel: {hit.channel}")
...
time: 3.9928,   channel: 3
time: 3.9928,   channel: 2
time: 3.9928,   channel: 4
time: 3.9928,   channel: 1
>>> type(hit)
<class 'vallenae.io.datatypes.HitRecord'>

Data types

Records of the database are represented as typing.NamedTuple instances. Each record implements a class method from_sql to initialize it from a SQLite row dictionary (column name: value). A minimal construction example follows the list below.

HitRecord(time, channel, param_id, ...[, ...])

Hit record in pridb (SetType = 2).

MarkerRecord(time, set_type, data[, number, ...])

Marker record in pridb (SetType = 4, 5, 6).

StatusRecord(time, channel, param_id, ...[, ...])

Status data record in pridb (SetType = 3).

ParametricRecord(time, param_id[, set_id, ...])

Parametric data record in pridb (SetType = 1).

TraRecord(time, channel, param_id, ...[, ...])

Transient data record in tradb.

FeatureRecord(trai, features)

Transient feature record in trfdb.
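
Records are plain named tuples and can also be constructed directly, e.g. a FeatureRecord (the values here are made up for illustration):

>>> record = vae.io.FeatureRecord(trai=1, features={"RMS": 1.29e-4})
>>> record.trai
1
>>> record.features["RMS"]
0.000129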

Compression

Transient signals in the tradb are stored as BLOBs of 16-bit ADC values – either uncompressed or compressed (FLAC). The following functions convert between BLOBs and arrays of voltage values; a round-trip sketch follows the list.

decode_data_blob(data_blob, data_format, ...)

Decodes (compressed) 16-bit ADC values from BLOB to array of voltage values.

encode_data_blob(data, data_format, ...[, raw])

Encodes array of voltage values to BLOB of 16-bit ADC values for memory-efficient storage.
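
A minimal round-trip sketch (the data_format codes – 0 for uncompressed, 2 for FLAC – and the millivolt conversion factor are assumptions based on the Vallen database format):

>>> import numpy as np
>>> y = 0.1 * np.sin(np.linspace(0, 2 * np.pi, 1024))  # voltage values
>>> blob = vae.io.encode_data_blob(y, data_format=0, factor_millivolts=1.0)
>>> y_decoded = vae.io.decode_data_blob(blob, data_format=0, factor_millivolts=1.0)

Since the values are quantized to 16-bit integers, the round trip is lossy up to the quantization step.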

Features

Acoustic Emission

peak_amplitude(data)

Compute maximum absolute amplitude.

peak_amplitude_index(data)

Compute index of peak amplitude.

is_above_threshold(data, threshold)

Checks if absolute amplitudes are above threshold.

first_threshold_crossing(data, threshold)

Compute index of first threshold crossing.

rise_time(data, threshold, samplerate[, ...])

Compute the rise time.

energy(data, samplerate)

Compute the energy of a hit.

signal_strength(data, samplerate)

Compute the signal strength of a hit.

counts(data, threshold)

Compute the number of positive threshold crossings of a hit (counts).

rms(data)

Compute the root mean square (RMS) of an array.
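
A minimal sketch applying some of these functions to a synthetic burst signal (the sample rate and threshold are arbitrary values for illustration):

>>> import numpy as np
>>> samplerate = 5_000_000  # 5 MHz (arbitrary)
>>> threshold = 0.01  # 10 mV (arbitrary)
>>> t = np.arange(0, 1e-3, 1 / samplerate)
>>> data = 0.1 * np.sin(2 * np.pi * 100e3 * t) * np.exp(-t / 1e-4)  # decaying burst
>>> amplitude = vae.features.peak_amplitude(data)
>>> risetime = vae.features.rise_time(data, threshold, samplerate)
>>> n_counts = vae.features.counts(data, threshold)
>>> hit_energy = vae.features.energy(data, samplerate)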

Conversion

amplitude_to_db(amplitude[, reference])

Convert amplitude from volts to decibel (dB).

db_to_amplitude(amplitude_db[, reference])

Convert amplitude from decibel (dB) to volts.
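
A short sketch (assuming the default reference of 1 µV, which is common for AE amplitudes in dB(AE)):

>>> db = vae.features.amplitude_to_db(100e-6)  # 100 µV -> 40 dB(AE)
>>> volts = vae.features.db_to_amplitude(db)   # back to ~1e-4 V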

Timepicker

The determination of signal arrival times has a major influence on the localization accuracy. Usually, arrival times are determined by the first threshold crossing (either fixed or adaptive). The following popular methods have been proposed to automatically pick arrival times (see the sketch after the list):

hinkley(arr[, alpha])

Hinkley criterion for arrival time estimation.

aic(arr)

Akaike Information Criterion (AIC) for arrival time estimation.

energy_ratio(arr[, win_len])

Energy ratio for arrival time estimation.

modified_energy_ratio(arr[, win_len])

Modified energy ratio method for arrival time estimation.
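
All timepickers share the same call convention: they take the waveform array and return the criterion function together with the index of the picked sample. A minimal sketch (the full workflow is shown in the Timepicker example below):

>>> tradb = vae.io.TraDatabase("./examples/steel_plate/sample_plain.tradb")
>>> y, t = tradb.read_wave(4)  # trai = 4
>>> criterion, index = vae.timepicker.aic(y)
>>> arrival_time = t[index]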

Examples

A collection of examples showing how to read and analyse Acoustic Emission data.

Export to WAV (incremental)

Read pridb

Read and plot transient data

Timepicker

Timepicker batch processing

Localisation

Go fast with multiprocessing

Custom feature extraction

Spectrogram

Export to WAV

Export to WAV (incremental)

Generate WAV files from tradb. We use the vallenae.io.TraDatabase.read_continuous_wave method to read the transient data as a continuous array.

This example reads and writes the data incrementally to handle big files that don't fit into memory at once.

Export channel 1 to /tmp/tmp7tkuetjt/bearing_plain_ch1.wav
Generated WAV files:
/tmp/tmp7tkuetjt/bearing_plain_ch1.wav

from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional

import vallenae as vae
from soundfile import SoundFile

HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()


def export_wav_incremental(
    filename_wav: Path,
    tradb: vae.io.TraDatabase,
    channel: int,
    time_start: Optional[float] = None,
    time_stop: Optional[float] = None,
    time_block: int = 1,
):
    """
    Export data from tradb to a WAV file.

    Args:
        filename_wav: Path to the generated WAV file
        tradb: `TraDatabase` instance
        channel: Channel number
        time_start: Start reading at relative time (in seconds). Start at beginning if `None`
        time_stop: Stop reading at relative time (in seconds). Read until end if `None`
        time_block: Block size in seconds
    """
    con = tradb.connection()
    if time_start is None:
        time_start = 0
    if time_stop is None:
        time_stop = con.execute("SELECT MAX(Time) FROM view_tr_data").fetchone()[0]

    samplerates = con.execute("SELECT DISTINCT(SampleRate) FROM tr_data").fetchall()
    assert len(samplerates) == 1, "tradb contains multiple sample rates"
    samplerate = samplerates[0][0]

    blocks = int((time_stop - time_start) // time_block + 1)
    with SoundFile(filename_wav, "w", samplerate=samplerate, channels=1, subtype="PCM_16") as f:
        for block in range(blocks):
            time_block_start = time_start + block * time_block
            time_block_stop = min(time_start + (block + 1) * time_block, time_stop)
            y, _ = tradb.read_continuous_wave(
                channel=channel,
                time_start=time_block_start,
                time_stop=time_block_stop,
                time_axis=False,
                show_progress=False,
                raw=True,  # read as ADC values (int16)
            )
            f.write(y)
            f.flush()


def main():
    filename_tradb = HERE / "bearing" / "bearing_plain.tradb"
    # use a temporary file for this example
    with TemporaryDirectory() as tmpdir:
        with vae.io.TraDatabase(filename_tradb) as tradb:
            for channel in tradb.channel():
                filename_wav = Path(tmpdir) / f"{filename_tradb.stem}_ch{channel}.wav"
                print(f"Export channel {channel} to {filename_wav}")
                export_wav_incremental(
                    filename_wav=filename_wav,
                    tradb=tradb,
                    channel=channel,
                    time_start=0,  # read from t = 0 s
                    time_stop=None,  # read until end if None
                    time_block=1,  # read/write in block sizes of 1 s
                )

        # list all generated wav files
        print("Generated WAV files:")
        for file in Path(tmpdir).iterdir():
            print(file)


if __name__ == "__main__":
    main()

Total running time of the script: (0 minutes 0.059 seconds)

Read pridb

from pathlib import Path

import matplotlib.pyplot as plt
import vallenae as vae

HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
PRIDB = HERE / "steel_plate" / "sample.pridb"

Open pridb

pridb = vae.io.PriDatabase(PRIDB)

print("Tables in database: ", pridb.tables())
print("Number of rows in data table (ae_data): ", pridb.rows())
print("Set of all channels: ", pridb.channel())
Tables in database:  {'ae_data', 'ae_fieldinfo', 'data_integrity', 'acq_setup', 'ae_markers', 'ae_params', 'ae_globalinfo'}
Number of rows in data table (ae_data):  18
Set of all channels:  {1, 2, 3, 4}

Read hits to Pandas DataFrame

df_hits = pridb.read_hits()
# Print a few columns
print(df_hits[["time", "channel", "amplitude", "counts", "energy"]])
            time  channel  amplitude  counts        energy
set_id
10      3.992771        3   0.046539    2180  2.799510e+07
11      3.992775        2   0.059621    2047  2.276279e+07
12      3.992813        4   0.034119    1854  1.286700e+07
13      3.992814        1   0.029115    1985  1.265275e+07

Query Pandas DataFrame

DataFrames offer powerful features to query and aggregate data, e.g. to plot the summed energy per channel:

ax = df_hits.groupby("channel").sum()["energy"].plot.bar(figsize=(8, 3))
ax.set_xlabel("Channel")
ax.set_ylabel("Summed Energy [eu = 1e-14 V²s]")
plt.tight_layout()
plt.show()

Read markers

df_markers = pridb.read_markers()
print(df_markers)
          time  set_type                                        data  number
set_id
1         0.00         6                                                   1
2         0.00         4                                10:52 Resume       1
3         0.00         5                         2019-09-20 10:54:52    <NA>
4         0.00         4  TimeZone: +02:00 (W. Europe Standard Time)       2
18      100.07         4                               10:56 Suspend       3

Read parametric data

df_parametric = pridb.read_parametric()
print(df_parametric)
        time  param_id  pctd  pcta
set_id
5       0.00         1     0     0
6       1.00         1     0     0
7       2.00         1     0     0
8       3.00         1     0     0
9       3.99         1     0     0
14      4.00         1     0     0
15      5.00         1     0     0
16      6.00         1     0     0
17      6.45         1     0     0

Total running time of the script: (0 minutes 0.217 seconds)

Read and plot transient data

from pathlib import Path

import matplotlib.pyplot as plt
import vallenae as vae

HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
TRADB = HERE / "steel_plate" / "sample_plain.tradb"  # uncompressed
TRAI = 4  # just an example, no magic here


def main():
    # Read waveform from tradb
    with vae.io.TraDatabase(TRADB) as tradb:
        y, t = tradb.read_wave(TRAI)

    y *= 1e3  # in mV
    t *= 1e6  # for µs

    # Plot waveforms
    plt.figure(figsize=(8, 4), tight_layout=True)
    plt.plot(t, y)
    plt.xlabel("Time [µs]")
    plt.ylabel("Amplitude [mV]")
    plt.title(f"Transient Wave Plot; trai = {TRAI}")
    plt.show()


if __name__ == "__main__":
    main()

Total running time of the script: (0 minutes 0.269 seconds)

Timepicker

The following example showcases the results of different timepicking methods. For more information, please refer to the function documentation (vallenae.timepicker).

import time
from pathlib import Path

import matplotlib.pyplot as plt
import vallenae as vae

HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
TRADB = HERE / "steel_plate" / "sample_plain.tradb"
TRAI = 4
SAMPLES = 2000

Read waveform from tradb

tradb = vae.io.TraDatabase(TRADB)

y, t = tradb.read_wave(TRAI)
# crop first samples
t = t[:SAMPLES]
y = y[:SAMPLES]
# unit conversion
t *= 1e6  # convert to µs
y *= 1e3  # convert to mV

Prepare plotting of the timepicker results

def plot(t_wave, y_wave, y_picker, index_picker, name_picker):
    _, ax1 = plt.subplots(figsize=(8, 4), tight_layout=True)
    ax1.set_xlabel("Time [µs]")
    ax1.set_ylabel("Amplitude [mV]", color="g")
    ax1.plot(t_wave, y_wave, color="g")
    ax1.tick_params(axis="y", labelcolor="g")

    ax2 = ax1.twinx()
    ax2.set_ylabel(f"{name_picker}", color="r")
    ax2.plot(t_wave, y_picker, color="r")
    ax2.tick_params(axis="y", labelcolor="r")

    plt.axvline(t_wave[index_picker], color="k", linestyle=":")
    plt.show()

Hinkley Criterion

hc_arr, hc_index = vae.timepicker.hinkley(y, alpha=5)
plot(t, y, hc_arr, hc_index, "Hinkley Criterion")

The negative trend correlates with the chosen alpha value and can strongly influence the results. Results with alpha = 50 (less negative trend):

hc_arr, hc_index = vae.timepicker.hinkley(y, alpha=50)
plot(t, y, hc_arr, hc_index, "Hinkley Criterion")

Akaike Information Criterion (AIC)

aic_arr, aic_index = vae.timepicker.aic(y)
plot(t, y, aic_arr, aic_index, "Akaike Information Criterion")

Energy Ratio

er_arr, er_index = vae.timepicker.energy_ratio(y)
plot(t, y, er_arr, er_index, "Energy Ratio")

Modified Energy Ratio

mer_arr, mer_index = vae.timepicker.modified_energy_ratio(y)
plot(t, y, mer_arr, mer_index, "Modified Energy Ratio")

Performance comparison

All timepicker implementations use Numba for just-in-time (JIT) compilation. The first function call is usually slow because it triggers the JIT compiler. To compare the performance with a native or NumPy implementation, the average of multiple executions should be compared.

def timeit(func, loops=100):
    time_start = time.perf_counter()
    for _ in range(loops):
        func()
    return 1e6 * (time.perf_counter() - time_start) / loops  # elapsed time in µs

timer_results = {
    "Hinkley": timeit(lambda: vae.timepicker.hinkley(y, 5)),
    "AIC": timeit(lambda: vae.timepicker.aic(y)),
    "Energy Ratio": timeit(lambda: vae.timepicker.energy_ratio(y)),
    "Modified Energy Ratio": timeit(lambda: vae.timepicker.modified_energy_ratio(y)),
}

for name, execution_time in timer_results.items():
    print(f"{name}: {execution_time:0.3f} µs")

plt.figure(figsize=(8, 3), tight_layout=True)
plt.bar(timer_results.keys(), timer_results.values())  # noqa: SIM911
plt.ylabel("Time [µs]")
plt.show()
Hinkley: 10.551 µs
AIC: 77.192 µs
Energy Ratio: 12.600 µs
Modified Energy Ratio: 20.961 µs

Total running time of the script: (0 minutes 3.184 seconds)

Timepicker batch processing

The following example shows how to stream transient data row by row, compute timepicker results and save them to a feature database (trfdb).

from pathlib import Path
from shutil import copyfile
from tempfile import gettempdir

import matplotlib.pyplot as plt
import pandas as pd
import vallenae as vae

HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
TRADB = HERE / "steel_plate" / "sample_plain.tradb"
TRFDB = HERE / "steel_plate" / "sample.trfdb"
TRFDB_TMP = Path(gettempdir()) / "sample.trfdb"

Open tradb (readonly) and trfdb (readwrite)

copyfile(TRFDB, TRFDB_TMP)  # copy trfdb, so we don't overwrite it

tradb = vae.io.TraDatabase(TRADB)
trfdb = vae.io.TrfDatabase(TRFDB_TMP, mode="rw")  # allow writing

Read current trfdb

print(trfdb.read())
         FFT_CoG     FFT_FoM         PA  ...  CTP          FI          FR
trai                                     ...
1     147.705078  134.277344  46.483864  ...   11  222.672058  110.182449
2     144.042969  139.160156  59.450512  ...   35  182.291672   98.019981
3     155.029297  164.794922  33.995209  ...   55  155.191879   95.493233
4     159.912109  139.160156  29.114828  ...   29  181.023727  101.906227

[4 rows x 8 columns]

Compute arrival time offsets with different timepickers

To improve localisation, time-of-arrival estimates based on the first threshold crossing can be refined with timepickers. Therefore, arrival time offsets between the first threshold crossings and the timepicker results are computed:

def dt_from_timepicker(timepicker_func, tra: vae.io.TraRecord):
    # Index of the first threshold crossing is equal to the pretrigger samples
    index_ref = tra.pretrigger
    # Only analyse signal until peak amplitude
    index_peak = vae.features.peak_amplitude_index(tra.data)
    data = tra.data[:index_peak]
    # Get timepicker result
    _, index_timepicker = timepicker_func(data)
    # Compute offset in µs
    return (index_timepicker - index_ref) * 1e6 / tra.samplerate

Transient data is streamed from the database row by row using vallenae.io.TraDatabase.iread. Only one transient data set is loaded into memory at a time. That makes the streaming interface ideal for batch processing. The timepicker results are saved to the trfdb using vallenae.io.TrfDatabase.write.

for tra in tradb.iread():
    # Calculate arrival time offsets with different timepickers
    feature_set = vae.io.FeatureRecord(
        trai=tra.trai,
        features={
            "ATO_Hinkley": dt_from_timepicker(vae.timepicker.hinkley, tra),
            "ATO_AIC": dt_from_timepicker(vae.timepicker.aic, tra),
            "ATO_ER": dt_from_timepicker(vae.timepicker.energy_ratio, tra),
            "ATO_MER": dt_from_timepicker(vae.timepicker.modified_energy_ratio, tra),
        }
    )
    # Save results to trfdb
    trfdb.write(feature_set)

Read results from trfdb

print(trfdb.read().filter(regex="ATO"))
      ATO_Hinkley  ATO_AIC  ATO_ER  ATO_MER
trai
1            30.4     -1.8    -4.0     29.2
2            36.4     -0.8    -1.0     -0.4
3            67.0     -1.8    -3.2     60.4
4            65.8     -1.0    -2.2     64.2

Plot results

ax = trfdb.read()[["ATO_Hinkley", "ATO_AIC", "ATO_ER", "ATO_MER"]].plot.barh()
ax.invert_yaxis()
ax.set_xlabel("Arrival time offset [µs]")
plt.show()

Plot waveforms and arrival times

_, axes = plt.subplots(4, 1, tight_layout=True, figsize=(8, 8))
for row, ax in zip(trfdb.read().itertuples(), axes):
    trai = row.Index

    # read waveform from tradb
    y, t = tradb.read_wave(trai)

    # plot waveform
    ax.plot(t[400:1000] * 1e6, y[400:1000] * 1e3, "k")  # crop and convert to µs/mV
    ax.set_title(f"trai = {trai}")
    ax.set_xlabel("Time [µs]")
    ax.set_ylabel("Amplitude [mV]")
    ax.label_outer()
    # plot arrival time offsets
    ax.axvline(row.ATO_Hinkley, color="C0")
    ax.axvline(row.ATO_AIC, color="C1")
    ax.axvline(row.ATO_ER, color="C2")
    ax.axvline(row.ATO_MER, color="C3")

axes[0].legend(["Waveform", "Hinkley", "AIC", "ER", "MER"])
plt.show()

Use results in VisualAE

The computed arrival time offsets can be used directly in VisualAE; we only need to specify the unit, which VisualAE requires to be µs. Units and other column-related metadata are saved in the trf_fieldinfo table. Field infos can be retrieved with vallenae.io.TrfDatabase.fieldinfo:

print(trfdb.fieldinfo())
{'FFT_CoG': {'SetTypes': 2, 'Unit': '[kHz]', 'LongName': 'F(C.o.Gravity)', 'Description': 'Center of gravity of spectrum', 'ShortName': None, 'FormatStr': None}, 'FFT_FoM': {'SetTypes': 2, 'Unit': '[kHz]', 'LongName': 'F(max. Amp.)', 'Description': 'Frequency of maximum of spectrum', 'ShortName': None, 'FormatStr': None}, 'PA': {'SetTypes': 8, 'Unit': '[mV]', 'LongName': 'Peak Amplitude', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'RT': {'SetTypes': 8, 'Unit': '[µs]', 'LongName': 'Rise Time', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'Dur': {'SetTypes': 8, 'Unit': '[µs]', 'LongName': 'Duration (available)', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'CTP': {'SetTypes': 8, 'Unit': None, 'LongName': 'Cnts to peak', 'Description': None, 'ShortName': None, 'FormatStr': '#'}, 'FI': {'SetTypes': 8, 'Unit': '[kHz]', 'LongName': 'Initiation Freq.', 'Description': None, 'ShortName': None, 'FormatStr': None}, 'FR': {'SetTypes': 8, 'Unit': '[kHz]', 'LongName': 'Reverberation Freq.', 'Description': None, 'ShortName': None, 'FormatStr': None}}

Show results as table:

print(pd.DataFrame(trfdb.fieldinfo()))
                                   FFT_CoG  ...                   FR
SetTypes                                 2  ...                    8
Unit                                 [kHz]  ...                [kHz]
LongName                    F(C.o.Gravity)  ...  Reverberation Freq.
Description  Center of gravity of spectrum  ...                 None
ShortName                             None  ...                 None
FormatStr                             None  ...                 None

[6 rows x 8 columns]

Write units to trfdb

Field infos can be written with vallenae.io.TrfDatabase.write_fieldinfo:

trfdb.write_fieldinfo("ATO_Hinkley", {"Unit": "[µs]", "LongName": "Arrival Time Offset (Hinkley)"})
trfdb.write_fieldinfo("ATO_AIC", {"Unit": "[µs]", "LongName": "Arrival Time Offset (AIC)"})
trfdb.write_fieldinfo("ATO_ER", {"Unit": "[µs]", "LongName": "Arrival Time Offset (ER)"})
trfdb.write_fieldinfo("ATO_MER", {"Unit": "[µs]", "LongName": "Arrival Time Offset (MER)"})

print(pd.DataFrame(trfdb.fieldinfo()).filter(regex="ATO"))
                               ATO_Hinkley  ...                    ATO_MER
SetTypes                              None  ...                       None
Unit                                  [µs]  ...                       [µs]
LongName     Arrival Time Offset (Hinkley)  ...  Arrival Time Offset (MER)
Description                           None  ...                       None
ShortName                             None  ...                       None
FormatStr                             None  ...                       None

[6 rows x 4 columns]

Load results in VisualAE

Arrival time offsets can be specified in the settings of Location Processors - Channel Positions - Arrival Time Offset. (Make sure to rename the generated trfdb to match the filename of the pridb.)

[Screenshot: Arrival Time Offset settings in VisualAE (vae_arrival_time_offset.png)]

Total running time of the script: (0 minutes 0.834 seconds)

Localisation

Runtime for 1 call to differential_evolution(): 0.6603 s
             message: Optimization terminated successfully.
             success: True
                 fun: 0.001115889206067505
                   x: [ 2.217e-01  3.657e-01]
                 nit: 101
                nfev: 8175
          population: [[ 2.216e-01  3.657e-01]
                       [ 2.216e-01  3.657e-01]
                       ...
                       [ 2.219e-01  3.655e-01]
                       [ 2.218e-01  3.656e-01]]
 population_energies: [ 1.116e-03  1.118e-03 ...  1.144e-03  1.127e-03]
                 jac: [-7.818e-04  8.985e-04]

import math
import time
from pathlib import Path
from typing import Dict, Optional, Tuple
from xml.etree import ElementTree

import matplotlib.pyplot as plt
import numpy as np
import vallenae as vae
from numba import f8, njit
from numpy.linalg import norm
from scipy.optimize import differential_evolution

HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
PRIDB = HERE / "steel_plate" / "sample.pridb"
SETUP = HERE / "steel_plate" / "sample.vaex"
NUMBER_SENSORS = 4


@njit(f8(f8[:], f8, f8[:, :], f8[:]))
def lucy_error_fun(
    test_pos: np.ndarray,
    speed: float,
    sens_poss: np.ndarray,
    measured_delta_ts: np.ndarray,
) -> float:
    """
    Implementation of the LUCY computation in 2D as documented in
    the Vallen online help.

    Args:
        test_pos: Emitter position to test.
        speed: Assumed speed of sound in a plate-like structure.
        sens_poss: Sensor positions, often a 4x2 array, has to match
            the sorting of the delta-ts.
        measured_delta_ts: The measured time differences in seconds, has to
            match the order of the sensor positions.

    Returns:
        The LUCY value as a float. Ideally 0, in practice never 0, always positive.
    """
    m = len(measured_delta_ts)
    n = m + 1
    measured_delta_dists = speed * measured_delta_ts
    theo_dists = np.zeros(n)
    theo_delta_dists = np.zeros(m)
    for i in range(n):
        theo_dists[i] = norm(test_pos - sens_poss[i, :])
    for i in range(m):
        theo_delta_dists[i] = theo_dists[i + 1] - theo_dists[0]

    # LUCY definition taken from the vallen online help:
    return norm(theo_delta_dists - measured_delta_dists) / math.sqrt(n - 1)


def get_channel_positions(setup_file: str) -> Dict[int, Tuple[float, float]]:
    tree = ElementTree.parse(setup_file)
    nodes = tree.getroot().findall(".//ChannelPos")
    if not nodes:
        raise RuntimeError(f"Can not retrieve channel positions from {setup_file}")
    return {
        int(elem.get("Chan")): (float(elem.get("X")), float(elem.get("Y")))  # type: ignore
        for elem in nodes if elem is not None
    }


def get_velocity(setup_file: str) -> Optional[float]:
    tree = ElementTree.parse(setup_file)
    node = tree.getroot().find(".//Location")
    if node is not None:
        velocity_str = node.get("Velocity")
        if velocity_str is not None:
            return float(velocity_str) * 1e3  # convert to m/s
    raise RuntimeError("Can not retrieve velocity from %s", setup_file)


def main():
    # Consts plotting
    text_delta_y = 0.03
    text_delta_x = -0.12

    # Consts LUCY grid
    grid_delta = 0.01
    location_search_bounds = [(0.0, 0.80), (0.0, 0.80)]

    # Read from pridb
    pridb = vae.io.PriDatabase(PRIDB)
    hits = pridb.read_hits()
    pridb.close()

    channel_order = hits["channel"].to_numpy()
    arrival_times = hits["time"].to_numpy()
    delta_ts = (arrival_times - arrival_times[0])[1:]

    # Get localisation parameters from .vaex file
    velocity = get_velocity(SETUP)
    pos_dict = get_channel_positions(SETUP)

    # Order sensor positions by hit occurrence
    pos_ordered = np.array([pos_dict[ch] for ch in channel_order])

    # Compute heatmap
    def lucy_instance_2args(x, y):
        return lucy_error_fun(np.array([x, y]), velocity, pos_ordered, delta_ts)

    x_range = np.arange(location_search_bounds[0][0], location_search_bounds[0][1], grid_delta)
    y_range = x_range
    x_grid, y_grid = np.meshgrid(x_range, y_range)
    z_grid = np.vectorize(lucy_instance_2args)(x_grid, y_grid)

    # Plot heatmap
    plt.figure(tight_layout=True)
    plt.pcolormesh(x_grid, y_grid, z_grid, cmap="cool")
    plt.colorbar()
    plt.title("Location Result and LUCY-Heatmap")
    plt.xlabel("x [m]")
    plt.ylabel("y [m]")

    # Compute location
    def lucy_instance_single_arg(pos):
        return lucy_error_fun(pos, velocity, pos_ordered, delta_ts)

    start = time.perf_counter()
    # These are excessive search / overkill parameters:
    location_result = differential_evolution(
        lucy_instance_single_arg,
        location_search_bounds,
        popsize=40,
        polish=True,
        strategy="rand1bin",
        recombination=0.1,
        mutation=1.3,
    )
    end = time.perf_counter()
    print(f"Runtime for 1 call to differential_evolution(): {(end - start):0.4} s")
    print(location_result)

    # Plot location result
    x_res = location_result.x[0]
    y_res = location_result.x[1]
    plt.plot([x_res], [y_res], "bo")
    plt.text(
        x_res + text_delta_x,
        y_res + text_delta_y,
        "location result",
        fontsize=9,
        color="b",
    )

    # Plot sensor positions
    for channel, (x, y) in pos_dict.items():
        text = f"S{channel} (x={x:0.2f}m | y={y:0.2f}m)"
        plt.scatter(x, y, marker="o", color="w")
        plt.text(x + text_delta_x, y + text_delta_y, text, fontsize=9, color="w")

    plt.show()


if __name__ == "__main__":
    main()

Total running time of the script: (0 minutes 1.836 seconds)

Go fast with multiprocessing

The streaming interfaces with iterables allow efficient batch processing as shown before. But still, only one core/thread is utilized. We will change that with multiprocessing.

The following example shows a batch feature extraction procedure using multiple CPU cores.

import multiprocessing
import os
import time
from itertools import cycle
from pathlib import Path
from typing import Iterable

import matplotlib.pyplot as plt
import vallenae as vae

HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
TRADB = HERE / "steel_plate" / "sample_plain.tradb"

Prepare streaming reads

Our sample tradb only contains four data sets. That is not enough data to demonstrate batch processing. Therefore, we simulate more data by looping over the data sets with the following generator/iterable:

def tra_generator(loops: int = 1000) -> Iterable[vae.io.TraRecord]:
    with vae.io.TraDatabase(TRADB) as tradb:
        for loop, tra in enumerate(cycle(tradb.iread())):
            if loop > loops:
                break
            yield tra

Define feature extraction function

A simple function from the module _feature_extraction is applied to all data sets and returns the computed features. The function is defined in a separate module so that it can be pickled for multiprocessing.Pool: https://bugs.python.org/issue25053. A sketch of such a module is shown below the import.

from _feature_extraction import feature_extraction  # noqa
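
The module itself is not shown in this example. A minimal sketch of what such a module might contain (the chosen features are arbitrary, not the shipped implementation):

# _feature_extraction.py (sketch)
import vallenae as vae


def feature_extraction(tra: vae.io.TraRecord) -> dict:
    """Compute a few scalar features from a transient record."""
    return {
        "peak_amplitude": vae.features.peak_amplitude(tra.data),
        "energy": vae.features.energy(tra.data, tra.samplerate),
        "rms": vae.features.rms(tra.data),
    }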

Compute with single thread/core

Note

The examples are executed on the CI / readthedocs server with limited resources. Therefore, the shown computation times and speedups are below the capability of modern machines.

Run computation in a single thread and get the time:

def time_elapsed_ms(t0):
    return 1000.0 * (time.perf_counter() - t0)

if __name__ == "__main__":  # guard needed for multiprocessing on Windows
    time_start = time.perf_counter()
    for tra in tra_generator():
        results = feature_extraction(tra)
        # do something with the results
    time_single_thread = time_elapsed_ms(time_start)

    print(f"Time single thread: {time_single_thread:.2f} ms")
Time single thread: 1239.29 ms

Compute with multiple processes/cores

First get number of available cores in your machine:

print(f"Available CPU cores: {os.cpu_count()}")
Available CPU cores: 2

But how can we utilize those cores? The common answer for most programming languages is multithreading. Threads run in the same process and heap, so data can be shared between them (with care). Sadly, Python uses a global interpreter lock (GIL), because Python objects are not thread-safe. Therefore, threads block each other, and no speedups are gained for CPU-bound code by using multiple threads.

The solution for Python is multiprocessing to work around the GIL. Every process has its own heap and GIL. Multiprocessing introduces overhead for interprocess communication and data serialization/deserialization. To reduce the overhead, data is sent in bigger chunks.

Run computation on 4 cores with chunks of 128 data sets and get the time / speedup:

if __name__ == "__main__":  # guard needed for multiprocessing on Windows
    with multiprocessing.Pool(4) as pool:
        time_start = time.perf_counter()
        for _results in pool.imap(feature_extraction, tra_generator(), chunksize=128):
            pass  # do something with the results
        time_multiprocessing = time_elapsed_ms(time_start)

    print(f"Time multiprocessing: {time_multiprocessing:.2f} ms")
    print(f"Speedup: {(time_single_thread / time_multiprocessing):.2f}")
Time multiprocessing: 1295.64 ms
Speedup: 0.96

Variation of the chunksize

The following results show how the chunksize impacts the overall performance. The speedup is measured for different chunksizes and plotted against the chunksize:

if __name__ == "__main__":  # guard needed for multiprocessing on Windows
    chunksizes = (10, 40, 60, 80, 100, 120, 140, 160, 200)
    speedup_chunksizes = []
    with multiprocessing.Pool(4) as pool:
        for chunksize in chunksizes:
            time_start = time.perf_counter()
            for _results in pool.imap(feature_extraction, tra_generator(), chunksize=chunksize):
                pass  # do something with the results
            speedup_chunksizes.append(time_single_thread / time_elapsed_ms(time_start))

    plt.figure(tight_layout=True, figsize=(6, 3))
    plt.plot(chunksizes, speedup_chunksizes)
    plt.xlabel("Chunksize")
    plt.ylabel("Speedup")
    plt.show()

Total running time of the script: (0 minutes 14.106 seconds)

Custom feature extraction

The following example shows how to compute custom features, save them in a transient feature database (trfdb) and visualize them in VisualAE.

The feature extraction can run live during acquisition. VisualAE is notified that a writer to the trfdb is active and waits for the features to be computed, so the computed features can be visualized in real time.

from pathlib import Path
from tempfile import gettempdir

import matplotlib.pyplot as plt
import numpy as np
import vallenae as vae

HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
PRIDB = HERE / "bearing" / "bearing.pridb"
TRADB = HERE / "bearing" / "bearing_plain.tradb"
# TRFDB = HERE / "bearing" / "bearing.trfdb"
TRFDB_TMP = Path(gettempdir()) / "bearing_custom.trfdb"  # use a temp file for demo

Custom feature extraction algorithms

def rms(data: np.ndarray) -> float:
    """Root mean square (RMS)."""
    return np.sqrt(np.mean(data ** 2))


def crest_factor(data: np.ndarray) -> float:
    """Crest factor (ratio of peak amplitude and RMS)."""
    return np.max(np.abs(data)) / rms(data)


def spectral_peak_frequency(spectrum_: np.ndarray, samplerate: int) -> float:
    """
    Peak frequency in a spectrum.

    Args:
        spectrum_: FFT amplitudes
        samplerate: Sample rate of the spectrum in Hz

    Returns:
        Peak frequency in Hz
    """
    def bin_to_hz(samplerate: int, samples: int, index: int):
        return 0.5 * samplerate * index / (samples - 1)

    peak_index = np.argmax(spectrum_)
    return bin_to_hz(samplerate, len(spectrum_), peak_index)

Open tradb and trfdb

tradb = vae.io.TraDatabase(TRADB)
trfdb = vae.io.TrfDatabase(TRFDB_TMP, mode="rwc")

Helper function to notify VisualAE that the transient feature database is active/closed

def set_file_status(trfdb_: vae.io.TrfDatabase, status: int):
    """Notify VisualAE that trfdb is active/closed."""
    trfdb_.connection().execute(
        f"UPDATE trf_globalinfo SET Value = {status} WHERE Key == 'FileStatus'"
    )

Read tra records, compute features and save to trfdb

The vallenae.io.TraDatabase.listen method reads the tradb row by row and can be used during acquisition. The function returns only when the acquisition is closed and no new records are available.

set_file_status(trfdb, 2)  # 2 = active

for tra in tradb.listen(existing=True, wait=False):
    spectrum = np.fft.rfft(tra.data)
    features = vae.io.FeatureRecord(
        trai=tra.trai,
        features={
            "RMS": rms(tra.data),
            "CrestFactor": crest_factor(tra.data),
            "SpectralPeakFreq": spectral_peak_frequency(spectrum, tra.samplerate),
        }
    )
    trfdb.write(features)

set_file_status(trfdb, 0)  # 0 = closed

Write field infos to trfdb

Field infos can be written with vallenae.io.TrfDatabase.write_fieldinfo:

trfdb.write_fieldinfo("RMS", {"Unit": "[V]", "LongName": "Root mean square"})
trfdb.write_fieldinfo("CrestFactor", {"Unit": "[]", "LongName": "Crest factor"})
trfdb.write_fieldinfo("SpectralPeakFreq", {"Unit": "[Hz]", "LongName": "Spectral peak frequency"})

Read results from trfdb

df_trfdb = trfdb.read()
print(df_trfdb)
           RMS  CrestFactor  SpectralPeakFreq
trai
1     0.000129     3.684550           63800.0
2     0.000134     3.843248           65500.0
3     0.000108     3.906766           58500.0
4     0.000115     3.876217           58000.0
5     0.000106     3.548410           72000.0
...        ...          ...               ...
486   0.000006     3.624268          162900.0
487   0.000006     3.486689          250000.0
488   0.000006     3.530853          164100.0
489   0.000006     4.059815          174000.0
490   0.000006     3.545406          164700.0

[490 rows x 3 columns]

Plot AE features and custom features

Read pridb and join it with trfdb:

with vae.io.PriDatabase(PRIDB) as pridb:
    df_pridb = pridb.read_hits()

df_combined = df_pridb.join(df_trfdb, on="trai", how="left")
print(df_combined)
        time  channel  param_id  ...       RMS  CrestFactor  SpectralPeakFreq
set_id                           ...
6       0.00        1         2  ...  0.000129     3.684550           63800.0
8       0.01        1         2  ...  0.000134     3.843248           65500.0
10      0.02        1         2  ...  0.000108     3.906766           58500.0
12      0.03        1         2  ...  0.000115     3.876217           58000.0
14      0.04        1         2  ...  0.000106     3.548410           72000.0
...      ...      ...       ...  ...       ...          ...               ...
976     4.85        1         2  ...  0.000006     3.624268          162900.0
978     4.86        1         2  ...  0.000006     3.486689          250000.0
980     4.87        1         2  ...  0.000006     3.530853          164100.0
982     4.88        1         2  ...  0.000006     4.059815          174000.0
984     4.89        1         2  ...  0.000006     3.545406          164700.0

[490 rows x 15 columns]

Plot joined features from pridb and trfdb

features = [
    # from pridb
    "amplitude",
    "energy",
    "counts",
    # from trfdb - custom
    "RMS",
    "CrestFactor",
    "SpectralPeakFreq",
]
df_combined.plot(
    x="time",
    y=features,
    xlabel="Time [s]",
    title=features,
    legend=False,
    subplots=True,
    figsize=(8, 10),
)
plt.suptitle("AE Features from pridb and custom features from trfdb")
plt.tight_layout()
plt.show()

Display custom features in VisualAE

[Screenshot: custom features displayed in VisualAE (vae_custom_features.png)]

Total running time of the script: (0 minutes 0.915 seconds)

Spectrogram

Generate spectrogram from tradb. The vallenae.io.TraDatabase.read_continuous_wave method is used to read the transient data as a continuous array.

from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import vallenae as vae
from matplotlib.colors import LogNorm
from scipy import signal

HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()
TRADB = HERE / "bearing" / "bearing_plain.tradb"

Read transient data as continuous array

The signal is exactly cropped to the given time range (time_start, time_stop). Time gaps are filled with zeros.

with vae.io.TraDatabase(TRADB) as tradb:
    y, fs = tradb.read_continuous_wave(
        channel=1,
        time_start=None,
        time_stop=None,
        time_axis=False,  # return samplerate instead of time axis
        show_progress=False,
    )
    t = np.arange(0, len(y)) / fs  # generate time axis

Compute Short-Time Fourier Transform (STFT)

nfft = 4096
noverlap = 2048
fz, tz, zxx = signal.stft(y, fs=fs, window="hann", nperseg=nfft, noverlap=noverlap)

Plot time data and spectrogram

fig, ax = plt.subplots(
    nrows=2,
    sharex=True,
    figsize=(8, 6),
    tight_layout=True,
    gridspec_kw={"height_ratios": (1, 2)},
)
ax[0].plot(t, y * 1e3)
ax[0].set(xlabel="Time [s]", ylabel="Amplitude [mV]", title="Waveform")
ax[1].pcolormesh(tz, fz / 1e3, np.abs(zxx), norm=LogNorm())
ax[1].set(xlabel="Time [s]", ylabel="Frequency [kHz]", title="STFT")
plt.show()

Total running time of the script: (0 minutes 2.220 seconds)

Export to WAV

Generate WAV files from tradb. We use the vallenae.io.TraDatabase.read_continuous_wave method to read the transient data as a continuous array.

The signal can optionally be decimated to reduce the size of the generated WAV files (using the scipy.signal.decimate function).

Export channel 1 to /tmp/tmpd3z4xw7g/bearing_plain_ch1.wav
Generated WAV files:
/tmp/tmpd3z4xw7g/bearing_plain_ch1.wav

from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Optional

import numpy as np
import vallenae as vae
from scipy import signal
from scipy.io import wavfile

HERE = Path(__file__).parent if "__file__" in locals() else Path.cwd()


def export_wav(
    filename_wav: Path,
    tradb: vae.io.TraDatabase,
    channel: int,
    time_start: Optional[float] = None,
    time_stop: Optional[float] = None,
    decimation_factor: int = 1,
):
    """
    Export data from tradb to a WAV file.

    Args:
        filename_wav: Path to the generated WAV file
        tradb: `TraDatabase` instance
        channel: Channel number
        time_start: Start reading at relative time (in seconds). Start at beginning if `None`
        time_stop: Stop reading at relative time (in seconds). Read until end if `None`
        decimation_factor: Decimate signal with given factor
    """
    y, fs = tradb.read_continuous_wave(
        channel=channel,
        time_start=time_start,
        time_stop=time_stop,
        time_axis=False,
        show_progress=False,
        raw=True,  # read as ADC values (int16)
    )

    if decimation_factor > 1:
        y = signal.decimate(y, decimation_factor).astype(np.int16)
        fs //= decimation_factor

    wavfile.write(filename_wav, fs, y)


def main():
    filename_tradb = HERE / "bearing" / "bearing_plain.tradb"

    # use a temporary file for this example
    with TemporaryDirectory() as tmpdir:
        with vae.io.TraDatabase(filename_tradb) as tradb:
            for channel in tradb.channel():
                filename_wav = Path(tmpdir) / f"{filename_tradb.stem}_ch{channel}.wav"
                print(f"Export channel {channel} to {filename_wav}")
                export_wav(
                    filename_wav=filename_wav,
                    tradb=tradb,
                    channel=channel,
                    time_start=0,  # read from t = 0 s
                    time_stop=None,  # read until end if None
                    decimation_factor=5,  # custom decimation factor
                )

        # list all generated wav files
        print("Generated WAV files:")
        for file in Path(tmpdir).iterdir():
            print(file)


if __name__ == "__main__":
    main()

Total running time of the script: (0 minutes 0.094 seconds)

Changelog

All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog, and this project adheres to Semantic Versioning.

Unreleased

0.9.0 - 2024-02-14

Added

  • Example for incremental WAV export

  • Python 3.12 support

0.8.0 - 2023-07-13

Added

  • Flag TraRecord.raw if data is stored as ADC values (int16)

  • Flag raw for TraDatabase read methods to read data as ADC values:

    • TraDatabase.iread

    • TraDatabase.read

    • TraDatabase.read_wave

    • TraDatabase.read_continuous_wave

    • TraDatabase.listen

  • Bearing example data

  • Spectrogram example

  • WAV export example using the new raw flag

  • CI for Python 3.11

Changed

  • Remove scipy dependency (only needed for examples)

  • Migrate from setuptools to hatch (replace setup.py with pyproject.toml)

Fixed

  • Multiprocessing example for Windows

0.7.0 - 2022-11-10

Added

  • Example for custom feature extraction

  • PyInstaller hook

  • CI for Python 3.10

Changed

  • Make Numba dependency optional (fallback timepicker implementations with NumPy)

Fixed

  • Counts computations (first sample above threshold is not a count)

0.6.0 - 2021-09-02

Added

  • CI for Python 3.9

Changed

  • Remove superfluous data_format field from TraRecord data type

0.5.4 - 2021-05-25

Fixed

  • Limit number of buffered records in listen methods

  • Time axis rounding errors, e.g. for TraDatabase.read_wave with time_axis=True

0.5.3 - 2021-05-04

Fixed

  • SQLite URI for absolute linux paths

0.5.2 - 2021-05-04

Fixed

  • SQLite URI for special characters (#, ?)

0.5.1 - 2021-03-25

Fixed

  • Buffering of SQL results in listen methods to allow SQL queries in between

0.5.0 - 2021-03-18

Added

  • Query filter parameter to TrfDatabase.read and TrfDatabase.iread

  • listen method for PriDatabase, TraDatabase and TrfDatabase to retrieve new records live

Changed

  • Order feature records by TRAI for TrfDatabase.read and TrfDatabase.iread

0.4.0 - 2021-02-14

Added

  • CI with GitHub actions on Linux, MacOS and Windows

  • Workflow with GitHub actions to publish to PyPI on new releases

  • pyproject.toml as the main config file for pylint, pytest, tox, coverage, …

Changed

  • Return exact time range with TraDatabase.read_continuous_wave

  • Return “absolute” time axis with TraDatabase.read_continuous_wave (instead of starting at t = 0 s)

Fixed

  • Fix database close if exception raised in __init__ (e.g. file not found)

  • Example ex6_multiprocessing for MacOS

  • Find lower/upper bounds for same values (times) in binary search (used by TraDatabase.iread)

  • Stop condition for time_stop in TraDatabase.iread

  • Use TRAI for TraDatabase.iread as a time sorted index for binary search (SetID is not!)

  • Check for empty time ranges in TraDatabase.iread

0.3.0 - 2020-11-05

Added

  • Query filter for pridb/tradb (i)read functions

0.2.4 - 2020-11-01

Fixed

  • SQL schemas for pridb/tradb/trfdb creation, add fieldinfos

0.2.3 - 2020-09-01

Fixed

  • AIC timepicker

  • Add threshold for monotonic time check (1 ns) to ignore rounding issues

  • Suppress exception chaining

0.2.2 - 2020-07-10

Added

  • Database classes are now picklable and can be used in multiprocessing

  • SQLite transactions for all writes

  • Faster blob encoding (vallenae.io.encode_data_blob)

  • Faster RMS computation with Numba (vallenae.features.rms)

Fixed

  • Catch possible global_info table parsing errors

0.2.1 - 2020-02-10

Fixed

  • Examples outputs if not run as notebook

  • Out-of-bound time_start, time_stop with SQL binary search

  • Optional signal strength for e.g. spotWave data acquisition

0.2.0 - 2020-02-06

Added

  • Database creation with mode="rwc", e.g. vallenae.io.PriDatabase.__init__

Fixed

  • Number field in vallenae.io.MarkerRecord optional

  • Scaling of parametric inputs optional

  • Keep column order of query if new columns are added to the database

  • Return array with float32 from vallenae.io.TraDatabase.read_continuous_wave (instead of float64)

0.1.0 - 2020-01-24

Initial public release
