Source code for RigolWFM.rigol

"""
Rigol-specific scope lists, trigger helpers, and file-parser dispatch.

This module owns everything that is unique to Rigol oscilloscope families:
the model-string lists used for dispatch, the trigger-metadata helpers, and the
``parse_file()`` entry-point that ``wfm.Wfm.from_file()`` delegates to for all
Rigol families.

Non-Rigol vendors (LeCroy, Tektronix) are handled directly in ``wfm.py``.
"""

from typing import Any, Optional

import RigolWFM.channel
import RigolWFM.dho
import RigolWFM.mso5000
import RigolWFM.mso7000_8000
import RigolWFM.rigol_1000b_wfm
import RigolWFM.rigol_1000c_wfm
import RigolWFM.rigol_1000e_wfm
import RigolWFM.rigol_1000z_wfm
import RigolWFM.rigol_2000_wfm
import RigolWFM.rigol_4000_wfm
import RigolWFM.rigol_6000_wfm

# ---------------------------------------------------------------------------
# Scope-family model-string lists
# ---------------------------------------------------------------------------

# in progress
DS1000B_scopes: list[str] = ["B", "1000B", "DS1000B", "DS1074B", "DS1104B", "DS1204B"]

# tested
DS1000C_scopes: list[str] = [
    "C",
    "1000C",
    "DS1000C",
    "DS1000CD",
    "DS1000MD",
    "DS1000M",
    "DS1302CA",
    "DS1202CA",
    "DS1102CA",
    "DS1062CA",
    "DS1042C",
]

# tested
DS1000D_scopes: list[str] = ["D", "1000D", "DS1000D", "DS1102D", "DS1052D"]

# tested
DS1000E_scopes: list[str] = ["E", "1000E", "DS1000E", "DS1102E", "DS1052E"]

# tested, wonky voltages
DS1000Z_scopes: list[str] = [
    "Z",
    "1000Z",
    "DS1000Z",
    "DS1202Z",
    "DS1054Z",
    "MSO1054Z",
    "DS1074Z",
    "MSO1074Z",
    "DS1074Z-S",
    "DS1104Z",
    "MSO1104Z",
    "DS1104Z-S",
]

# tested
DS2000_scopes: list[str] = [
    "2",
    "2000",
    "DS2000",
    "DS2072A",
    "DS2102A",
    "MSO2102A",
    "MSO2102A-S",
    "DS2202A",
    "MSO2202A",
    "MSO2202A-S",
    "DS2302A",
    "MSO2302A",
    "MSO2302A-S",
]

# tested
DS4000_scopes: list[str] = [
    "4",
    "4000",
    "DS4000",
    "DS4054",
    "DS4052",
    "DS4034",
    "DS4032",
    "DS4024",
    "DS4022",
    "DS4014",
    "DS4012",
    "MSO4054",
    "MSO4052",
    "MSO4034",
    "MSO4032",
    "MSO4024",
    "MSO4022",
    "MSO4014",
    "MSO4012",
]

# untested
DS6000_scopes: list[str] = ["6", "6000", "DS6000", "DS6062", "DS6064", "DS6102", "DS6104"]

# example-backed `.bin` support
DS5000_scopes: list[str] = ["5", "5000", "MSO5000"]

# Alias for MSO5074 standard RG01 captures, parsed with the MSO5000 family code.
MSO5074_scopes: list[str] = ["5074", "MSO5074"]

# manual-backed `.bin` support
DS7000_scopes: list[str] = ["7", "7000", "DS7000", "MSO7000"]

# manual-backed `.bin` support
DS8000_scopes: list[str] = ["8", "8000", "MSO8000"]

# DHO800/DHO1000 series (.bin and .wfm - format detected by file extension)
DHO1000_scopes: list[str] = [
    "DHO",
    "DHO800",
    "DHO1000",
    "DHO804",
    "DHO812",
    "DHO814",
    "DHO824",
    "DHO1072",
    "DHO1074",
    "DHO1102",
    "DHO1202",
    "DHO1204",
]

# Flat list of every Rigol model string, used by detect_model() and valid_scope_list()
ALL_RIGOL_SCOPES: list[list[str]] = [
    DS1000B_scopes,
    DS1000C_scopes,
    DS1000D_scopes,
    DS1000E_scopes,
    DS1000Z_scopes,
    DS2000_scopes,
    DS4000_scopes,
    DS5000_scopes,
    MSO5074_scopes,
    DS7000_scopes,
    DS8000_scopes,
    DS6000_scopes,
    DHO1000_scopes,
]

# ---------------------------------------------------------------------------
# Trigger-metadata helpers (Rigol-specific)
# ---------------------------------------------------------------------------

_SWEEP_NAMES: dict[int, str] = {0: "AUTO", 1: "NORMAL", 2: "SINGLE"}
_COUPLING_NAMES: dict[int, str] = {0: "DC", 1: "LF", 2: "HF", 3: "AC"}
_DS2000_SOURCE_NAMES: dict[int, str] = {
    0: "CH1",
    1: "CH2",
    2: "EXT",
    3: "AC LINE",
    **{4 + i: "D%d" % i for i in range(16)},
}
_DS2000_TRIGGER_MODE_NAMES: dict[int, str] = {
    30: "Edge",
}


def _trig_header_dict(th: Any) -> dict:
    """Extract a KaitaiStruct trigger_header into a plain dict."""
    d: dict = {}
    try:
        d["mode"] = th.mode.name
    except Exception:
        pass
    try:
        d["source"] = th.source.name.upper()
    except Exception:
        pass
    try:
        d["level"] = float(th.level)
    except Exception:
        pass
    try:
        d["sweep"] = _SWEEP_NAMES.get(th.sweep, str(th.sweep))
    except Exception:
        pass
    try:
        d["coupling"] = _COUPLING_NAMES.get(th.coupling, str(th.coupling))
    except Exception:
        pass
    return d


[docs] def describe_trigger_block(d: dict, indent: str = " ") -> str: """Format a trigger info dict as indented text.""" s = "" if "mode" in d: s += "%sMode = %s\n" % (indent, d["mode"]) if "source" in d: s += "%sSource = %s\n" % (indent, d["source"]) if "level" in d: s += "%sLevel = %sV\n" % (indent, RigolWFM.channel.engineering_string(d["level"], 2)) if "sweep" in d: s += "%sSweep = %s\n" % (indent, d["sweep"]) if "coupling" in d: s += "%sCoupling = %s\n" % (indent, d["coupling"]) return s
def _scaled_ds4000_trigger_levels(level_block: Any, probe_values: list[float]) -> dict[str, float]: """Scale a parsed DS4000 trigger-level block into volts.""" raw_levels = [ ("CH1", level_block.ch1_level_uv, probe_values[0]), ("CH2", level_block.ch2_level_uv, probe_values[1]), ("CH3", level_block.ch3_level_uv, probe_values[2]), ("CH4", level_block.ch4_level_uv, probe_values[3]), ("EXT", level_block.ext_level_uv, 1.0), ] return {name: raw_uv * 1.0e-6 * scale for name, raw_uv, scale in raw_levels} def _decode_ds4000_trigger(waveform: Any) -> dict: """Best-effort decode of DS4000 trigger metadata from parsed setup fields.""" setup = getattr(waveform.header, "setup", None) if setup is None: return {} probe_values = [float(channel.probe_value) for channel in waveform.header.ch] modern_levels = getattr(setup, "modern_trigger_levels", None) modern_mode = getattr(setup, "modern_trigger_mode", None) modern_source = getattr(setup, "modern_trigger_source", None) if modern_levels is not None and hasattr(modern_mode, "name") and hasattr(modern_source, "name"): input_levels = _scaled_ds4000_trigger_levels(modern_levels, probe_values) source = modern_source.name.upper() # type: ignore[union-attr] info: dict[str, Any] = { "mode": modern_mode.name.capitalize(), # type: ignore[union-attr] "source": source, "input_levels": input_levels, } if source in input_levels: info["level"] = input_levels[source] return info legacy_levels = getattr(setup, "legacy_trigger_levels", None) if legacy_levels is not None: return {"input_levels": _scaled_ds4000_trigger_levels(legacy_levels, probe_values)} return {} def _decode_ds2000_trigger(waveform: Any) -> dict: """Best-effort decode of DS2000 trigger metadata from parsed setup fields.""" setup = getattr(waveform.header, "setup", None) if setup is None: return {} source_primary = getattr(setup, "trigger_source_primary", None) source_shadow = getattr(setup, "trigger_source_shadow", None) holdoff_ns = int(getattr(setup, "trigger_holdoff_ns", 0) or 0) level_block = getattr(setup, "trigger_levels", None) if level_block is None: return {} input_levels = { "CH1": level_block.ch1_level_uv * 1.0e-6, "CH2": level_block.ch2_level_uv * 1.0e-6, "EXT": level_block.ext_level_uv * 1.0e-6, } has_meaningful_setup = holdoff_ns != 0 or any(level != 0.0 for level in input_levels.values()) if not has_meaningful_setup: return {} info: dict[str, Any] = {"input_levels": input_levels} try: source_code = int(source_primary) # type: ignore[arg-type] if source_shadow is not None and source_code == int(source_shadow): source_name = _DS2000_SOURCE_NAMES.get(source_code) if source_name is not None: info["source"] = source_name if source_name in input_levels: info["level"] = input_levels[source_name] except (TypeError, ValueError): pass mode_code = getattr(setup, "trigger_mode_code", None) try: mode_name = _DS2000_TRIGGER_MODE_NAMES.get(int(mode_code)) # type: ignore[arg-type] if mode_name is not None: info["mode"] = mode_name except (TypeError, ValueError): pass return info # --------------------------------------------------------------------------- # Backward-compatible wrapper # ---------------------------------------------------------------------------
[docs] def dho_from_file(file_name: str) -> RigolWFM.dho.DhoWaveform: """Backward-compatible wrapper around `RigolWFM.dho.from_file()`.""" return RigolWFM.dho.from_file(file_name)
# --------------------------------------------------------------------------- # Unified Rigol parser dispatch # ---------------------------------------------------------------------------
[docs] def parse_file( umodel: str, file_name: str, ) -> Optional[tuple[Any, str, str, dict]]: """Parse a Rigol waveform file and return normalised metadata. Args: umodel: Upper-cased model string (as returned by ``detect_model()`` or supplied by the user). file_name: Path to the waveform file. Returns: ``(w, header_name, serial_number, trigger_info)`` when *umodel* matches a known Rigol family, or ``None`` when it does not (allowing the caller to try non-Rigol vendors). """ header_name = "" serial_number = "" trigger_info: dict = {} if umodel in DS1000B_scopes: w = RigolWFM.rigol_1000b_wfm.Rigol1000bWfm.from_file(file_name) # type: ignore[attr-defined] header_name = "DS1000B" trigger_info = { "mode": w.header.trigger_mode.name, "source": w.header.trigger_source.name.upper(), } elif umodel in DS1000C_scopes: w = RigolWFM.rigol_1000c_wfm.Rigol1000cWfm.from_file(file_name) # type: ignore[attr-defined] header_name = "DS1000C" trigger_info = { "mode": w.header.trigger_mode.name, "source": w.header.trigger_source.name.upper(), } elif umodel in DS1000D_scopes: w = RigolWFM.rigol_1000e_wfm.Rigol1000eWfm.from_file(file_name) # type: ignore[attr-defined] w.parser_name = "wfm1000e" header_name = "DS1000D" _mode = w.header.trigger_mode.name if _mode == "alt": trigger_info = { "mode": "alt", "trigger1": _trig_header_dict(w.header.trigger1), "trigger2": _trig_header_dict(w.header.trigger2), } else: trigger_info = _trig_header_dict(w.header.trigger1) trigger_info["mode"] = _mode elif umodel in DS1000E_scopes: w = RigolWFM.rigol_1000e_wfm.Rigol1000eWfm.from_file(file_name) # type: ignore[attr-defined] header_name = "DS1000E" _mode = w.header.trigger_mode.name if _mode == "alt": trigger_info = { "mode": "alt", "trigger1": _trig_header_dict(w.header.trigger1), "trigger2": _trig_header_dict(w.header.trigger2), } else: trigger_info = _trig_header_dict(w.header.trigger1) trigger_info["mode"] = _mode elif umodel in DS1000Z_scopes: w = RigolWFM.rigol_1000z_wfm.Rigol1000zWfm.from_file(file_name) # type: ignore[attr-defined] header_name = w.preheader.model_number elif umodel in DS2000_scopes: w = RigolWFM.rigol_2000_wfm.Rigol2000Wfm.from_file(file_name) # type: ignore[attr-defined] header_name = "DS2000" serial_number = getattr(w.header, "serial_number", w.header.model_number) trigger_info = _decode_ds2000_trigger(w) elif umodel in DS4000_scopes: w = RigolWFM.rigol_4000_wfm.Rigol4000Wfm.from_file(file_name) # type: ignore[attr-defined] header_name = "DS4000" serial_number = getattr(w.header, "serial_number", w.header.model_number) trigger_info = _decode_ds4000_trigger(w) elif umodel in DS5000_scopes: w = RigolWFM.mso5000.from_file(file_name) header_name = w.header.model_number or "MSO5000" elif umodel in MSO5074_scopes: w = RigolWFM.mso5000.from_file(file_name) header_name = w.header.model_number or "MSO5074" elif umodel in DS7000_scopes: w = RigolWFM.mso7000_8000.from_file(file_name) header_name = w.header.model_number or "MSO7000" elif umodel in DS8000_scopes: w = RigolWFM.mso7000_8000.from_file(file_name) header_name = w.header.model_number or "MSO8000" elif umodel in DS6000_scopes: w = RigolWFM.rigol_6000_wfm.Rigol6000Wfm.from_file(file_name) # type: ignore[attr-defined] header_name = w.header.model_number elif umodel in DHO1000_scopes: w = RigolWFM.dho.from_file(file_name) fallback = "DHO1000" model_hint = w.header.model_number or "" if model_hint.upper().startswith(("DHO8", "HDO8")): fallback = "DHO800" header_name = RigolWFM.dho.family_label( model_hint, is_bin=RigolWFM.dho.is_bin_file(file_name), fallback=fallback, ) else: return None return w, header_name, serial_number, trigger_info