"""
Adapter layer for Teledyne LeCroy oscilloscope waveform files (.trc).
This module bridges the generated LeCroy KSY parsers and the rest of RigolWFM.
It normalises the binary data into the header/data shape expected by
`RigolWFM.wfm.Wfm` and `RigolWFM.channel`.
Two template versions are supported:
LECROY_2_3 (346-byte WAVEDESC):
ksy/lecroy_2_3_le_trc.ksy → RigolWFM/lecroy_2_3_le_trc.py (LOFIRST / LE)
ksy/lecroy_2_3_be_trc.ksy → RigolWFM/lecroy_2_3_be_trc.py (HIFIRST / BE)
LECROY_1_0 (320-byte WAVEDESC, older format):
ksy/lecroy_1_0_le_trc.ksy → RigolWFM/lecroy_1_0_le_trc.py (LOFIRST / LE)
ksy/lecroy_1_0_be_trc.ksy → RigolWFM/lecroy_1_0_be_trc.py (HIFIRST / BE)
SCPI prefix
-----------
Files transferred via GPIB/SCPI may carry an IEEE 488.2 block-data prefix of
the form ``#N<N digits of byte count>`` before the WAVEDESC marker. This
module searches for ``WAVEDESC`` and strips any such prefix before handing the
bytes to the KSY parser.
Endianness detection
--------------------
COMM_ORDER is a u2 at offset 34 inside the WAVEDESC block. The low byte at
that position is 0 for big-endian (HIFIRST) and 1 for little-endian (LOFIRST).
LECROY_1_0 calibration:
volts[i] = VERTICAL_GAIN * adc[i] - ACQ_VERT_OFFSET
LECROY_2_3 calibration:
volts[i] = VERTICAL_GAIN * adc[i] - VERTICAL_OFFSET
Time axis (both versions):
t[i] = HORIZ_OFFSET + i * HORIZ_INTERVAL (i = 0 … WAVE_ARRAY_COUNT − 1)
"""
import io as _io
from typing import Any, Optional
import numpy as np
import numpy.typing as npt
from kaitaistruct import KaitaiStream # type: ignore[import]
import RigolWFM.channel
import RigolWFM.lecroy_1_0_be_trc
import RigolWFM.lecroy_1_0_le_trc
import RigolWFM.lecroy_2_3_be_trc
import RigolWFM.lecroy_2_3_le_trc
_Lecroy23Le: Any = RigolWFM.lecroy_2_3_le_trc.Lecroy23LeTrc # type: ignore[attr-defined]
_Lecroy23Be: Any = RigolWFM.lecroy_2_3_be_trc.Lecroy23BeTrc # type: ignore[attr-defined]
_Lecroy10Le: Any = RigolWFM.lecroy_1_0_le_trc.Lecroy10LeTrc # type: ignore[attr-defined]
_Lecroy10Be: Any = RigolWFM.lecroy_1_0_be_trc.Lecroy10BeTrc # type: ignore[attr-defined]
_LECROY_MAGIC = b"WAVEDESC"
_COUPLING_MAP = {
0: "DC", # dc_50_ohm
1: "GND", # ground
2: "DC", # dc_1m_ohm
3: "GND", # ground_b
4: "AC", # ac_1m_ohm
}
def _find_wavedesc(data: bytes) -> int:
"""Return the byte offset of the WAVEDESC marker in data.
Searches the file for the first plausible 8-byte ASCII string
``WAVEDESC`` and validates the surrounding header bytes enough to avoid
obvious false positives in arbitrary payload data.
"""
start = 0
while True:
pos = data.find(_LECROY_MAGIC, start)
if pos < 0:
raise ValueError("WAVEDESC marker not found in file")
candidate = data[pos:]
if len(candidate) >= 35 and candidate[34] in (0, 1):
return pos
start = pos + 1
def _read_file_bytes(file_name: str) -> bytes:
"""Read file contents, raising ValueError on OS errors."""
try:
with open(file_name, "rb") as f:
return f.read()
except OSError as exc:
raise ValueError(f"Cannot open LeCroy file '{file_name}': {exc}") from exc
[docs]
def from_file(file_name: str) -> LeCroyWaveform:
"""Parse a LeCroy .trc file and normalize it for `Wfm.from_file()`.
Handles both LECROY_2_3 and LECROY_1_0 WAVEDESC formats. Files with a
leading SCPI ``#N<digits>`` block-data prefix are automatically detected
and stripped before parsing.
Args:
file_name: path to a LeCroy .trc waveform file.
Returns:
A `LeCroyWaveform` object whose `header` follows the shape expected
by `RigolWFM.channel.Channel` and `RigolWFM.wfm.Wfm`.
Raises:
ValueError: if the file cannot be parsed as a valid LeCroy waveform.
"""
data = _read_file_bytes(file_name)
try:
wavedesc_pos = _find_wavedesc(data)
except ValueError as exc:
raise ValueError(f"Not a valid LeCroy file '{file_name}': {exc}") from exc
# Slice to WAVEDESC so the KSY parser sees offset 0 = start of WAVEDESC
payload = data[wavedesc_pos:]
# Determine endianness from COMM_ORDER low byte at WAVEDESC offset 34
if len(payload) < 35:
raise ValueError(f"File '{file_name}' WAVEDESC block is too short")
is_le = payload[34] == 1
# Determine template version from WAVEDESC offsets 16–31
template_bytes = payload[16:32]
template = template_bytes.split(b"\x00")[0].decode("ascii", errors="ignore").strip()
is_v1 = template == "LECROY_1_0"
try:
stream = KaitaiStream(_io.BytesIO(payload))
if is_v1:
raw = _Lecroy10Le(stream) if is_le else _Lecroy10Be(stream)
else:
raw = _Lecroy23Le(stream) if is_le else _Lecroy23Be(stream)
except Exception as exc:
raise ValueError(f"Could not parse LeCroy file '{file_name}': {exc}") from exc
wd = raw.wavedesc
# Decode instrument name (null-terminated ASCII, 16 bytes)
try:
model_str = wd.instrument_name.split(b"\x00")[0].decode("ascii", errors="ignore").strip()
except Exception:
model_str = "LeCroy"
# --- LECROY_1_0: wave_source is a plain s2, 1-indexed (1=CH1 … 4=CH4) ---
# --- LECROY_2_3: wave_source is an IntEnum, 0-indexed (0=CH1 … 3=CH4) ---
if is_v1:
wave_source_val = int(wd.wave_source)
slot = max(0, min(3, wave_source_val - 1)) # 1-indexed → 0-based
else:
wave_source_val = getattr(wd.wave_source, "value", int(wd.wave_source))
slot = max(0, min(3, wave_source_val))
ch_name = f"CH{slot + 1}"
coupling_val = getattr(wd.vert_coupling, "value", int(wd.vert_coupling))
coupling_str = _COUPLING_MAP.get(coupling_val, "DC")
probe_att = float(getattr(wd, "probe_att", 1.0))
if probe_att <= 0:
probe_att = 1.0
is_16bit = bool(wd.is_16bit)
n_pts = int(wd.wave_array_count)
vertical_gain = float(wd.vertical_gain)
# Calibration offset differs between template versions
if is_v1:
vertical_offset = float(wd.acq_vert_offset)
else:
vertical_offset = float(wd.vertical_offset)
try:
raw_bytes = raw.wave_array_1
except Exception as exc:
raise ValueError(f"No waveform data in '{file_name}' (file may be truncated): {exc}") from exc
if not raw_bytes:
raise ValueError(f"No waveform data in '{file_name}'")
# Decode sample array
byte_order = "<" if is_le else ">"
adc: npt.NDArray[np.signedinteger[Any]]
if is_16bit:
dtype = np.dtype(f"{byte_order}i2")
adc = np.frombuffer(raw_bytes, dtype=dtype).astype(np.int16)
else:
adc = np.frombuffer(raw_bytes, dtype=np.int8).copy()
# Clamp to declared sample count
if len(adc) > n_pts > 0:
adc = adc[:n_pts]
n_pts = len(adc)
volts = (vertical_gain * adc.astype(np.float64) - vertical_offset).astype(np.float32)
# MAX_VALUE and MIN_VALUE are ADC counts in both LECROY_1_0 and LECROY_2_3.
# volt_per_div = (adc_range) * volts_per_count / 8 divisions
max_value = float(wd.max_value)
min_value = float(wd.min_value)
adc_range = max_value - min_value
volt_per_div = adc_range * abs(vertical_gain) / 8.0 if adc_range != 0 else 1.0
# Build normalized objects
obj = LeCroyWaveform()
h = obj.header
h.model = model_str or "LeCroy"
h.n_pts = n_pts
h.x_origin = float(wd.horiz_offset)
h.x_increment = float(wd.horiz_interval)
ch = h.ch[slot]
ch.name = ch_name
ch.enabled = True
ch.coupling = coupling_str
ch.probe_value = probe_att
ch.volt_per_division = volt_per_div
ch.volt_scale = vertical_gain
ch.volt_offset = vertical_offset
h.channel_data[slot] = volts
# raw_data stored as uint8 (high byte for 16-bit, direct cast for 8-bit)
if is_16bit:
raw16 = adc.astype(np.int16).view(np.uint16)
h.raw_data[slot] = (raw16 >> 8).astype(np.uint8)
else:
h.raw_data[slot] = adc.view(np.uint8)
return obj