Source code for vallenae.processor._event_builder

from __future__ import annotations

import math
from collections.abc import Container, Iterable, Iterator, Mapping
from dataclasses import dataclass
from enum import IntEnum
from typing import cast

from ..io.datatypes import HitFlags, HitRecord


class ChannelFunction(IntEnum):
    """
    Role a channel plays for the Event Builder.

    The values mirror the channel function assignment found in the
    Location Settings dialog of VisualAE.
    """

    #: The channel takes no part in event building at all.
    UNUSED = 0

    #: The channel takes part in event building and may start an event.
    NORMAL = 1

    #: When it would be the first-hit channel, event building is inhibited for FHCDT;
    #: when an event is already open, the hit is included as a subsequent hit.
    GUARD = 2

    #: Behaves like `ChannelFunction.GUARD` when it would be the first-hit channel
    #: and like `ChannelFunction.NORMAL` in every other situation.
    COMBINED = 3
class EventCloseReason(IntEnum):
    """Why the Event Builder closed an event data set."""

    #: The time between the first hit and the last seen hit exceeded DT1X-Max.
    DT1X_EXPIRED = 1

    #: The time between two consecutive hits exceeded DTNX-Max.
    DTNX_EXPIRED = 2

    #: A channel delivered a second hit while `allow_multiple_hits_per_channel` was disabled.
    DUPLICATE_CHANNEL = 3

    #: The hit stream ran out; emitted by `flush`.
    END_OF_STREAM = 4
@dataclass
class Event:
    """
    One event data set emitted by the `EventBuilder`.

    An event bundles the hits that the Event Builder attributed to a single
    acoustic-emission source. The hits are kept in time order, and the very
    first entry (`hits[0]`) is always the Start-of-Event (SoE).
    """

    #: Time-sorted hits belonging to this event; `hits[0]` is the SoE.
    hits: list[HitRecord]
    #: The reason the event was closed.
    close_reason: EventCloseReason

    @property
    def time(self) -> float:
        """Start-of-Event time in seconds (identical to `hits[0].time`)."""
        start_of_event = self.hits[0]
        return start_of_event.time

    @property
    def first_hit_channel(self) -> int:
        """Channel of the first hit (1.CH in VisualAE), i.e. `hits[0].channel`."""
        start_of_event = self.hits[0]
        return start_of_event.channel

    @property
    def channel_sequence(self) -> list[int]:
        """Channels listed in hit order (1.CH..n.CH in VisualAE)."""
        sequence = []
        for member in self.hits:
            sequence.append(member.channel)
        return sequence

    @property
    def dt_to_first(self) -> list[float]:
        """
        Delay of every subsequent hit relative to the first hit, in seconds.

        Equivalent to VisualAE's DT12..DT1n. The list holds `len(hits) - 1`
        entries, so it is empty for an event made of a single hit.
        """
        origin = self.hits[0].time
        delays = []
        for later in self.hits[1:]:
            delays.append(later.time - origin)
        return delays
@dataclass
class EventBuilder:
    """
    Assemble hits into `Event` data sets following the VisualAE Event Builder algorithm.

    Per-channel functions (Normal / Guard / Combined / Unused) are respected.
    While no event is open, a guard hit (or a combined hit, which would be the
    first-hit channel) prevents an event from being opened for the next
    `fhcdt`. While an event is open, guard and combined hits are stored in
    that event's `Event.hits`; excluding them from the location calculation
    is left to the downstream location processor.

    The builder is a synchronous state machine and expects its input in
    time-sorted order. Feeding a hit that is older than the previous one
    raises `ValueError`.

    Args:
        fhcdt: First Hit Channel Discrimination Time, in seconds. A hit only
            qualifies as Start-of-Event (SoE) after a gap longer than this.
            Every hit on a listed channel restarts the gap timer.
        dt1x_max: Maximum time between the SoE and the last hit of the event,
            in seconds. This is the overall event window, counted from the SoE.
        dtnx_max: Maximum time between consecutive hits of the event, in
            seconds (hit-to-hit continuity window). Choosing
            `dtnx_max == dt1x_max` effectively switches this criterion off.
        channels: Channels to take into account. Three forms are accepted:

            - `None` (default): every channel is accepted and every hit is
              treated as `ChannelFunction.NORMAL`, including hits on unknown
              channels.
            - A collection of channel numbers (e.g. `[1, 2, 3]`): only these
              channels take part, all as `ChannelFunction.NORMAL`. Hits on
              any other channel are dropped.
            - A mapping from channel number to `ChannelFunction`: full
              per-channel control. Channels missing from the mapping count as
              `ChannelFunction.UNUSED` and are dropped.
        allow_multiple_hits_per_channel: With `False` (default), a second hit
            on a channel that is already part of the open event closes the
            event with `EventCloseReason.DUPLICATE_CHANNEL`; the duplicate
            hit is used neither for the closing event nor for the next one.
            With `True`, further hits on the same channel stay in the event.

    Example:
        >>> from vallenae.processor import EventBuilder
        >>> builder = EventBuilder(fhcdt=2e-3, dt1x_max=2e-3, dtnx_max=2e-3)
        >>> events = list(builder.process_all(pridb.iread_hits()))  # doctest: +SKIP
    """

    fhcdt: float
    dt1x_max: float
    dtnx_max: float
    channels: Mapping[int, ChannelFunction] | Container[int] | None = None
    allow_multiple_hits_per_channel: bool = False

    def __post_init__(self) -> None:
        """Initialise the internal (non-field) state of the state machine."""
        # Hits of the event currently being collected; None while no event is open.
        self._open_event: list[HitRecord] | None = None
        # Time of the last hit on a listed channel; reference point of the FHCDT timer.
        self._t_last_seen: float = -math.inf
        # Time of the most recent input hit; only used for the monotonicity check.
        self._t_last_input: float | None = None

    def _channel_function(self, channel: int) -> ChannelFunction:
        """Resolve the `ChannelFunction` configured for `channel`."""
        configured = self.channels
        if configured is None:
            # No restriction configured: every channel behaves as a normal channel.
            return ChannelFunction.NORMAL
        if not isinstance(configured, Mapping):
            # Plain collection of channel numbers: membership decides.
            if channel in configured:
                return ChannelFunction.NORMAL
            return ChannelFunction.UNUSED
        table = cast("Mapping[int, ChannelFunction]", configured)
        return table.get(channel, ChannelFunction.UNUSED)

    def process(self, hit: HitRecord) -> list[Event]:
        """
        Push one hit through the state machine.

        Args:
            hit: The next hit of the time-sorted stream.

        Returns:
            The events that became ready as a direct consequence of this hit
            (zero or one event). Call `flush` once the stream has ended to
            obtain an event that is still open.

        Raises:
            ValueError: If `hit` is older than the previously processed hit.
        """
        if self._t_last_input is not None and hit.time < self._t_last_input:
            raise ValueError(
                f"hits must be time-sorted: got hit at t={hit.time:g} "
                f"after t={self._t_last_input:g}"
            )
        self._t_last_input = hit.time

        role = self._channel_function(hit.channel)
        if role == ChannelFunction.UNUSED:
            return []

        pending = self._open_event
        if pending is None:
            self._try_start_event(hit, role)
            return []

        # The overall window (measured from the SoE) is checked before the
        # hit-to-hit window (measured from the latest in-event hit).
        expiry = None
        if hit.time - pending[0].time > self.dt1x_max:
            expiry = EventCloseReason.DT1X_EXPIRED
        elif hit.time - pending[-1].time > self.dtnx_max:
            expiry = EventCloseReason.DTNX_EXPIRED
        if expiry is not None:
            finished = self._close_event(expiry)
            # The hit that ended the event may itself open the next one.
            self._try_start_event(hit, role)
            return [finished]

        if not self.allow_multiple_hits_per_channel:
            for member in pending:
                if member.channel == hit.channel:
                    finished = self._close_event(EventCloseReason.DUPLICATE_CHANNEL)
                    # Per VisualAE the offending hit belongs neither to the closing
                    # event nor to the next one. It is on a listed channel, though,
                    # so it still moves the reference time of the FHCDT gap.
                    self._t_last_seen = hit.time
                    return [finished]

        pending.append(hit)
        return []

    def process_all(self, hits: Iterable[HitRecord]) -> Iterator[Event]:
        """
        Consume an iterable of hits and yield each event as soon as it closes.

        This is the same as calling `process` for every hit and `flush` once
        afterwards; the end-of-stream flush happens automatically.
        """
        for hit in hits:
            for event in self.process(hit):
                yield event
        for event in self.flush():
            yield event

    def flush(self) -> list[Event]:
        """
        Close the in-flight event (if there is one) and reset the state.

        Returns:
            A list holding the event closed with
            `EventCloseReason.END_OF_STREAM`, or an empty list when no event
            was open. Afterwards the builder can be used for a fresh stream.
            The call is idempotent: repeating it without feeding new input
            returns `[]`.
        """
        if self._open_event is None:
            drained = []
        else:
            drained = [self._close_event(EventCloseReason.END_OF_STREAM)]
        # Forget the timing history so that a new stream starts from scratch.
        self._t_last_seen = -math.inf
        self._t_last_input = None
        return drained

    def _try_start_event(self, hit: HitRecord, func: ChannelFunction) -> None:
        """Open a new event with `hit` if it qualifies; `_t_last_seen` is updated in any case."""
        gap_reference, self._t_last_seen = self._t_last_seen, hit.time
        blocked = (
            # GUARD or COMBINED: inhibits, never starts an event.
            func != ChannelFunction.NORMAL
            # A-flag: such a hit can never be the SoE.
            or bool(hit.status & HitFlags.AFTER_TIMEOUT)
            # The FHCDT gap is not satisfied.
            or hit.time - gap_reference <= self.fhcdt
        )
        if not blocked:
            self._open_event = [hit]

    def _close_event(self, reason: EventCloseReason) -> Event:
        """Wrap the open event's hits into an `Event` and mark the builder as idle."""
        members = self._open_event
        assert members is not None
        # Remember the time of the last in-event hit so that the next SoE check
        # measures the FHCDT gap from the correct reference.
        self._t_last_seen = members[-1].time
        finished = Event(hits=members, close_reason=reason)
        self._open_event = None
        return finished