"""
Adapter layer for Tektronix ISF (Internal Save Format) oscilloscope files (.isf).
Normalises the parsed KSY data into the header/data shape expected by
`RigolWFM.wfm.Wfm` and `RigolWFM.channel`.
File structure
--------------
<text_header> '#' <n_digits> <byte_count_str> <curve_data>
The text header is a semicolon-delimited sequence of KEY VALUE pairs. Field
names appear in a long form (older firmware) or short form (newer firmware):
Long form: BYT_NR, BIT_NR, ENCDG, BN_FMT, BYT_OR, WFID, NR_PT,
PT_FMT, XUNIT, XINCR, XZERO, PT_OFF, YUNIT, YMULT, YOFF, YZERO
Short form: BYT_N, BIT_N, ENC, BN_F, BYT_O, WFI, NR_P,
PT_F, XUN, XIN, XZE, PT_O, YUN, YMU, YOF, YZE
Voltage calibration
-------------------
volts[i] = YZERO + YMULT * (adc[i] - YOFF)
Time axis (PT_FMT = "Y")
-------------------------
t[i] = XZERO + XINCR * (i - PT_OFF) (i = 0 … NR_PT-1)
Envelope mode (PT_FMT = "ENV")
-------------------------------
sample_count = NR_PT / 2
adc_min[i] = adc[2*i]
adc_max[i] = adc[2*i + 1]
t[i] = XZERO + XINCR * (2*i - PT_OFF)
"""
import io as _io
import re
from typing import Any, Optional
import numpy as np
import numpy.typing as npt
from kaitaistruct import KaitaiStream # type: ignore[import]
import RigolWFM.channel
import RigolWFM.tektronix_internal_isf
_TektronixInternalIsf: Any = RigolWFM.tektronix_internal_isf.TektronixInternalIsf # type: ignore[attr-defined]
# Maps both long-form and short-form field names to a canonical key
_FIELD_ALIASES: dict[str, str] = {
# bytes per sample
"BYT_NR": "byt_nr",
"BYT_N": "byt_nr",
# bytes per point — synonym used in some headers
"BN_FMT": "bn_fmt",
"BN_F": "bn_fmt",
# byte order
"BYT_OR": "byt_or",
"BYT_O": "byt_or",
# waveform identifier / label
"WFID": "wfid",
"WFI": "wfid",
# number of points
"NR_PT": "nr_pt",
"NR_P": "nr_pt",
# point format
"PT_FMT": "pt_fmt",
"PT_F": "pt_fmt",
# x increment (time per sample)
"XINCR": "xincr",
"XIN": "xincr",
# x zero
"XZERO": "xzero",
"XZE": "xzero",
# point offset
"PT_OFF": "pt_off",
"PT_O": "pt_off",
# y multiplier
"YMULT": "ymult",
"YMU": "ymult",
# y offset (ADC count)
"YOFF": "yoff",
"YOF": "yoff",
# y zero (baseline volts)
"YZERO": "yzero",
"YZE": "yzero",
# volts per division (optional)
"VSCALE": "vscale",
}
def _parse_header(text: str) -> dict[str, str]:
"""Parse an ISF text header into a canonical field dict.
Handles both long-form and short-form field names, and strips the
optional ':WFMP:' SCPI prefix from field names.
Args:
text: the raw ASCII header string (without the trailing '#').
Returns:
dict mapping canonical field name (e.g. ``'nr_pt'``) to raw string value.
"""
result: dict[str, str] = {}
for fragment in text.split(";"):
fragment = fragment.strip()
if not fragment:
continue
# Strip optional :WFMP: or :CURVE / :CURV prefix
fragment = re.sub(r"^:[A-Z]+:", "", fragment)
# Split into key and value on first whitespace
parts = fragment.split(None, 1)
if len(parts) < 2:
continue
key_raw = parts[0].upper()
value = parts[1].strip().strip('"')
canonical = _FIELD_ALIASES.get(key_raw)
if canonical is not None:
result[canonical] = value
return result
def _float_field(fields: dict[str, str], key: str, default: float = 0.0) -> float:
"""Return a float from the header dict, falling back to ``default``."""
raw = fields.get(key)
if raw is None:
return default
try:
return float(raw)
except ValueError:
return default
def _int_field(fields: dict[str, str], key: str, default: int = 0) -> int:
"""Return an int from the header dict, falling back to ``default``."""
raw = fields.get(key)
if raw is None:
return default
try:
return int(float(raw))
except ValueError:
return default
# ---------------------------------------------------------------------------
# Normalized header objects (same shape as tek.py's Header / ChannelHeader)
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def from_file(file_name: str) -> IsfWaveform:
"""Parse a Tektronix ISF file and normalize it for `Wfm.from_file()`.
Args:
file_name: path to a Tektronix ``.isf`` waveform file.
Returns:
An `IsfWaveform` object whose ``header`` follows the shape expected
by `RigolWFM.channel.Channel` and `RigolWFM.wfm.Wfm`.
Raises:
ValueError: if the file cannot be read or does not look like a valid
Tektronix ISF file.
"""
try:
with open(file_name, "rb") as f:
data = f.read()
except OSError as exc:
raise ValueError(f"Cannot open ISF file '{file_name}': {exc}") from exc
try:
parsed = _TektronixInternalIsf(KaitaiStream(_io.BytesIO(data)))
header_text: str = parsed.header_text
curve_bytes: bytes = bytes(parsed.curve_data)
except Exception as exc:
raise ValueError(f"Could not parse ISF file '{file_name}': {exc}") from exc
fields = _parse_header(header_text)
# --- Metadata ---
nr_pt = _int_field(fields, "nr_pt", 0)
byt_nr = _int_field(fields, "byt_nr", 2)
byt_or = fields.get("byt_or", "MSB").upper()
pt_fmt = fields.get("pt_fmt", "Y").upper()
wfid = fields.get("wfid", "")
xincr = _float_field(fields, "xincr", 1e-6)
xzero = _float_field(fields, "xzero", 0.0)
pt_off = _float_field(fields, "pt_off", 0.0)
ymult = _float_field(fields, "ymult", 1.0)
yoff = _float_field(fields, "yoff", 0.0)
yzero = _float_field(fields, "yzero", 0.0)
vscale = _float_field(fields, "vscale", 0.0)
# --- Endianness ---
byte_order = ">" if byt_or == "MSB" else "<"
# --- Trim trailing newline that some ISF files append after the data ---
# Only strip CR/LF bytes; do NOT strip null bytes since they are valid
# sample data (e.g. the low byte of a zero-valued int16 sample).
curve_bytes = curve_bytes.rstrip(b"\n\r")
# --- Decode ADC samples ---
if byt_nr == 1:
dtype = np.dtype(f"{byte_order}i1")
else:
dtype = np.dtype(f"{byte_order}i2")
adc_all = np.frombuffer(curve_bytes, dtype=dtype)
# --- Handle envelope mode ---
is_env = pt_fmt == "ENV"
if is_env and len(adc_all) >= 2:
adc_min = adc_all[0::2]
adc_max = adc_all[1::2]
n_pts = len(adc_min)
# Use the average of min/max as the representative trace
adc = (adc_min.astype(np.float64) + adc_max.astype(np.float64)) / 2.0
else:
adc = adc_all.astype(np.float64)
n_pts = len(adc)
if 0 < nr_pt < n_pts:
adc = adc[:nr_pt]
n_pts = nr_pt
# --- Calibrate to volts ---
volts = (yzero + ymult * (adc - yoff)).astype(np.float32)
# --- Build time axis ---
if is_env:
t0 = xzero + (0 * 2 - pt_off) * xincr
t_step = xincr * 2
else:
t0 = xzero - pt_off * xincr
t_step = xincr
# --- Volts per division ---
if vscale != 0.0:
volt_per_div = abs(vscale)
elif ymult != 0.0:
# estimate: range ≈ 8 divs × 32 counts/div for int16
volt_per_div = abs(ymult) * 32.0
else:
volt_per_div = 1.0
# --- Build raw proxy (high byte of int16, or direct for int8) ---
if byt_nr == 2:
raw8 = (adc.astype(np.int16).view(np.uint16) >> 8).astype(np.uint8)
else:
raw8 = adc.astype(np.int8).view(np.uint8)
# --- Assemble normalized objects ---
obj = IsfWaveform()
h = obj.header
h.model = "Tektronix"
h.trace_label = wfid
h.n_pts = n_pts
h.x_origin = t0
h.x_increment = t_step
slot = 0
ch = h.ch[slot]
ch.name = "CH1"
ch.enabled = True
ch.coupling = "DC"
ch.probe_value = 1.0
ch.volt_per_division = volt_per_div
ch.volt_scale = ymult
ch.volt_offset = yzero - ymult * yoff
h.channel_data[slot] = volts
h.raw_data[slot] = raw8
return obj