"""
Adapter layer for Rigol MSO5000-family binary waveform exports.
`RigolWFM.rigol_mso5000_bin` is the generated low-level Kaitai parser for the
exported `.bin` container, but the rest of RigolWFM expects a normalized object
with fixed analog channel slots, common timing fields, and decoded sample
arrays.
This module performs that normalization for standard RG01 captures, including
analog float32 records and logic-analyzer records such as the checked-in
`MSO5074-C.bin` fixture.
"""
import os
from enum import IntEnum
from typing import Any, Optional
import numpy as np
import numpy.typing as npt
import RigolWFM.rigol_mso5000_bin
[docs]
class UnitEnum(IntEnum):
"""Unit types used by MSO5000 waveform exports."""
unknown = 0
v = 1
s = 2
constant = 3
a = 4
db = 5
hz = 6
def _channel_slot(label: str, fallback: int) -> Optional[int]:
"""Map channel labels like `CH2` to zero-based channel slots."""
label_upper = label.upper()
if label_upper.startswith("LA"):
return None
if label_upper.startswith("CH"):
try:
index = int(label_upper[2:]) - 1
if 0 <= index < 4:
return index
except ValueError:
pass
return fallback
def _estimate_volts_per_division(values: npt.NDArray[np.float32]) -> float:
"""Estimate a readable volts/division from calibrated samples."""
if values.size == 0:
return 1.0
finite = values[np.isfinite(values)]
if finite.size == 0:
return 1.0
span = float(np.max(finite) - np.min(finite))
if span <= 0:
return max(abs(float(finite[0])) / 4.0, 1e-3)
return max(span / 8.0, 1e-3)
def _proxy_raw(values: npt.NDArray[np.float32]) -> npt.NDArray[np.uint8]:
"""Create a stable uint8 proxy for calibrated volt samples."""
if values.size == 0:
return np.empty((0,), dtype=np.uint8)
finite = values[np.isfinite(values)]
if finite.size == 0:
return np.full(values.shape, 127, dtype=np.uint8)
low = float(np.min(finite))
high = float(np.max(finite))
if high <= low:
return np.full(values.shape, 127, dtype=np.uint8)
center = (high + low) / 2.0
half_span = max((high - low) / 2.0, 1e-12)
raw = 127.0 - 127.0 * (values.astype(np.float64) - center) / half_span
return np.clip(np.rint(raw), 0, 255).astype(np.uint8)
def _model_from_frame(frame_string: str) -> str:
"""Extract the model name from `MODEL:SERIAL` frame strings."""
if ":" in frame_string:
return frame_string.split(":", 1)[0]
return frame_string
def _expected_rg01_file_size(raw: Any) -> int:
"""Return the expected byte size for a standard RG01 file."""
total = 12
for waveform in raw.waveforms:
total += waveform.wfm_header.header_size
total += waveform.data_header.header_size
total += waveform.data_header.buffer_size
return total
def _looks_like_malformed_5074(file_name: str) -> bool:
"""Return True for the old concatenated MSO5074 export style we no longer support."""
with open(file_name, "rb") as handle:
raw = handle.read()
return (
raw.startswith(b"RG01")
and raw.count(b"RG01") > 1
and len(raw) >= 16
and int.from_bytes(raw[12:16], "little") == 144
)
def _validate_standard_rg01_layout(file_name: str, raw: Any) -> None:
"""Reject malformed concatenated RG01 files that are not standard MSO5000 exports."""
expected = _expected_rg01_file_size(raw)
actual = os.path.getsize(file_name)
if expected != actual and _looks_like_malformed_5074(file_name):
raise ValueError(
"Unsupported malformed MSO5074 concatenated RG01 export. "
"Use a standard MSO5000/MSO5074 RG01 waveform file instead."
)
def _logic_byte_samples(data_header: Any, data_raw: bytes) -> npt.NDArray[np.uint8]:
"""Decode a logic-analyzer buffer into one byte per sample."""
buffer_type = data_header.buffer_type
bytes_per_point = data_header.bytes_per_point
if int(buffer_type) == 6 and bytes_per_point == 1:
return np.frombuffer(data_raw, dtype=np.uint8).copy()
if int(buffer_type) == 5 and bytes_per_point == 4:
data = np.frombuffer(data_raw, dtype="<f4").copy()
np.nan_to_num(data, copy=False)
return np.clip(data, 0, 255).astype(np.uint8)
raise ValueError(
"Unsupported MSO5000 logic waveform buffer "
f"(buffer_type={int(buffer_type)}, bytes_per_point={bytes_per_point})."
)
def _logic_bit_traces(samples: npt.NDArray[np.uint8], bit_base: int) -> dict[str, npt.NDArray[np.uint8]]:
"""Expand packed logic samples into named digital traces."""
traces: dict[str, npt.NDArray[np.uint8]] = {}
for bit in range(8):
traces[f"D{bit_base + bit}"] = ((samples >> bit) & 1).astype(np.uint8)
return traces
[docs]
def from_file(file_name: str) -> Mso5000Waveform:
"""Parse a Rigol MSO5000 `.bin` file and normalize it for `Wfm.from_file()`."""
RigolMso5000Bin: Any = RigolWFM.rigol_mso5000_bin.RigolMso5000Bin # type: ignore[attr-defined]
try:
raw = RigolMso5000Bin.from_file(file_name)
except Exception as exc:
if _looks_like_malformed_5074(file_name):
raise ValueError(
"Unsupported malformed MSO5074 concatenated RG01 export. "
"Use a standard MSO5000/MSO5074 RG01 waveform file instead."
) from exc
raise
_validate_standard_rg01_layout(file_name, raw)
supported_buffer_types = {
RigolMso5000Bin.BufferTypeEnum.normal_float32,
RigolMso5000Bin.BufferTypeEnum.maximum_float32,
RigolMso5000Bin.BufferTypeEnum.minimum_float32,
RigolMso5000Bin.BufferTypeEnum.time_float32,
RigolMso5000Bin.BufferTypeEnum.counts_float32,
}
obj = Mso5000Waveform()
header = obj.header
header.cookie = raw.file_header.cookie.decode("ascii", errors="ignore")
header.version = raw.file_header.version
header.n_waveforms = raw.file_header.n_waveforms
header.ch = [ChannelHeader(f"CH{i + 1}", enabled=False) for i in range(4)]
analog_slot = 0
logic_slot = 0
for waveform in raw.waveforms:
wfm_header = waveform.wfm_header
data_header = waveform.data_header
label = wfm_header.waveform_label.strip()
waveform_type = int(wfm_header.waveform_type)
buffer_type = data_header.buffer_type
point_count = int(wfm_header.n_pts)
if header.n_pts == 0:
header.n_pts = point_count
header.x_increment = wfm_header.x_increment
header.x_origin = wfm_header.x_origin
header.x_display_range = wfm_header.x_display_range
header.model = _model_from_frame(wfm_header.frame_string)
if (
waveform_type == RigolMso5000Bin.WaveformTypeEnum.logic
or label.upper().startswith("LA")
or buffer_type == RigolMso5000Bin.BufferTypeEnum.digital_u8
):
samples = _logic_byte_samples(data_header, waveform.data_raw)
if len(samples) != point_count:
raise ValueError(
"MSO5000 logic waveform point count does not match the RG01 header. "
f"Expected {point_count}, found {len(samples)}."
)
bit_base = 8 * logic_slot
obj.logic_channels.update(_logic_bit_traces(samples, bit_base))
logic_slot += 1
obj.logic_x_increment = wfm_header.x_increment
obj.logic_x_origin = wfm_header.x_origin
continue
if buffer_type not in supported_buffer_types or data_header.bytes_per_point != 4:
raise ValueError(
"Unsupported MSO5000 waveform buffer "
f"(buffer_type={int(buffer_type)}, "
f"bytes_per_point={data_header.bytes_per_point}). "
"Only float32 analog buffers are currently supported."
)
slot = _channel_slot(label, analog_slot)
if slot is None or slot >= 4:
continue
data = np.frombuffer(waveform.data_raw, dtype="<f4").copy()
np.nan_to_num(data, copy=False)
raw_proxy = _proxy_raw(data)
if len(data) != point_count:
raise ValueError(
"MSO5000 analog waveform point count does not match the RG01 header. "
f"Expected {point_count}, found {len(data)}."
)
if analog_slot > 0 and len(data) != header.n_pts:
raise ValueError(
"MSO5000 analog channels have mismatched point counts. " f"Expected {header.n_pts}, found {len(data)}."
)
unit_code = getattr(wfm_header.y_units, "value", int(wfm_header.y_units))
ch_name = label or f"CH{slot + 1}"
channel = ChannelHeader(ch_name, enabled=True, unit_code=unit_code)
channel.volt_per_division = _estimate_volts_per_division(data)
header.ch[slot] = channel
header.channel_data[slot] = data
header.raw_data[slot] = raw_proxy
analog_slot += 1
if logic_slot == 1:
obj.logic_mapping = "LA D7-D0"
elif logic_slot == 2:
obj.logic_mapping = "LA D7-D0 + LA D15-D8"
elif logic_slot > 2:
obj.logic_mapping = f"{logic_slot} explicit logic byte lanes"
return obj