# Source code for RigolWFM.yokogawa_hdr

"""
Parser for Yokogawa DL/WE-series oscilloscope .hdr companion metadata files.

The Yokogawa DL/WE-series oscilloscopes produce a two-file waveform package:

  <name>.hdr  — **this file**: ASCII text key/value metadata
  <name>.wvf  — companion flat binary sample data

This module parses the .hdr file into a structured :class:`HdrInfo` object
that supplies every parameter needed to read and calibrate the .wvf binary
data (byte order, data layout, per-trace calibration, time axis, etc.).

Typical usage::

    from RigolWFM.yokogawa_hdr import parse_hdr, wvf_byte_offset

    hdr = parse_hdr("capture.hdr")
    print(hdr.model, hdr.endian, hdr.data_format)

    g, t, b = 0, 0, 0                      # group, trace, block indices (0-origin)
    trace = hdr.groups[g].traces[t]
    off = wvf_byte_offset(hdr, g, t, b)    # byte offset into .wvf file
    dtype = hdr.byte_order + trace.v_data_type.numpy_dtype

Calibration:
    volts[i] = v_resolution * raw[i] + v_offset

Time axis (0-origin sample index i):
    t[i] = h_offset + h_resolution * i

References:
    Yokogawa Electric Corporation, IM 707713-61E (WVF File Access Toolkit).
    Yokogawa DL1640 User Manual, Appendix 3.
    Erik Benkler, "wvfread v1.7", Physikalisch-Technische Bundesanstalt, 2011.
"""

import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

import numpy as np
import numpy.typing as npt

# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------


@dataclass
class VDataTypeInfo:
    """One trace's vertical data type, decoded from its ``VDataType`` token."""

    # The token exactly as written in the .hdr file, e.g. "IS2", "IU2", "FS4", "B16".
    raw: str
    # Number of bytes occupied by one ADC sample.
    byte_num: int
    # Numpy dtype string without a byte-order prefix, e.g. "i2", "u2", "f4".
    numpy_dtype: str
    # True for B-type codes (raw logic bit-fields, e.g. DL750 digital channels).
    is_logic: bool
@dataclass
class TraceInfo:
    """Metadata describing a single trace inside one group."""

    name: str = ""
    # Samples per block.
    block_size: int = 0
    # Volts per ADC count (ScaleA).
    v_resolution: float = 1.0
    # Volt offset (ScaleB).
    v_offset: float = 0.0
    v_data_type: Optional["VDataTypeInfo"] = None
    v_unit: str = "V"
    # ADC value meaning upper overrange.
    v_plus_over: Optional[int] = None
    # ADC value meaning lower overrange.
    v_minus_over: Optional[int] = None
    # ADC value meaning invalid/hidden sample.
    v_illegal: Optional[int] = None
    # Maximum valid ADC value.
    v_max: Optional[int] = None
    # Minimum valid ADC value.
    v_min: Optional[int] = None
    # Seconds per sample (SamplingInterval).
    h_resolution: float = 1e-9
    # Time of first sample relative to trigger (s).
    h_offset: float = 0.0
    h_unit: str = "s"
    # One entry per block.
    dates: list[str] = field(default_factory=list)
    # One entry per block.
    times: list[str] = field(default_factory=list)
@dataclass
class GroupInfo:
    """Metadata for one ``$Group<N>`` section of the .hdr file."""

    trace_number: int = 0
    block_number: int = 0
    # Every instance gets its own list of per-trace metadata.
    traces: list["TraceInfo"] = field(default_factory=list)
@dataclass
class HdrInfo:
    """Everything parsed out of a Yokogawa .hdr metadata file."""

    format_version: str = ""
    model: str = ""
    # "Big" (Motorola) or "Ltl" (Intel x86).
    endian: str = "Ltl"
    # "TRACE" or "BLOCK" layout in the .wvf file.
    data_format: str = "TRACE"
    group_number: int = 0
    trace_total_number: int = 0
    # Leading unused bytes in the .wvf file.
    data_offset: int = 0
    groups: list["GroupInfo"] = field(default_factory=list)

    @property
    def is_big_endian(self) -> bool:
        """Tell whether the .wvf binary data is big-endian.

        The decision is case-insensitive and looks only at whether the
        ``endian`` string begins with the letter B.
        """
        normalized = self.endian.upper()
        return normalized.startswith("B")

    @property
    def byte_order(self) -> str:
        """Byte-order prefix for numpy / struct: ``'>'`` (BE) or ``'<'`` (LE)."""
        if self.is_big_endian:
            return ">"
        return "<"
# ---------------------------------------------------------------------------
# Public entry points
# ---------------------------------------------------------------------------
def parse_hdr(path: str) -> "HdrInfo":
    """Read a Yokogawa .hdr file from disk and parse it into an :class:`HdrInfo`.

    The file is decoded as ASCII; bytes that are not valid ASCII are replaced
    rather than raising a decode error.

    Args:
        path: Path to the ``.hdr`` file.

    Returns:
        Populated :class:`HdrInfo` instance.

    Raises:
        FileNotFoundError: If *path* does not exist.
        ValueError: If a required field is missing or malformed.
    """
    hdr_file = Path(path)
    contents = hdr_file.read_text(encoding="ascii", errors="replace")
    return parse_hdr_text(contents)
def parse_hdr_text(text: str) -> "HdrInfo":
    """Parse .hdr content that is already held in memory as a string.

    Handy when the text came from somewhere other than a local file, such as
    a URL fetch or a JS string handed over by a web viewer.

    Args:
        text: Full ASCII contents of the .hdr file.

    Returns:
        Populated :class:`HdrInfo` instance.
    """
    # _split_sections yields (public_section, group_sections), which is
    # exactly the positional argument order _build_info expects.
    sections = _split_sections(text.splitlines())
    return _build_info(*sections)
# ---------------------------------------------------------------------------
# Byte-offset calculator
# ---------------------------------------------------------------------------
def wvf_byte_offset(hdr: "HdrInfo", group: int, trace: int, block: int) -> int:
    """Locate a (group, trace, block) slice inside the .wvf file.

    All indices are **0-origin**.

    Args:
        hdr: Parsed :class:`HdrInfo`.
        group: Group index (0 … GroupNumber-1).
        trace: Trace index within the group (0 … TraceNumber-1).
        block: Block index within the group (0 … BlockNumber-1).

    Returns:
        Byte offset from the start of the .wvf file, including the leading
        ``hdr.data_offset`` bytes.

    Raises:
        ValueError: If *hdr.data_format* is not ``"TRACE"`` or ``"BLOCK"``
            (compared case-insensitively).
    """
    layout = hdr.data_format.upper()
    base = hdr.data_offset
    if layout not in ("TRACE", "BLOCK"):
        raise ValueError(f"Unknown DataFormat: {hdr.data_format!r}")
    # Pick the layout-specific calculator, then shift by the file's leading pad.
    locate = _trace_offset if layout == "TRACE" else _block_offset
    return base + locate(hdr, group, trace, block)
def _sample_width(tr: "TraceInfo") -> int:
    """Return bytes per sample for *tr*; 2 is assumed when its VDataType is unset."""
    return tr.v_data_type.byte_num if tr.v_data_type else 2


def _trace_offset(hdr: "HdrInfo", tgt_g: int, tgt_t: int, tgt_b: int) -> int:
    """Byte offset for TRACE layout (all blocks of each trace stored together).

    Args:
        hdr: Parsed header describing every group and trace.
        tgt_g: 0-origin group index.
        tgt_t: 0-origin trace index within the group.
        tgt_b: 0-origin block index; it is not range-checked here.

    Returns:
        Offset in bytes relative to the start of the sample data (the
        header's ``data_offset`` is not included).

    Raises:
        ValueError: If the (group, trace) pair does not exist in *hdr*.
    """
    off = 0
    for g, grp in enumerate(hdr.groups):
        nb = grp.block_number
        for t, tr in enumerate(grp.traces):
            block_bytes = tr.block_size * _sample_width(tr)
            if g == tgt_g and t == tgt_t:
                return off + tgt_b * block_bytes
            # Skip all nb blocks of this trace.
            off += block_bytes * nb
    raise ValueError(f"(group={tgt_g}, trace={tgt_t}) not found in HdrInfo")


def _block_offset(hdr: "HdrInfo", tgt_g: int, tgt_t: int, tgt_b: int) -> int:
    """Byte offset for BLOCK layout (all traces within a block stored together).

    Args:
        hdr: Parsed header describing every group and trace.
        tgt_g: 0-origin group index.
        tgt_t: 0-origin trace index within the group.
        tgt_b: 0-origin block index within the group.

    Returns:
        Offset in bytes relative to the start of the sample data (the
        header's ``data_offset`` is not included).

    Raises:
        ValueError: If the (group, trace, block) triple does not exist in
            *hdr*.
    """
    off = 0
    for g, grp in enumerate(hdr.groups):
        nb = grp.block_number
        if g < tgt_g:
            # Whole groups ahead of the target are skipped in one step.
            off += sum(tr.block_size * nb * _sample_width(tr) for tr in grp.traces)
            continue
        if g > tgt_g:
            # Only reachable for a negative group index; never search other groups.
            break
        for b in range(nb):
            for t, tr in enumerate(grp.traces):
                if b == tgt_b and t == tgt_t:
                    return off
                off += tr.block_size * _sample_width(tr)
        # The target group did not contain the requested trace/block. Stop
        # here: matching the same indices in a later group would silently
        # return an offset that belongs to a different group.
        break
    raise ValueError(f"(group={tgt_g}, trace={tgt_t}, block={tgt_b}) not found in HdrInfo")


# ---------------------------------------------------------------------------
# Section splitter
# ---------------------------------------------------------------------------

# Stores: key → (original_line_with_leading_spaces, value_after_key)
_SectionDict = dict[str, tuple[str, str]]

_GROUP_HEADER_RE = re.compile(r"^\$Group\d+$")
# Leading whitespace is skipped so indented key lines are parsed, not dropped.
_KEY_VALUE_RE = re.compile(r"\s*(\S+)(.*)")


def _split_sections(
    lines: list[str],
) -> tuple[_SectionDict, list[_SectionDict]]:
    """Partition .hdr lines into ``$PublicInfo`` and ``$Group<N>`` dicts.

    Each dict maps ``key → (original_line, value_after_key)``.  The original
    line (minus any trailing carriage return) is preserved, leading spaces
    included, so column-position alignment (used for VUnit, HUnit, and
    optional numeric fields) can be reproduced exactly.

    ``$PrivateInfo`` and any other sections are silently ignored, as are
    blank lines and lines that appear before the first recognized section.

    Args:
        lines: The .hdr file split into lines.

    Returns:
        ``(public, groups)``: the ``$PublicInfo`` dict and one dict per
        ``$Group<N>`` section, in order of appearance.
    """
    public: _SectionDict = {}
    groups: list[_SectionDict] = []
    current: Optional[_SectionDict] = None

    for raw_line in lines:
        line = raw_line.rstrip("\r")
        stripped = line.strip()
        if not stripped:
            continue

        if stripped.startswith("$"):
            if stripped == "$PublicInfo":
                current = public
            elif _GROUP_HEADER_RE.match(stripped):
                current = {}
                groups.append(current)
            else:
                current = None  # e.g. $PrivateInfo — skip
            continue

        if current is None:
            continue

        m = _KEY_VALUE_RE.match(line)
        if not m:
            continue
        key, rest = m.group(1), m.group(2).strip()
        # First occurrence wins (duplicates in same section are ignored)
        if key not in current:
            current[key] = (line, rest)

    return public, groups


# ---------------------------------------------------------------------------
# VDataType parsing
# ---------------------------------------------------------------------------


def _parse_vdtype(code: str) -> "VDataTypeInfo":
    """Decode one VDataType token string.

    Supported codes (from IM 707713-61E):

    ``IS<n>``  signed integer, n bytes   (IS1, IS2, IS4)
    ``IU<n>``  unsigned integer, n bytes (IU1, IU2, IU4)
    ``FS<n>``  IEEE 754 float, n bytes   (FS4, FS8)
    ``FU<n>``  unsigned float, n bytes   (treated identically to FS<n>)
    ``B<m>``   logic bit-field, m bytes  (B2, B4, B8, B16)

    Args:
        code: The token; surrounding whitespace is ignored.

    Returns:
        The decoded :class:`VDataTypeInfo`.

    Raises:
        ValueError: If *code* is empty, too short, has a non-numeric size,
            or starts with an unrecognized letter.
    """
    code = code.strip()
    if not code:
        raise ValueError("Empty VDataType code")

    kind = code[0].upper()
    if kind in ("I", "F"):
        if len(code) < 3:
            raise ValueError(f"Malformed VDataType: {code!r}")
        sub = code[1].upper()
        byte_num = int(code[2:])
        if kind == "I":
            # Any sign letter other than "S" is treated as unsigned.
            base = "i" if sub == "S" else "u"
        else:
            base = "f"
        return VDataTypeInfo(raw=code, byte_num=byte_num, numpy_dtype=f"{base}{byte_num}", is_logic=False)

    if kind == "B":
        byte_num = int(code[1:])
        # B16 = 16-byte / 128-bit words; no native numpy dtype; use void
        numpy_dtype = f"V{byte_num}" if byte_num > 8 else f"u{byte_num}"
        return VDataTypeInfo(raw=code, byte_num=byte_num, numpy_dtype=numpy_dtype, is_logic=True)

    raise ValueError(f"Unknown VDataType code: {code!r}")
# ---------------------------------------------------------------------------
# Column-position helpers (replicate MATLAB rowpos logic from hdrread.m)
# ---------------------------------------------------------------------------


def _vdtype_col_positions(vdtype_line: str) -> list[int]:
    """Find the column at which each VDataType token begins in *vdtype_line*.

    This mirrors the MATLAB ``rowpos`` array that ``hdrread.m`` uses to
    column-align ``VUnit``, ``HUnit``, and the optional overrange fields.
    A token is a run of word characters beginning with I, F or B in either
    case.
    """
    starts: list[int] = []
    for hit in re.finditer(r"[IFBifb]\w*", vdtype_line):
        starts.append(hit.start())
    return starts


def _col_aligned_values(
    line: Optional[str],
    col_positions: list[int],
    n_traces: int,
) -> list[Optional[str]]:
    """Pick per-trace tokens out of *line* by matching start columns.

    For every one of the first *n_traces* entries of *col_positions* the
    result holds the whitespace-delimited token of *line* that starts at
    exactly that column, or ``None`` when no token starts there.  A missing
    or empty *line* yields ``None`` for all *n_traces* traces.
    """
    if not line:
        return [None] * n_traces

    token_at: dict[int, str] = {}
    for hit in re.finditer(r"\S+", line):
        token_at[hit.start()] = hit.group()

    picked: list[Optional[str]] = []
    for column in col_positions[:n_traces]:
        picked.append(token_at.get(column))
    return picked


def _try_int(s: Optional[str]) -> Optional[int]:
    """Convert *s* to int, giving ``None`` for ``None``, ``'?'``, or non-numeric text."""
    try:
        number = int(s)  # int(None) raises TypeError, handled below
    except (ValueError, TypeError):
        return None
    return number


# ---------------------------------------------------------------------------
# Main builder
# ---------------------------------------------------------------------------


def _req(d: "_SectionDict", key: str) -> str:
    """Return the value text stored under *key*; raise ``ValueError`` if absent."""
    if key in d:
        return d[key][1]
    raise ValueError(f".hdr file is missing required field: {key!r}")


def _opt_val(d: "_SectionDict", key: str) -> Optional[str]:
    """Return the value text stored under *key*, or ``None`` if absent."""
    entry = d.get(key)
    if not entry:
        return None
    return entry[1]


def _opt_line(d: "_SectionDict", key: str) -> Optional[str]:
    """Return the original full line stored under *key*, or ``None`` if absent."""
    entry = d.get(key)
    if not entry:
        return None
    return entry[0]


def _build_info(public: "_SectionDict", group_raws: "list[_SectionDict]") -> "HdrInfo":
    """Assemble an HdrInfo from the public section and the raw group sections.

    Raises ``ValueError`` when a required public field is missing or one of
    the numeric fields is not an integer.
    """
    # Keyword arguments are evaluated top to bottom, so the first missing
    # field reported follows this order.
    info = HdrInfo(
        format_version=_req(public, "FormatVersion"),
        model=_req(public, "Model"),
        endian=_req(public, "Endian"),
        data_format=_req(public, "DataFormat"),
        group_number=int(_req(public, "GroupNumber")),
        trace_total_number=int(_req(public, "TraceTotalNumber")),
        data_offset=int(_req(public, "DataOffset")),
    )
    for position, raw_group in enumerate(group_raws):
        info.groups.append(_build_group(position, raw_group))
    return info


def _stamp_key(gd: "_SectionDict", prefix: str, b_idx: int, nb: int) -> str:
    """Choose the Date/Time key that holds block *b_idx*'s stamp.

    Files with 1 block use the bare key ("Date") or occasionally the
    numbered one ("Date1").  Files with >1 blocks use "Date1", "Date2", ...
    Newer DL750 (v6.22+) may write only the bare key even with multiple
    blocks, so block 0 falls back to it when its numbered key is absent.
    """
    if nb == 1:
        return prefix if prefix in gd else f"{prefix}1"
    numbered = f"{prefix}{b_idx + 1}"
    if b_idx == 0 and numbered not in gd:
        return prefix
    return numbered


def _build_group(g_idx: int, gd: "_SectionDict") -> "GroupInfo":
    """Turn one raw ``$Group`` section dict into a GroupInfo.

    *g_idx* is the 0-origin group position and is only used to label the
    error raised when the VDataType line is missing.
    """
    n = int(_req(gd, "TraceNumber"))
    nb = int(_req(gd, "BlockNumber"))
    grp = GroupInfo(
        trace_number=n,
        block_number=nb,
        traces=[TraceInfo() for _ in range(n)],
    )
    traces = grp.traces

    # --- Required whitespace-delimited fields ---
    _assign_str(traces, n, _req(gd, "TraceName"), "name")
    _assign_int(traces, n, _req(gd, "BlockSize"), "block_size")
    for key, attr in (("VResolution", "v_resolution"), ("VOffset", "v_offset")):
        _assign_float(traces, n, _req(gd, key), attr)

    # --- VDataType: parse codes and capture column positions ---
    vdtype_entry = gd.get("VDataType")
    if not vdtype_entry:
        raise ValueError(f".hdr Group{g_idx + 1} missing VDataType")
    vdtype_line, vdtype_val = vdtype_entry
    col_pos = _vdtype_col_positions(vdtype_line)
    for i, tok in enumerate(vdtype_val.split()[:n]):
        try:
            traces[i].v_data_type = _parse_vdtype(tok)
        except ValueError:
            # Best effort: an undecodable code leaves v_data_type as None.
            pass

    # --- Column-aligned string fields ---
    for key, attr in (("VUnit", "v_unit"), ("HUnit", "h_unit")):
        _assign_col_str(traces, n, _opt_line(gd, key), col_pos, key, attr)

    # --- Column-aligned optional integer fields ---
    for key, attr in (
        ("VPlusOverData", "v_plus_over"),
        ("VMinusOverData", "v_minus_over"),
        ("VIllegalData", "v_illegal"),
        ("VMaxData", "v_max"),
        ("VMinData", "v_min"),
    ):
        _assign_col_int(traces, n, _opt_line(gd, key), col_pos, attr)

    # --- HResolution / HOffset ---
    for key, attr in (("HResolution", "h_resolution"), ("HOffset", "h_offset")):
        _assign_float(traces, n, _req(gd, key), attr)

    # --- Per-block Date / Time ---
    for tr in traces:
        tr.dates = [""] * nb
        tr.times = [""] * nb

    for b_idx in range(nb):
        for prefix, attr in (("Date", "dates"), ("Time", "times")):
            stamp = _opt_val(gd, _stamp_key(gd, prefix, b_idx, nb))
            if not stamp:
                continue
            for i, tok in enumerate(stamp.split()[:n]):
                getattr(traces[i], attr)[b_idx] = tok

    return grp


# ---------------------------------------------------------------------------
# Trace-level assignment helpers (module-level to avoid cell-var-from-loop)
# ---------------------------------------------------------------------------


def _assign_converted(traces: "list[TraceInfo]", n: int, val: str, attr: str, convert) -> None:
    """Set *attr* on successive traces from the first *n* tokens of *val*."""
    for i, tok in enumerate(val.split()[:n]):
        setattr(traces[i], attr, convert(tok))


def _assign_str(traces: "list[TraceInfo]", n: int, val: str, attr: str) -> None:
    """Store the first *n* whitespace-separated tokens of *val* as strings."""
    _assign_converted(traces, n, val, attr, str)


def _assign_float(traces: "list[TraceInfo]", n: int, val: str, attr: str) -> None:
    """Store the first *n* whitespace-separated tokens of *val* as floats."""
    _assign_converted(traces, n, val, attr, float)


def _assign_int(traces: "list[TraceInfo]", n: int, val: str, attr: str) -> None:
    """Store the first *n* whitespace-separated tokens of *val* as ints."""
    _assign_converted(traces, n, val, attr, int)


def _assign_col_str(
    traces: "list[TraceInfo]",
    n: int,
    line: Optional[str],
    col_pos: list[int],
    key: str,
    attr: str,
) -> None:
    """Set *attr* from column-aligned tokens, skipping gaps and the key itself."""
    for i, token in enumerate(_col_aligned_values(line, col_pos, n)):
        if not token or token == key:
            continue
        setattr(traces[i], attr, token)


def _assign_col_int(
    traces: "list[TraceInfo]",
    n: int,
    line: Optional[str],
    col_pos: list[int],
    attr: str,
) -> None:
    """Set *attr* from column-aligned tokens that parse as integers."""
    for i, token in enumerate(_col_aligned_values(line, col_pos, n)):
        number = _try_int(token)
        if number is None:
            continue
        setattr(traces[i], attr, number)


# ---------------------------------------------------------------------------
# Normalized waveform wrapper for channel.py consumption
# ---------------------------------------------------------------------------
class YokogawaWvfHeader:
    """Header view of a Yokogawa .hdr+.wvf pair in the shape :class:`channel.Channel` reads."""

    def __init__(
        self,
        hdr: "HdrInfo",
        channel_data: list[Optional[npt.NDArray[np.float64]]],
        raw_data: list[Optional[npt.NDArray[np.uint8]]],
    ) -> None:
        """Store the parsed HdrInfo and the already-decoded channel arrays.

        The time-axis increment and origin are copied from the first trace of
        the first group; 1e-9 and 0.0 are used when there is no such trace.
        """
        self._hdr = hdr
        self.channel_data = channel_data
        self.raw_data = raw_data

        first = self._first_trace()
        if first:
            self.x_increment: float = first.h_resolution
            self.x_origin: float = first.h_offset
        else:
            self.x_increment = 1e-9
            self.x_origin = 0.0

    def _first_trace(self) -> Optional["TraceInfo"]:
        """Return trace 0 of group 0, or ``None`` when either list is empty."""
        groups = self._hdr.groups
        if not groups:
            return None
        traces = groups[0].traces
        if not traces:
            return None
        return traces[0]

    @property
    def model(self) -> str:
        """Instrument model string, or ``"Yokogawa"`` when the header has none."""
        name = self._hdr.model
        return name if name else "Yokogawa"

    @property
    def model_number(self) -> str:
        """Alias for :attr:`model`."""
        return self.model

    @property
    def firmware_version(self) -> str:
        """Always ``"unknown"``: Yokogawa .hdr files do not embed firmware information."""
        return "unknown"

    @property
    def points(self) -> int:
        """Number of samples per block (from the first trace), or 0 without one."""
        first = self._first_trace()
        if first:
            return first.block_size
        return 0

    @property
    def time_scale(self) -> float:
        """Estimated time per division (total capture / 10 divisions).

        Falls back to 1e-3 when either the sample count or the sample
        interval is not positive.
        """
        count = self.points
        if count > 0 and self.x_increment > 0:
            return count * self.x_increment / 10.0
        return 1e-3
class YokogawaWvfWaveform:
    """Parse result for a Yokogawa .hdr+.wvf pair, as handed to :class:`channel.Channel`."""

    def __init__(self, header: "YokogawaWvfHeader") -> None:
        """Keep a reference to the given :class:`YokogawaWvfHeader`."""
        self.header = header

    @property
    def parser_name(self) -> str:
        """Parser tag recognized by :meth:`channel.Channel.__init__`."""
        return "yokogawa_wvf"

    def __str__(self) -> str:
        """Return the parser tag prefixed with ``x.``, for :meth:`~RigolWFM.wfm.Wfm.from_file`."""
        return "x." + self.parser_name
def _decode_trace_samples(
    wvf_bytes: bytes,
    hdr: "HdrInfo",
    group: int,
    trace_idx: int,
) -> tuple[Optional[npt.NDArray[np.float64]], Optional[npt.NDArray[np.uint8]]]:
    """Decode block 0 of one trace from the .wvf binary.

    Returns ``(volts, raw_u8)``.  Both are ``None`` when the trace has no
    data type, has a block size of zero, or when the .wvf buffer is too
    short to contain the whole block.
    """
    trace = hdr.groups[group].traces[trace_idx]
    vtype = trace.v_data_type
    if vtype is None or trace.block_size == 0:
        return None, None

    sample_count = trace.block_size
    start = wvf_byte_offset(hdr, group, trace_idx, 0)
    if start + sample_count * vtype.byte_num > len(wvf_bytes):
        return None, None

    # Byte-order prefix ('>' or '<') plus the endian-neutral dtype string.
    sample_dtype = np.dtype(hdr.byte_order + vtype.numpy_dtype)
    counts = np.frombuffer(wvf_bytes, dtype=sample_dtype, count=sample_count, offset=start)

    # Apply voltage calibration: volts = v_resolution * raw + v_offset
    volts = trace.v_resolution * counts.astype(np.float64) + trace.v_offset

    # Synthesize a uint8 "raw" array scaled to 0–254 for display; a flat
    # trace (zero span) is placed at mid-scale instead.
    span = np.ptp(volts)
    if span > 0:
        scaled = (volts - volts.min()) / span * 254
        raw_u8 = np.clip(scaled, 0, 254).astype(np.uint8)
    else:
        raw_u8 = np.full(sample_count, 127, dtype=np.uint8)
    return volts, raw_u8
def from_hdr_file(path: str) -> "YokogawaWvfWaveform":
    """Parse a Yokogawa ``.hdr`` file and load the companion ``.wvf`` binary.

    The companion is looked up as ``<stem>.wvf`` first; if that exact name
    does not exist, any capitalization of the ``.wvf`` extension is accepted.
    Up to four traces from the first group are decoded (block 0 only).

    Args:
        path: Path to the ``.hdr`` metadata file.  The companion ``.wvf``
              file must exist alongside it with the same base name.

    Returns:
        :class:`YokogawaWvfWaveform` ready for use with
        :class:`~RigolWFM.channel.Channel`.

    Raises:
        FileNotFoundError: If the ``.hdr`` or ``.wvf`` file does not exist.
        ValueError: If a required header field is missing or malformed.
    """
    import glob  # local import: only needed to escape the fallback pattern

    max_channels = 4

    hdr_path = Path(path)
    wvf_path = hdr_path.with_suffix(".wvf")
    if not wvf_path.exists():
        # Try a case-insensitive match on the extension. The stem is escaped
        # so glob metacharacters in the file name (``[``, ``*``, ``?``) are
        # matched literally instead of selecting an unrelated file.
        pattern = glob.escape(hdr_path.stem) + ".[Ww][Vv][Ff]"
        # Sorted so the pick is deterministic if several spellings exist.
        matches = sorted(hdr_path.parent.glob(pattern))
        if not matches:
            raise FileNotFoundError(f"Companion .wvf file not found for '{path}'")
        wvf_path = matches[0]

    hdr = parse_hdr(str(hdr_path))
    wvf_bytes = wvf_path.read_bytes()

    # Collect up to 4 traces from group 0 (primary group); unused slots stay None.
    channel_data: list[Optional[npt.NDArray[np.float64]]] = [None] * max_channels
    raw_data: list[Optional[npt.NDArray[np.uint8]]] = [None] * max_channels

    if hdr.groups:
        for t_idx in range(len(hdr.groups[0].traces[:max_channels])):
            volts, raw_u8 = _decode_trace_samples(wvf_bytes, hdr, 0, t_idx)
            channel_data[t_idx] = volts
            raw_data[t_idx] = raw_u8

    header = YokogawaWvfHeader(hdr, channel_data, raw_data)
    return YokogawaWvfWaveform(header)