Source code for RigolWFM.agilent

"""
Adapter layer for Agilent / Keysight oscilloscope binary waveform exports.

This module bridges the generated `agilent_agxx_bin` Kaitai parser and the rest of
RigolWFM. The container layout is described by the checked-in vendor parsers,
the wavebin sample corpus, and the Agilent 6000 / InfiniiVision 2000 manuals.

The `AGxx` file family stores one or more waveform records in a simple binary
container. Analog traces are already calibrated and stored as float32 sample
arrays. Peak Detect captures can store multiple analog buffers per waveform
header, and segmented-memory captures can repeat the same channel label for
multiple segments.
"""

import math
from typing import Any, Optional

import numpy as np
import numpy.typing as npt

import RigolWFM.agilent_agxx_bin
from RigolWFM.mso5000 import ChannelHeader, _estimate_volts_per_division, _proxy_raw






class AgilentWaveform:
    """Normalized Agilent / Keysight parse result consumed by `RigolWFM.channel.Channel`."""

    header: Header

    def __init__(self) -> None:
        """Create the wrapper around a freshly constructed `Header`."""
        self.header = Header()

    @property
    def parser_name(self) -> str:
        """Return the normalized parser name that `Wfm.from_file()` uses."""
        return "agilent_bin"

    def __str__(self) -> str:
        """Return the parser tag: `x.` followed by `parser_name`."""
        return "x." + self.parser_name
def _frame_parts(frame_string: str) -> tuple[str, str]:
    """Split a `MODEL:SERIAL` frame string into `(model, serial)`.

    Only the first colon separates the two parts. When there is no colon the
    whole stripped string is returned as the model with an empty serial.
    """
    head, colon, tail = frame_string.strip().partition(":")
    if not colon:
        return head, ""
    return head.strip(), tail.strip()


def _channel_slot(label: str, fallback: int) -> Optional[int]:
    """Translate a waveform label into a zero-based analog channel slot.

    Returns `fallback` for an empty label or one whose numeric part cannot be
    parsed, and `None` for `EXT` / `EXTERNAL` / `LA...` labels or for channel
    numbers outside 1..4.
    """
    token = label.upper().strip()
    if token == "":
        return fallback
    if token in ("EXT", "EXTERNAL") or token.startswith("LA"):
        return None

    # A leading "CH" is optional; whatever remains must be the channel number.
    digits = token[2:] if token.startswith("CH") else token
    try:
        number = int(digits)
    except ValueError:
        return fallback
    return number - 1 if 1 <= number <= 4 else None


def _normalized_channel_name(label: str, slot: int) -> str:
    """Build the stable, upper-cased channel name exposed to callers.

    Bare digits gain a `CH` prefix, and an empty label falls back to
    `CH<slot + 1>`. Any other label is returned stripped and upper-cased.
    """
    name = label.strip().upper()
    if not name:
        return f"CH{slot + 1}"
    if name.startswith("CH"):
        return name
    return "CH" + name if name.isdigit() else name


def _is_segmented_waveform(wfm_header: Any) -> bool:
    """Tell whether a waveform header describes a saved segment.

    A header counts as segmented when its `segment_index` is truthy or its
    `time_tag` is not within 1e-15 of zero.
    """
    if wfm_header.segment_index:
        return True
    return not math.isclose(wfm_header.time_tag, 0.0, abs_tol=1e-15)
def from_file(file_name: str) -> AgilentWaveform:
    """Read an Agilent / Keysight `AGxx` `.bin` capture and normalize it.

    Only waveform records that carry exactly one float32 analog buffer are
    normalized. Logic records and counts / logic / digital buffers are
    skipped, as are records whose label does not resolve to an analog slot
    below 4.

    Args:
        file_name: Path passed to the generated `AgilentAgxxBin` parser.

    Returns:
        An `AgilentWaveform` whose header holds the per-channel samples,
        timing values, and channel descriptors.

    Raises:
        ValueError: For segmented captures, unsupported analog buffer types,
            more than one analog buffer in a record, two records resolving
            to the same slot, point-count mismatches, or a capture with no
            supported analog record at all.
    """
    AgilentAgxxBin: Any = RigolWFM.agilent_agxx_bin.AgilentAgxxBin  # type: ignore[attr-defined]
    parsed = AgilentAgxxBin.from_file(file_name)

    buffer_kinds = AgilentAgxxBin.BufferTypeEnum
    float32_kinds = {
        buffer_kinds.normal_float32,
        buffer_kinds.maximum_float32,
        buffer_kinds.minimum_float32,
    }
    skipped_kinds = {
        buffer_kinds.counts_i32,
        buffer_kinds.logic_u8,
        buffer_kinds.digital_u8,
    }

    result = AgilentWaveform()
    header = result.header
    file_header = parsed.file_header
    header.cookie = file_header.cookie.decode("ascii", errors="ignore")
    header.version = file_header.version
    header.n_waveforms = file_header.n_waveforms

    # Number of analog records stored so far; it is also the fallback slot
    # handed to `_channel_slot` for labels it cannot interpret.
    analog_count = 0

    for record in parsed.waveforms:
        rec_header = record.wfm_header
        label = rec_header.waveform_label.strip()
        record_kind = int(rec_header.waveform_type)

        if _is_segmented_waveform(rec_header):
            raise ValueError(
                "Segmented Agilent/Keysight captures are not yet supported by "
                "the normalized parser. Use RigolWFM.agilent_agxx_bin.AgilentAgxxBin for "
                "low-level access to per-segment waveforms."
            )

        # Collect the analog buffers of this record, dropping digital content.
        candidates = []
        for buf in record.buffers:
            data_header = buf.data_header
            buffer_type = data_header.buffer_type
            if record_kind == AgilentAgxxBin.WaveformTypeEnum.logic or buffer_type in skipped_kinds:
                continue
            if not (data_header.bytes_per_point == 4 and buffer_type in float32_kinds):
                raise ValueError(
                    "Unsupported Agilent/Keysight waveform buffer "
                    f"(buffer_type={int(buffer_type)}, "
                    f"bytes_per_point={data_header.bytes_per_point}). "
                    "Only normal, minimum, and maximum float32 waveform buffers "
                    "are supported."
                )
            candidates.append(buf)

        if not candidates:
            continue
        if len(candidates) > 1:
            raise ValueError(
                "Peak Detect / multi-buffer Agilent/Keysight waveforms are not "
                "yet supported by the normalized parser. Use "
                "RigolWFM.agilent_agxx_bin.AgilentAgxxBin to inspect all buffers."
            )

        slot = _channel_slot(label, analog_count)
        # The fallback slot may itself be 4 or more, so re-check the bound here.
        if slot is None or slot >= 4:
            continue
        if header.channel_data[slot] is not None:
            raise ValueError(
                "Multiple Agilent/Keysight waveform records map to the same "
                "analog channel slot. Repeated channel labels usually indicate "
                "segmented-memory output, which is not yet normalized by "
                "Wfm.from_file()."
            )

        # Copy so the samples are an independent array that the in-place
        # `nan_to_num` call below is allowed to modify.
        samples = np.frombuffer(candidates[0].data_raw, dtype="<f4").copy()
        if len(samples) != rec_header.n_pts:
            raise ValueError(
                "Agilent/Keysight waveform point count does not match its "
                f"float32 buffer length. Header reports {rec_header.n_pts}, "
                f"buffer contains {len(samples)} samples."
            )

        np.nan_to_num(samples, copy=False)
        raw_proxy = _proxy_raw(samples)

        if header.n_pts == 0:
            # First stored channel: it supplies the capture-wide timing and
            # instrument identification.
            header.n_pts = len(samples)
            header.x_origin = rec_header.x_origin
            header.x_increment = rec_header.x_increment
            header.x_display_range = rec_header.x_display_range
            model, serial = _frame_parts(rec_header.frame_string)
            header.model = model or "Keysight"
            header.serial_number = serial
        elif len(samples) != header.n_pts:
            raise ValueError(
                "Agilent/Keysight analog channels have mismatched point counts. "
                f"Expected {header.n_pts}, found {len(samples)}."
            )

        y_units = rec_header.y_units
        numeric_units = int(y_units)
        unit_code = getattr(y_units, "value", numeric_units)

        channel = ChannelHeader(
            _normalized_channel_name(label, slot),
            enabled=True,
            unit_code=unit_code,
        )
        channel.volt_per_division = _estimate_volts_per_division(samples)

        header.ch[slot] = channel
        header.x_origins[slot] = rec_header.x_origin
        header.x_increments[slot] = rec_header.x_increment
        header.channel_data[slot] = samples
        header.raw_data[slot] = raw_proxy

        analog_count += 1

    if analog_count == 0:
        raise ValueError("No supported analog waveform records were found in this Agilent/Keysight capture.")

    return result