"""
Parse and convert oscilloscope waveform files.
Supports Rigol (DS1000B/C/D/E/Z, DS2000, DS4000, DS6000, MSO5000, MSO5074,
MSO7000/8000, DHO800/DHO1000), Agilent / Keysight (`AGxx` `.bin`), Siglent
(`.bin` waveform revisions), Rohde & Schwarz RTP (`.bin` + `.Wfm.bin`),
Teledyne LeCroy (.trc), Tektronix (.wfm/.isf), and Yokogawa (.wfm) scope
families via ``Wfm.from_file()``.
Example:
>>> import RigolWFM.wfm as wfm
>>> waveform = wfm.Wfm.from_file("file_name.wfm")
>>> description = waveform.describe()
>>> print(description)
"""
import os
import os.path
import re
import struct
import sys
import tempfile
import urllib.parse
import wave
from typing import IO, TYPE_CHECKING, Any, Literal, cast
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import requests # type: ignore[import-untyped]
import RigolWFM.channel
import RigolWFM.agilent
import RigolWFM.isf
import RigolWFM.lecroy
import RigolWFM.rohde_schwarz
import RigolWFM.rigol
import RigolWFM.rigol_1000z_logic
import RigolWFM.siglent
import RigolWFM.tek
import RigolWFM.yokogawa
import RigolWFM.yokogawa_hdr
if TYPE_CHECKING:
from matplotlib.figure import Figure
# ---------------------------------------------------------------------------
# Non-Rigol vendor scope-family model-string lists
# ---------------------------------------------------------------------------
# Agilent / Keysight `AGxx` .bin files
Keysight_scopes: list[str] = [
"Keysight",
"KEYSIGHT",
"keysight",
"Agilent",
"AGILENT",
"agilent",
"agilent_bin",
]
# Siglent `.bin` waveform files
Siglent_scopes: list[str] = [
"Siglent",
"SIGLENT",
"siglent",
"siglent_bin",
]
Siglent_old_scopes: list[str] = [
"SiglentOld",
"SIGLENTOLD",
"siglentold",
]
# Rohde & Schwarz RTP oscilloscope exports (`.bin` metadata + `.Wfm.bin` payload)
RohdeSchwarz_scopes: list[str] = [
"RohdeSchwarz",
"ROHDESCHWARZ",
"rohdeschwarz",
"Rohde",
"ROHDE",
"rohde",
"R&S",
"r&s",
"rohde_schwarz_bin",
]
# Teledyne LeCroy .trc files (LECROY_1_0 / LECROY_2_3 format)
LeCroy_scopes: list[str] = ["LeCroy", "LECROY", "lecroy", "trc"]
# Tektronix .wfm files (WFM#001 / WFM#002 / WFM#003 formats)
Tek_scopes: list[str] = ["Tek", "TEK", "tektronix", "Tektronix", "tek_wfm"]
# Tektronix .isf files (ISF Internal Save Format)
ISF_scopes: list[str] = ["ISF", "isf", "tek_isf", "TEK_ISF"]
# Yokogawa ASCII-header .wfm files
Yokogawa_scopes: list[str] = ["Yokogawa", "YOKOGAWA", "yokogawa", "yoko", "yokogawa_wfm"]
# Yokogawa two-file .hdr + .wvf pairs
Yokogawa_wvf_scopes: list[str] = ["yokogawa_wvf", "yokogawa_hdr", "Yokogawa_WVF", "YOKOGAWA_WVF"]
_GENERIC_VENDOR_MODELS = {
*(name.upper() for name in Keysight_scopes),
*(name.upper() for name in Siglent_scopes),
*(name.upper() for name in Siglent_old_scopes),
*(name.upper() for name in RohdeSchwarz_scopes),
*(name.upper() for name in LeCroy_scopes),
*(name.upper() for name in Tek_scopes),
*(name.upper() for name in ISF_scopes),
*(name.upper() for name in Yokogawa_scopes),
*(name.upper() for name in Yokogawa_wvf_scopes),
}
# Re-export Rigol scope lists so existing callers that reference e.g.
# ``wfm.DS1000E_scopes`` continue to work without change.
DS1000B_scopes = RigolWFM.rigol.DS1000B_scopes
DS1000C_scopes = RigolWFM.rigol.DS1000C_scopes
DS1000D_scopes = RigolWFM.rigol.DS1000D_scopes
DS1000E_scopes = RigolWFM.rigol.DS1000E_scopes
DS1000Z_scopes = RigolWFM.rigol.DS1000Z_scopes
DS2000_scopes = RigolWFM.rigol.DS2000_scopes
DS4000_scopes = RigolWFM.rigol.DS4000_scopes
DS6000_scopes = RigolWFM.rigol.DS6000_scopes
DS5000_scopes = RigolWFM.rigol.DS5000_scopes
MSO5074_scopes = RigolWFM.rigol.MSO5074_scopes
DS7000_scopes = RigolWFM.rigol.DS7000_scopes
DS8000_scopes = RigolWFM.rigol.DS8000_scopes
DHO1000_scopes = RigolWFM.rigol.DHO1000_scopes
ExportArray = npt.NDArray[np.float64] | npt.NDArray[np.uint8]
# Re-export Rigol trigger constants so existing callers continue to work.
_DS2000_SOURCE_NAMES = RigolWFM.rigol._DS2000_SOURCE_NAMES # pylint: disable=protected-access
_LECROY_MAGIC = b"WAVEDESC"
_TEK_MAGIC = b"WFM#"
_ISF_MAGIC = b":CURV" # matches both ":CURV #" and ":CURVE #"
_CANONICAL_PARSER_NAMES = {
"rigol_1000b_wfm": "wfm1000b",
"rigol_1000c_wfm": "wfm1000c",
"rigol_1000e_wfm": "wfm1000e",
"rigol_1000z_wfm": "wfm1000z",
"rigol_2000_wfm": "wfm2000",
"rigol_4000_wfm": "wfm4000",
"rigol_6000_wfm": "wfm6000",
"rigol_mso5000_bin": "bin5000",
"rigol_7000_8000_bin": "bin7000_8000",
}
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
[docs]
class Read_WFM_Error(Exception):
"""Generic Read Error."""
[docs]
class Parse_WFM_Error(Exception):
"""Generic Parse Error."""
[docs]
class Invalid_URL(Exception):
"""URL scheme is not http or https."""
[docs]
class Unknown_Scope_Error(Exception):
"""Not one of the listed oscilloscope models."""
[docs]
class Write_WAV_Error(Exception):
"""Something went wrong while writing the .wave file."""
[docs]
class Channel_Not_In_WFM_Error(Exception):
"""The channel is not in the .wfm file."""
# ---------------------------------------------------------------------------
# Autodetect and utility functions
# ---------------------------------------------------------------------------
[docs]
def detect_model(filename: str) -> str:
"""Detect the oscilloscope model from a waveform file's binary signature.
Reads a small prefix of the file and uses magic bytes, file size, and
embedded model strings to identify the scope family.
Returns a short model string (e.g. ``"E"``, ``"Z"``, ``"5074"``) suitable
for passing to :meth:`Wfm.from_file`.
Raises:
FileNotFoundError: if the file cannot be opened.
Parse_WFM_Error: if the signature is not recognised.
"""
try:
fsize = os.path.getsize(filename)
with open(filename, "rb") as f:
hdr = f.read(8192)
except OSError as exc:
raise FileNotFoundError(filename) from exc
return _detect_model_from_header(hdr, fsize, filename)
def _detect_model_from_header(hdr: bytes, fsize: int, filename_hint: str = "") -> str:
"""Detect the oscilloscope model from a file header and filename hint."""
basename_hint = os.path.basename(filename_hint)
suffix = os.path.splitext(basename_hint)[1].lower()
allow_bin_like = suffix in ("", ".bin")
allow_wfm_like = suffix in ("", ".wfm")
display_name = filename_hint or "<buffer>"
if len(hdr) < 4:
raise Parse_WFM_Error(f"File too short to detect model: {display_name}")
magic4 = hdr[:4]
# DS1000B: a5 a5 a4 01
if magic4 == bytes([0xA5, 0xA5, 0xA4, 0x01]):
return "B"
# DS1000C: first byte 0xA1, bytes 1-3 = a5 00 00
if magic4 == bytes([0xA1, 0xA5, 0x00, 0x00]):
return "C"
# DS1000Z (and MSO1000Z): 01 ff ff ff
if magic4 == bytes([0x01, 0xFF, 0xFF, 0xFF]):
return "Z"
# DHO proprietary .wfm: 02 00 00 00
if magic4 == bytes([0x02, 0x00, 0x00, 0x00]) and allow_wfm_like:
return "DHO"
# DHO .bin export: RG03
if hdr[:4] == b"RG03":
return "DHO"
# Agilent / Keysight `.bin`: AG01 / AG03 / AG10
if hdr[:2] == b"AG":
try:
version = int(hdr[2:4].decode("ascii"))
except ValueError:
version = -1
if version in (1, 3, 10):
return "Keysight"
# Siglent `.bin`: documented old platform and V0.1-V6 waveform families
if allow_bin_like:
try:
siglent_revision = RigolWFM.siglent.detect_revision_from_bytes(hdr, fsize)
except ValueError:
siglent_revision = ""
if siglent_revision:
return "SiglentOld" if siglent_revision == "old" else "Siglent"
# Rohde & Schwarz RTP exports: XML `.bin` metadata with companion `.Wfm.bin` payload
if allow_bin_like and hdr.startswith(b"<?xml") and b"<Database" in hdr and b'SaveItemType="Data"' in hdr:
return "RohdeSchwarz"
# RG01: MSO5000-family, MSO7000, MSO8000
if hdr[:4] == b"RG01":
if len(hdr) >= 16:
wfm_hdr_size = struct.unpack("<I", hdr[12:16])[0]
if wfm_hdr_size == 128:
# MSO7000 vs MSO8000: read frame_string at file offset 100
# (12-byte file header + 88 bytes into waveform header)
try:
frame = hdr[100:124].split(b"\x00")[0].decode("ascii")
if frame.upper().startswith("MSO8") or frame.upper().startswith("DS8"):
return "8"
except Exception:
pass
return "7"
return "5"
# DS2000/4000/6000: a5 a5 38 00, model string at offset 4
if magic4 == bytes([0xA5, 0xA5, 0x38, 0x00]):
try:
model_str = hdr[4:24].split(b"\x00")[0].decode("ascii")
if model_str.startswith("DS4") or model_str.startswith("MSO4"):
return "4"
if model_str.startswith("DS6") or model_str.startswith("MSO6"):
return "6"
except Exception:
pass
return "2"
# DS1000C/D/E: a5 a5 00 00
# Distinguish by comparing file size to expected data offsets.
# DS1000D uses the wfm1000e parser (data at 276), so genuine D files match
# the E formula. The 272-byte formula arises only for DS1000C files whose
# first byte is 0xA5 instead of the usual 0xA1: those files include a
# 16-byte padding block before the samples (256 + 16 + n*pts = 272 + n*pts).
if magic4 == bytes([0xA5, 0xA5, 0x00, 0x00]):
try:
pts = struct.unpack("<I", hdr[28:32])[0]
ch1_en = bool(hdr[49])
ch2_en = bool(hdr[73])
n_ch = int(ch1_en) + int(ch2_en)
if pts > 0 and n_ch > 0:
if fsize == 256 + n_ch * pts:
return "C"
if fsize == 272 + n_ch * pts:
return "C"
if fsize == 276 + n_ch * pts:
return "E"
except Exception:
pass
return "E" # most common default for this magic
# Yokogawa single-file .wfm: ASCII header with NR_PT/XIN/YMU/YOF fields
if all(token in hdr for token in (b"NR_PT:", b"PT_O:", b"XIN:", b"YMU:", b"YOF:", b"BYT:")):
return "Yokogawa"
# Yokogawa two-file .hdr + .wvf: ASCII text with $PublicInfo section
if b"$PublicInfo" in hdr:
return "yokogawa_wvf"
# Tektronix legacy .wfm: "LLWFM" marker in the opening bytes
if b"LLWFM" in hdr[:8]:
return "Tek"
# Tektronix .wfm: byte_order word at 0 (0x0F0F LE or 0xF0F0 BE), version "WFM#" at offset 2
if hdr[0] in (0x0F, 0xF0) and hdr[1] in (0x0F, 0xF0) and hdr[2:6] == _TEK_MAGIC:
return "Tek"
# Tektronix .isf: ASCII header containing ":CURV " or ":CURVE " followed by '#'
if _ISF_MAGIC in hdr:
return "ISF"
# LeCroy .trc: "WAVEDESC" marker at byte 0, or after a SCPI / transport prefix
if _LECROY_MAGIC in hdr:
return "LeCroy"
raise Parse_WFM_Error(f"Unrecognised file signature {magic4.hex()} in {display_name}")
def _model_in_family(model: str, family: list[str]) -> bool:
"""Return True when model matches one of the aliases in family."""
upper = model.upper()
return any(upper == alias.upper() for alias in family)
def _default_download_suffix(model: str, header: bytes) -> str:
"""Return a usable file suffix for a downloaded waveform."""
if _model_in_family(model, Keysight_scopes):
return ".bin"
if _model_in_family(model, Siglent_scopes) or _model_in_family(model, Siglent_old_scopes):
return ".bin"
if _model_in_family(model, RohdeSchwarz_scopes):
return ".bin"
if _model_in_family(model, LeCroy_scopes):
return ".trc"
if _model_in_family(model, ISF_scopes):
return ".isf"
if _model_in_family(model, Tek_scopes):
return ".wfm"
if _model_in_family(model, Yokogawa_scopes):
return ".wfm"
if _model_in_family(model, DHO1000_scopes):
return ".bin" if header.startswith(b"RG03") else ".wfm"
if (
_model_in_family(model, DS5000_scopes)
or _model_in_family(model, MSO5074_scopes)
or _model_in_family(model, DS7000_scopes)
or _model_in_family(model, DS8000_scopes)
):
return ".bin"
return ".wfm"
def _rohde_schwarz_payload_url(url: str) -> str:
"""Return the companion `.Wfm.bin` URL for an RTP metadata URL."""
parsed = urllib.parse.urlparse(url)
path = parsed.path
if path.lower().endswith(".wfm.bin") or not path.lower().endswith(".bin"):
raise Parse_WFM_Error(f"Cannot derive a Rohde & Schwarz companion payload URL from '{url}'")
return parsed._replace(path=path[:-4] + ".Wfm.bin").geturl()
def _best_firmware_version(waveform: Any) -> str:
"""Return the most informative firmware-version string exposed by a parser."""
preheader = getattr(waveform, "preheader", None)
preheader_firmware = getattr(preheader, "firmware_version", "")
if preheader_firmware:
return str(preheader_firmware)
header = getattr(waveform, "header", None)
header_firmware = getattr(header, "firmware_version", "")
if header_firmware:
return str(header_firmware)
return ""
[docs]
def valid_scope_list() -> str:
"""List all supported oscilloscope model strings."""
s = "\nRigol oscilloscope models:\n "
s += "\n ".join(", ".join(family) for family in RigolWFM.rigol.ALL_RIGOL_SCOPES)
s += "\n "
s += ", ".join(Keysight_scopes) + "\n "
s += ", ".join(Siglent_scopes) + "\n "
s += ", ".join(Siglent_old_scopes) + "\n "
s += ", ".join(RohdeSchwarz_scopes) + "\n "
s += ", ".join(LeCroy_scopes) + "\n "
s += ", ".join(Tek_scopes) + "\n "
s += ", ".join(ISF_scopes) + "\n"
s += " " + ", ".join(Yokogawa_scopes) + "\n"
s += " " + ", ".join(Yokogawa_wvf_scopes) + "\n"
return s
def _scope_family(name: str) -> str:
"""Return the alphabetic prefix of a scope name (e.g. 'MSO' from 'MSO5074').
Used to detect obvious model mismatches between user-supplied model codes
and the model string embedded in the file. Single-character aliases (B, C,
D, E, Z) and purely-numeric codes ('2', '5074') are intentionally short and
can never be compared reliably, so callers should skip the warning when
len(_scope_family(umodel)) <= 1.
"""
head = name.upper().split("(", 1)[0].strip()
head = head.split(None, 1)[0] if head else ""
return "".join(c for c in head if c.isalpha())
# Backward-compatible convenience wrapper
[docs]
def dho_from_file(file_name: str) -> Any:
"""Backward-compatible wrapper around `RigolWFM.dho.from_file()`."""
return RigolWFM.rigol.dho_from_file(file_name)
def _sanitize_export_name(name: str, used: set[str]) -> str:
"""Return a MAT/NPZ-safe array name while preserving familiar channel labels."""
sanitized = re.sub(r"[^0-9A-Za-z_]+", "_", name).strip("_")
if not sanitized:
sanitized = "signal"
if sanitized[0].isdigit():
sanitized = f"signal_{sanitized}"
if sanitized in {"time", "start", "increment"}:
sanitized = f"{sanitized}_channel"
base = sanitized
suffix = 1
while sanitized in used:
sanitized = f"{base}_{suffix}"
suffix += 1
return sanitized
def _write_binary_output(filename: str | os.PathLike[str] | IO[bytes], payload: bytes) -> None:
"""Write a binary payload to a path or writable binary file object."""
if isinstance(filename, (str, os.PathLike)):
with open(os.fspath(filename), "wb") as handle:
handle.write(payload)
return
filename.write(payload)
def _mat_v5_pad(data: bytes) -> bytes:
"""Pad a MAT v5 element payload to the next 8-byte boundary."""
pad = (-len(data)) % 8
if pad == 0:
return data
return data + (b"\x00" * pad)
def _mat_v5_element(data_type: int, payload: bytes) -> bytes:
"""Build one regular MAT v5 data element."""
return struct.pack("<II", data_type, len(payload)) + _mat_v5_pad(payload)
def _mat_v5_numeric_matrix(name: str, values: ExportArray) -> bytes:
"""Encode one real numeric MATLAB v5 matrix using column-major storage."""
array = np.asarray(values, dtype=np.float64)
array2d: npt.NDArray[np.float64]
if array.ndim == 0:
array2d = array.reshape((1, 1))
elif array.ndim == 1:
array2d = array.reshape((array.shape[0], 1))
elif array.ndim == 2:
array2d = cast(npt.NDArray[np.float64], array)
else:
raise ValueError(f"MAT export only supports scalars, vectors, and 2-D arrays; got {array.ndim} dims.")
name_bytes = name.encode("ascii")
if not name_bytes:
raise ValueError("MAT export variable names must not be empty.")
mxDOUBLE_CLASS = 6
miINT8 = 1
miINT32 = 5
miUINT32 = 6
miDOUBLE = 9
miMATRIX = 14
flags = struct.pack("<II", mxDOUBLE_CLASS, 0)
dims = struct.pack("<" + ("i" * array2d.ndim), *array2d.shape)
real = np.asfortranarray(array2d).tobytes(order="F")
matrix = b"".join(
[
_mat_v5_element(miUINT32, flags),
_mat_v5_element(miINT32, dims),
_mat_v5_element(miINT8, name_bytes),
_mat_v5_element(miDOUBLE, real),
]
)
return _mat_v5_element(miMATRIX, matrix)
def _build_mat_v5_payload(arrays: dict[str, ExportArray]) -> bytes:
"""Return a MATLAB v5 `.mat` payload for the provided numeric arrays."""
header_text = "MATLAB 5.0 MAT-file, Platform: RigolWFM, Created by RigolWFM"
header = header_text.encode("ascii", "replace")[:116].ljust(116, b" ")
header += b"\x00" * 8
header += struct.pack("<H", 0x0100)
header += b"IM"
body = b"".join(_mat_v5_numeric_matrix(name, values) for name, values in arrays.items())
return header + body
# ---------------------------------------------------------------------------
# Main Wfm class
# ---------------------------------------------------------------------------
[docs]
class Wfm:
"""Class with parsed data from a waveform file."""
channels: list[RigolWFM.channel.Channel]
original_name: str
file_name: str
basename: str
firmware: str
user_name: str
parser_name: str
header_name: str
serial_number: str
def __init__(self, file_name: str) -> None:
"""Initialize a Wfm object from a file."""
self.channels = []
self.original_name = file_name
self.file_name = file_name
self.basename = file_name
self.firmware = "unknown"
# there are multiple possible scope names
# 1. user_name - the name passed to the program
# 2. header_name - the name found in the header of the file
# 3. parser_name - the name of parser used
self.user_name = "unknown"
self.parser_name = "unknown"
self.header_name = "unknown"
self.serial_number = ""
self.trigger_info: dict = {}
self.logic_split: RigolWFM.rigol_1000z_logic.Rigol1000zSplit | None = None
self.logic_mapping = ""
self.logic_channels: dict[str, npt.NDArray[np.uint8]] = {}
self.logic_observed_channels: dict[str, npt.NDArray[np.uint8]] = {}
self.logic_times: npt.NDArray[np.float64] | None = None
self.logic_seconds_per_point: float | None = None
self.logic_time_offset: float | None = None
[docs]
@classmethod
def from_file(cls, file_name: str, model: str = "auto", selected: str = "1234") -> "Wfm":
"""
Create Wfm object from a file.
Args:
file_name: name of file
model: oscilloscope family, e.g. 'E', 'Z', 'LeCroy', 'Tek'; defaults to auto-detect
selected: string of channels to process e.g., '12'
Returns:
a Wfm object for the file
"""
try:
with open(file_name, "rb"):
pass
except OSError as e:
raise Read_WFM_Error(e) from e
new_wfm = cls(file_name)
new_wfm.original_name = file_name
new_wfm.file_name = file_name
new_wfm.basename = os.path.basename(file_name)
auto_model = model.upper() == "AUTO"
umodel = detect_model(file_name).upper() if auto_model else model.upper()
# --- Rigol families ---
rigol_result = RigolWFM.rigol.parse_file(umodel, file_name)
if rigol_result is not None:
w, new_wfm.header_name, new_wfm.serial_number, new_wfm.trigger_info = rigol_result
# --- Agilent / Keysight `.bin` ---
elif _model_in_family(umodel, Keysight_scopes):
w = RigolWFM.agilent.from_file(file_name)
new_wfm.header_name = w.header.model or "Keysight"
new_wfm.serial_number = w.header.serial_number
# --- Siglent `.bin` waveform files ---
elif _model_in_family(umodel, Siglent_scopes) or _model_in_family(umodel, Siglent_old_scopes):
w = RigolWFM.siglent.from_file(file_name, umodel)
new_wfm.header_name = w.header.model or "Siglent"
new_wfm.serial_number = getattr(w.header, "serial_number", "")
# --- Rohde & Schwarz RTP `.bin` metadata files ---
elif _model_in_family(umodel, RohdeSchwarz_scopes):
w = RigolWFM.rohde_schwarz.from_file(file_name)
new_wfm.header_name = w.header.model or "Rohde & Schwarz"
new_wfm.serial_number = getattr(w.header, "serial_number", "")
# --- LeCroy ---
elif _model_in_family(umodel, LeCroy_scopes):
w = RigolWFM.lecroy.from_file(file_name)
new_wfm.header_name = w.header.model_number or "LeCroy"
# --- Tektronix .wfm ---
elif _model_in_family(umodel, Tek_scopes):
w = RigolWFM.tek.from_file(file_name)
new_wfm.header_name = w.header.model or "Tektronix"
# --- Tektronix .isf ---
elif _model_in_family(umodel, ISF_scopes):
w = RigolWFM.isf.from_file(file_name)
new_wfm.header_name = w.header.model or "Tektronix ISF"
# --- Yokogawa .wfm ---
elif _model_in_family(umodel, Yokogawa_scopes):
w = RigolWFM.yokogawa.from_file(file_name)
new_wfm.header_name = w.header.model or "Yokogawa"
# --- Yokogawa .hdr + .wvf ---
elif _model_in_family(umodel, Yokogawa_wvf_scopes):
w = RigolWFM.yokogawa_hdr.from_hdr_file(file_name)
new_wfm.header_name = w.header.model or "Yokogawa"
else:
raise Unknown_Scope_Error(f"Unknown oscilloscope type: '{umodel}'\n{valid_scope_list()}")
new_wfm.user_name = "auto" if auto_model else model
pname = getattr(w, "parser_name", type(w).__module__.rsplit(".", 1)[-1])
pname = _CANONICAL_PARSER_NAMES.get(pname, pname)
new_wfm.parser_name = pname
fallback_firmware = _best_firmware_version(w)
if pname == "wfm1000z":
enabled_count = sum(
int(flag)
for flag in (
getattr(w.header, "ch1_enabled", False),
getattr(w.header, "ch2_enabled", False),
getattr(w.header, "ch3_enabled", False),
getattr(w.header, "ch4_enabled", False),
)
)
split = RigolWFM.rigol_1000z_logic.split_raw_payload(w.data.raw, enabled_count)
if split.uses_logic_layout:
new_wfm.logic_split = split
new_wfm.logic_observed_channels = RigolWFM.rigol_1000z_logic.observed_bit_traces(split)
new_wfm.logic_channels, new_wfm.logic_mapping = RigolWFM.rigol_1000z_logic.named_digital_traces(split)
if split.logic_lanes:
logic_points = len(split.logic_lanes[0])
new_wfm.logic_seconds_per_point = float(w.header.seconds_per_point)
new_wfm.logic_time_offset = float(w.header.time_offset)
start = new_wfm.logic_time_offset - logic_points * new_wfm.logic_seconds_per_point / 2
new_wfm.logic_times = (start + np.arange(logic_points) * new_wfm.logic_seconds_per_point).astype(
np.float64
)
elif pname == "bin5000":
logic_channels = getattr(w, "logic_channels", {})
if logic_channels:
new_wfm.logic_channels = logic_channels
new_wfm.logic_observed_channels = getattr(w, "logic_observed_channels", logic_channels)
new_wfm.logic_mapping = getattr(w, "logic_mapping", "")
new_wfm.logic_seconds_per_point = getattr(w, "logic_x_increment", None)
logic_x_origin = getattr(w, "logic_x_origin", None)
if logic_x_origin is not None:
new_wfm.logic_time_offset = -logic_x_origin
first_trace = next(iter(logic_channels.values()), None)
if (
first_trace is not None
and new_wfm.logic_seconds_per_point is not None
and logic_x_origin is not None
):
logic_start = -float(logic_x_origin)
new_wfm.logic_times = (
logic_start + np.arange(len(first_trace)) * new_wfm.logic_seconds_per_point
).astype(np.float64)
# Warn when the model embedded in the file clearly disagrees with the
# user-supplied model. Single-character aliases (B, C, D, E, Z) and
# purely-numeric codes ('2', '5074') are intentionally short and cannot
# be compared reliably, so skip the check for them.
file_family = _scope_family(new_wfm.header_name)
user_family = "" if auto_model else _scope_family(umodel)
if (
file_family
and len(user_family) > 1
and umodel not in _GENERIC_VENDOR_MODELS
and file_family not in user_family
and user_family not in file_family
):
print(
f"Warning: file reports model '{new_wfm.header_name}' " f"but scope type '{model}' was specified.",
file=sys.stderr,
)
enabled = ""
for ch_number in range(1, 5):
ch = RigolWFM.channel.Channel(w, ch_number, pname, selected)
if not ch.enabled:
continue
enabled += str(ch_number)
new_wfm.channels.append(ch)
if len(new_wfm.channels) == 0:
if not new_wfm.logic_channels:
print("Sorry! No channels in the waveform are both selected and enabled", file=sys.stderr)
print(f" User selected channels = '{selected}'", file=sys.stderr)
print(f" Scope enabled channels = '{enabled}'", file=sys.stderr)
print(file=sys.stderr)
if fallback_firmware:
new_wfm.firmware = fallback_firmware
else:
new_wfm.firmware = new_wfm.channels[0].firmware
if new_wfm.firmware == "unknown" and fallback_firmware:
new_wfm.firmware = fallback_firmware
return new_wfm
[docs]
@classmethod
def from_url(cls, url: str, model: str = "auto", selected: str = "1234") -> "Wfm":
"""
Return a waveform object given a URL.
This is a bit complicated because the parser must have a local file
to work with. The process is to download the file to a temporary
location and then process that file. There is a lot that can go
wrong - bad url, bad download, or an error parsing the file.
Args:
url: location of the file
model: oscilloscope family, e.g. 'E' or 'Z'; defaults to auto-detect
selected: string of channels to process e.g., '12'
Returns:
a Wfm object for the file
"""
u = urllib.parse.urlparse(url)
scheme = u[0]
if scheme not in ["http", "https"]:
raise Invalid_URL(f"URL scheme must be 'http' or 'https', got '{scheme}': {url}")
try:
print(f"downloading '{url}'", file=sys.stderr)
r = requests.get(url, allow_redirects=True, timeout=10)
r.raise_for_status()
payload = r.content
path = urllib.parse.unquote(u.path)
basename = os.path.basename(path)
auto_model = model.upper() == "AUTO"
resolved_model = (
_detect_model_from_header(payload[:8192], len(payload), basename or path or url)
if auto_model
else model
)
if not basename:
basename = "downloaded_waveform"
if not os.path.splitext(basename)[1]:
basename += _default_download_suffix(resolved_model, payload[:8192])
with tempfile.TemporaryDirectory() as tmpdir:
working_name = os.path.join(tmpdir, basename)
with open(working_name, "wb") as handle:
handle.write(payload)
if _model_in_family(resolved_model, RohdeSchwarz_scopes):
payload_url = _rohde_schwarz_payload_url(url)
print(f"downloading '{payload_url}'", file=sys.stderr)
payload_response = requests.get(payload_url, allow_redirects=True, timeout=10)
payload_response.raise_for_status()
local_payload = os.path.splitext(working_name)[0] + ".Wfm.bin"
with open(local_payload, "wb") as handle:
handle.write(payload_response.content)
try:
model_choice = "auto" if auto_model else model
new_wfm = cls.from_file(working_name, model_choice, selected)
new_wfm.original_name = url
new_wfm.basename = os.path.basename(path) or basename
return new_wfm
except (Read_WFM_Error, Parse_WFM_Error, Unknown_Scope_Error):
raise
except Exception as e:
raise Parse_WFM_Error(e) from e
except requests.exceptions.RequestException as e:
raise Read_WFM_Error(f"Failed to download '{url}': {e}") from e
[docs]
def describe(self) -> str:
"""Return a string describing the contents of the waveform file."""
s = " General:\n"
s += " File Model = %s\n" % self.header_name
if self.serial_number:
s += " Serial Number = %s\n" % self.serial_number
s += " User Model = %s\n" % self.user_name
s += " Parser Model = %s\n" % self.parser_name
s += " Firmware = %s\n" % self.firmware
s += " Filename = %s\n" % self.basename
s += " Channels = ["
first = True
for ch in self.channels:
if not first:
s += ", "
s += "%s" % ch.channel_number
first = False
s += "]\n\n"
if self.logic_channels:
s += " Logic:\n"
if self.logic_split is not None and self.logic_split.uses_logic_layout:
s += " Layout = interleaved stride %d\n" % self.logic_split.inferred_stride
if self.logic_mapping:
s += " Mapping = %s\n" % self.logic_mapping
if self.logic_split is not None and self.logic_split.logic_lanes:
s += " Points = %8d\n" % len(self.logic_split.logic_lanes[0])
elif self.logic_times is not None:
s += " Points = %8d\n" % len(self.logic_times)
elif self.logic_channels:
first_trace = next(iter(self.logic_channels.values()))
s += " Points = %8d\n" % len(first_trace)
if self.logic_seconds_per_point is not None:
s += " Delta = %10ss/point\n" % (
RigolWFM.channel.engineering_string(self.logic_seconds_per_point, 3)
)
s += " Traces = ["
s += ", ".join(self.logic_channels)
s += "]\n"
if self.logic_observed_channels and list(self.logic_observed_channels) != list(self.logic_channels):
s += " Observed = ["
s += ", ".join(self.logic_observed_channels)
s += "]\n"
s += "\n"
# Compute derived trigger levels: voltage at t=0 for relevant analog channels.
_source = self.trigger_info.get("source", "")
_CH_SOURCE_MAP = {"CH1": 1, "CH2": 2, "CH3": 3, "CH4": 4}
_source_ch_num = _CH_SOURCE_MAP.get(_source)
_known_non_analog = bool(_source) and _source_ch_num is None
derived_levels: dict[int, float] = {}
if not _known_non_analog:
for ch in self.channels:
if _source_ch_num is not None and ch.channel_number != _source_ch_num:
continue
if ch.enabled_and_selected and ch.times is not None and ch.volts is not None and len(ch.times) > 0:
idx = int(np.argmin(np.abs(ch.times)))
derived_levels[ch.channel_number] = float(ch.volts[idx])
if self.trigger_info or derived_levels:
s += " Trigger:\n"
if self.trigger_info:
if self.trigger_info.get("mode") == "alt":
s += " Mode = alt\n"
if "trigger1" in self.trigger_info:
s += "\n Trigger 1:\n"
s += RigolWFM.rigol.describe_trigger_block(self.trigger_info["trigger1"], indent=" ")
if "trigger2" in self.trigger_info:
s += "\n Trigger 2:\n"
s += RigolWFM.rigol.describe_trigger_block(self.trigger_info["trigger2"], indent=" ")
else:
s += RigolWFM.rigol.describe_trigger_block(self.trigger_info)
show_ch_label = _source_ch_num is None
for ch_num, level in sorted(derived_levels.items()):
label = "Derived Level (CH%d)" % ch_num if show_ch_label else "Derived Level "
s += " %s = %sV\n" % (label, RigolWFM.channel.engineering_string(level, 2))
s += "\n"
for ch in self.channels:
s += str(ch)
s += "\n"
return s
[docs]
def best_scaling(self) -> tuple[float, str, float, str]:
"""Return appropriate scaling for plot."""
v_scale = 1e-12
v_prefix = ""
h_scale = 1e-12
h_prefix = ""
for ch in self.channels:
v, p = RigolWFM.channel.best_scale(ch.volt_per_division)
if v > v_scale:
v_scale = v
v_prefix = p
h, p = RigolWFM.channel.best_scale(ch.time_scale)
if h > h_scale:
h_scale = h
h_prefix = p
if not self.channels:
logic_span = 0.0
if self.logic_times is not None and len(self.logic_times) > 1:
logic_span = float(self.logic_times[-1] - self.logic_times[0])
elif self.logic_seconds_per_point is not None:
logic_span = float(self.logic_seconds_per_point)
h_scale, h_prefix = RigolWFM.channel.best_scale(logic_span)
return h_scale, h_prefix, v_scale, v_prefix
def _csv_series(
self,
) -> tuple[
npt.NDArray[np.float64],
list[tuple[str, str, npt.NDArray[np.float64] | npt.NDArray[np.uint8]]],
]:
"""Return a shared X axis plus ordered analog/digital traces for CSV export."""
analog_channels = [
ch
for ch in self.channels
if ch.enabled_and_selected and ch.times is not None and ch.volts is not None and ch.points > 0
]
digital_channels = list(self.logic_channels.items())
if not analog_channels and not digital_channels:
return np.empty(0, dtype=np.float64), []
base_times: npt.NDArray[np.float64] | None
if analog_channels:
assert analog_channels[0].times is not None
base_times = analog_channels[0].times.astype(np.float64, copy=False)
elif self.logic_times is not None:
base_times = self.logic_times.astype(np.float64, copy=False)
else:
first_trace = next(iter(self.logic_channels.values()), None)
if first_trace is None:
return np.empty(0, dtype=np.float64), []
start = float(self.logic_time_offset or 0.0)
step = float(self.logic_seconds_per_point or 1.0)
base_times = start + np.arange(len(first_trace), dtype=np.float64) * step
n_pts = len(base_times)
for ch in analog_channels:
assert ch.times is not None
assert ch.volts is not None
n_pts = min(n_pts, len(ch.times), len(ch.volts), ch.points)
if self.logic_times is not None:
n_pts = min(n_pts, len(self.logic_times))
for _, trace in digital_channels:
n_pts = min(n_pts, len(trace))
series: list[tuple[str, str, npt.NDArray[np.float64] | npt.NDArray[np.uint8]]] = []
for ch in analog_channels:
assert ch.volts is not None
series.append((ch.name, "analog", ch.volts[:n_pts]))
for name, trace in digital_channels:
series.append((name, "digital", trace[:n_pts]))
return base_times[:n_pts], series
def _export_arrays(self) -> dict[str, ExportArray]:
"""Return the shared MAT/NPZ export arrays for this waveform."""
times, series = self._csv_series()
if len(times) == 0 or not series:
return {}
increment = float((times[-1] - times[0]) / (len(times) - 1)) if len(times) > 1 else 0.0
arrays: dict[str, ExportArray] = {
"time": times.astype(np.float64, copy=True),
"start": np.array([float(times[0])], dtype=np.float64),
"increment": np.array([increment], dtype=np.float64),
}
used = set(arrays)
for name, kind, values in series:
key = _sanitize_export_name(name, used)
used.add(key)
if kind == "analog":
arrays[key] = np.asarray(values, dtype=np.float64)
else:
arrays[key] = np.asarray(values, dtype=np.uint8)
return arrays
[docs]
def plot(self) -> "Figure":
"""Plot the data in oscilloscope style and return the Figure."""
_CH_COLORS = ["#FFFF00", "#00FFFF", "#FF00FF", "#00FF00"]
h_scale, h_prefix, v_scale, v_prefix = self.best_scaling()
fig, ax = plt.subplots(figsize=(10, 6), facecolor="black")
ax.set_facecolor("black")
for i, ch in enumerate(self.channels):
ax.plot(
ch.times * h_scale, # type: ignore[operator]
ch.volts * v_scale, # type: ignore[operator]
label=ch.name,
color=_CH_COLORS[i % 4],
linewidth=0.8,
)
ax.set_xlabel("Time (%ss)" % h_prefix, color="white")
ax.set_ylabel("Voltage (%sV)" % v_prefix, color="white")
ax.set_title(self.basename, color="white")
ax.legend(loc="upper right", facecolor="black", edgecolor="#555555", labelcolor="white")
ax.grid(True, which="major", color="#2a2a2a", linewidth=0.8, linestyle="-")
ax.minorticks_on()
ax.grid(True, which="minor", color="#1a1a1a", linewidth=0.4, linestyle=":")
ax.tick_params(colors="white", which="both")
for spine in ax.spines.values():
spine.set_edgecolor("#444444")
fig.tight_layout()
return fig
[docs]
def csv(self) -> str:
"""Return a string of comma separated values."""
times, series = self._csv_series()
if len(times) == 0 or not series:
return ""
h_scale, h_prefix, v_scale, v_prefix = self.best_scaling()
def _fmt_float(value: float) -> str:
"""Format floats with enough precision for round-trip CSV axes."""
return format(value, ".17g")
s = "X"
for name, _, _ in series:
s += ",%s" % name
s += ",Start,Increment\n"
n_pts = len(times)
incr = ((times[n_pts - 1] - times[0]) / (n_pts - 1)) * h_scale if n_pts > 1 else 0.0
off = times[0] * h_scale
x_values = off + np.arange(n_pts, dtype=np.float64) * incr
s += "%ss" % h_prefix
for name, kind, _ in series:
if kind == "analog":
ch = next(ch for ch in self.channels if ch.name == name)
s += ",%s%s" % (v_prefix, ch.unit.name.upper())
else:
s += ",STATE"
s += f",{_fmt_float(off)},{_fmt_float(incr)}\n"
for i in range(n_pts):
s += _fmt_float(float(x_values[i]))
for _, kind, values in series:
if kind == "analog":
s += ",%.2f" % (float(values[i]) * v_scale)
else:
s += ",%d" % int(values[i])
s += "\n"
return s
[docs]
def sigrokcsv(self) -> str:
"""Return a string of comma separated values for sigrok."""
times, series = self._csv_series()
if len(times) == 0 or not series:
return ""
def _fmt_float(value: float) -> str:
"""Format floats with enough precision for sigrok round-trips."""
return format(value, ".17g")
s = "X"
analog_labels = {
ch.name: f"{ch.name} ({ch.unit.name.upper()})"
for ch in self.channels
if ch.enabled_and_selected and ch.volts is not None
}
for name, kind, _ in series:
s += ",%s" % (analog_labels.get(name, name) if kind == "analog" else name)
s += "\n"
for i, time_value in enumerate(times):
s += _fmt_float(float(time_value))
for _, kind, values in series:
if kind == "analog":
s += ",%s" % _fmt_float(float(values[i]))
else:
s += ",%d" % int(values[i])
s += "\n"
return s
[docs]
def npz(self, filename: str | os.PathLike[str] | IO[bytes]) -> None:
"""Save waveform arrays as a NumPy `.npz` archive."""
arrays = self._export_arrays()
if not arrays:
raise ValueError("No analog or digital channels are available to export to NPZ.")
kwargs: Any = arrays
if isinstance(filename, (str, os.PathLike)):
np.savez(file=os.fspath(filename), **kwargs)
return
np.savez(file=filename, **kwargs)
[docs]
def mat(self, filename: str | os.PathLike[str] | IO[bytes]) -> None:
"""Save waveform arrays as a MATLAB v5 `.mat` file."""
arrays = self._export_arrays()
if not arrays:
raise ValueError("No analog or digital channels are available to export to MAT.")
payload = _build_mat_v5_payload(arrays)
_write_binary_output(filename, payload)
[docs]
def wav(
self,
filename: str | os.PathLike[str] | IO[bytes],
*,
channel: int | list[int] = 1,
scale: Literal["auto", "scope"] = "auto",
) -> None:
"""Save one or two channels as a signed 16-bit WAV file.
Args:
filename: Destination path (str or PathLike) or a writable binary file-like object.
channel: Channel number(s) to export. Pass a single int for mono output,
or a list of two ints for stereo output. Each channel must be enabled
and selected. In LTspice, the first channel is addressed as ``chan=0``
and the second as ``chan=1``.
scale: How to map voltages to the ±32767 integer range.
``"auto"`` maps each channel's own min/max volts to ±32767.
This preserves waveform shape, but absolute voltage information is
lost. In LTspice, set ``Vpeak`` on the WAV source to half the
signal's peak-to-peak voltage.
``"scope"`` maps each channel's ±(4 × V/div) full-scale range to
±32767 while keeping zero volts at zero. In LTspice, set
``Vpeak = 4 × V/div``.
Raises:
ValueError: If more than two channels are requested, or if any requested
channel is not found, not enabled, or not selected.
"""
channel_list = [channel] if isinstance(channel, int) else list(channel)
if len(channel_list) > 2:
raise ValueError(f"WAV files support at most 2 channels; {len(channel_list)} requested.")
channels = []
for num in channel_list:
ch = next(
(c for c in self.channels if c.channel_number == num and c.enabled_and_selected),
None,
)
if ch is None:
raise ValueError(f"Channel {num} is not available or not enabled/selected in this waveform.")
channels.append(ch)
def _scale_channel(ch: Any) -> npt.NDArray[np.int16]:
assert ch.volts is not None
v = ch.volts.astype(np.float64)
if scale == "auto":
v_min = float(np.min(v))
v_max = float(np.max(v))
v_range = v_max - v_min if v_max != v_min else 1.0
return ((v - v_min) / v_range * 65534 - 32767).astype(np.int16)
# "scope"
full_scale = 4.0 * ch.volt_per_division if ch.volt_per_division != 0 else 1.0
return np.clip(v / full_scale * 32767, -32767, 32767).astype(np.int16)
scaled = [_scale_channel(ch) for ch in channels]
n_channels = len(scaled)
n_pts = min(len(s) for s in scaled)
if n_channels == 1:
frames = scaled[0][:n_pts]
else:
frames = np.empty(n_pts * 2, dtype=np.int16)
frames[0::2] = scaled[0][:n_pts]
frames[1::2] = scaled[1][:n_pts]
_MAX_WAV_U32 = 2**32 - 1
sample_rate = int(min(round(1.0 / channels[0].seconds_per_point), _MAX_WAV_U32 // (n_channels * 2)))
wave_target: str | IO[bytes]
if isinstance(filename, (str, os.PathLike)):
wave_target = os.fspath(filename)
else:
wave_target = filename
wavef = wave.open(wave_target, "wb")
try:
wavef.setnchannels(n_channels)
wavef.setsampwidth(2)
wavef.setframerate(sample_rate)
wavef.setcomptype("NONE", "")
wavef.setnframes(n_pts)
wavef.writeframes(frames.tobytes())
finally:
wavef.close()