Source code for RigolWFM.wfmconvert

#!/usr/bin/env python3

"""
Command line utility to convert oscilloscope waveform files.

    Examples::

        prompt> wfmconvert info DS1102E-A.wfm

        prompt> wfmconvert csv DS1102E-A.wfm

        prompt> wfmconvert wav DS1102E-A.wfm
"""

import re
import os
import sys
import shutil
import argparse
import subprocess
import textwrap
from typing import NoReturn

import matplotlib.pyplot as plt

import RigolWFM
import RigolWFM.wfm


def _build_model_aliases() -> dict[str, str]:
    """Build the lookup table used to resolve ``--model`` values.

    Returns:
        A dict whose keys are upper-cased alias strings and whose values are
        the canonical model names.  Entries are added family by family, so if
        the same alias appears in two families the later family wins.
    """
    wfm = RigolWFM.wfm
    table: dict[str, str] = {"AUTO": "auto"}

    def add_aliases(target: str, names) -> None:
        # Every alias is stringified and upper-cased before being stored.
        table.update((str(name).upper(), target) for name in names)

    def add_family(target: str, names) -> None:
        # The canonical name is always accepted as an alias of itself.
        table[target.upper()] = target
        add_aliases(target, names)

    rigol = (
        ("B", wfm.DS1000B_scopes),
        ("C", wfm.DS1000C_scopes),
        ("D", wfm.DS1000D_scopes),
        ("E", wfm.DS1000E_scopes),
        ("Z", wfm.DS1000Z_scopes),
        ("2", wfm.DS2000_scopes),
        ("4", wfm.DS4000_scopes),
        ("5", wfm.DS5000_scopes),
        ("6", wfm.DS6000_scopes),
        ("7", wfm.DS7000_scopes),
        ("8", wfm.DS8000_scopes),
        ("DHO", wfm.DHO1000_scopes),
    )
    for target, names in rigol:
        add_family(target, names)

    # The MSO5074 names are extra aliases for the existing "5" family.
    add_aliases("5", wfm.MSO5074_scopes)

    vendors = (
        ("Keysight", wfm.Keysight_scopes),
        ("Siglent", wfm.Siglent_scopes),
        ("SiglentOld", wfm.Siglent_old_scopes),
        ("RohdeSchwarz", wfm.RohdeSchwarz_scopes),
        ("LeCroy", wfm.LeCroy_scopes),
        ("Tek", wfm.Tek_scopes),
        ("ISF", wfm.ISF_scopes),
        ("Yokogawa", wfm.Yokogawa_scopes),
    )
    for target, names in vendors:
        add_family(target, names)

    return table


# Alias table built once at import time; keys are upper-cased so that
# `_normalize_model_choice` can look up user input case-insensitively.
_MODEL_ALIASES = _build_model_aliases()
# Canonical model names, in the order they are listed in the
# "unsupported model" error message raised by `_normalize_model_choice`.
_CANONICAL_MODELS = [
    "auto",
    "B",
    "C",
    "D",
    "E",
    "Z",
    "2",
    "4",
    "5",
    "6",
    "7",
    "8",
    "DHO",
    "Keysight",
    "Siglent",
    "SiglentOld",
    "RohdeSchwarz",
    "LeCroy",
    "Tek",
    "ISF",
    "Yokogawa",
]


def _normalize_model_choice(value: str) -> str:
    """Map a user-supplied ``--model`` value onto its canonical name.

    Args:
        value: the raw string given on the command line (any letter case).

    Returns:
        The canonical model name stored in ``_MODEL_ALIASES``.

    Raises:
        argparse.ArgumentTypeError: if the upper-cased value is not a known alias.
    """
    resolved = _MODEL_ALIASES.get(value.upper())
    if resolved is not None:
        return resolved

    choices = ", ".join(_CANONICAL_MODELS)
    raise argparse.ArgumentTypeError(
        f"unsupported model '{value}'; choose one of {choices}. Supported aliases are also accepted."
    )


def _output_path(infile: str, ext: str, output_dir: str) -> str:
    """Build the output file path for a converted waveform.

    Args:
        infile: path of the input file; only its base name is used.
        ext: new extension (including the leading dot) to append to the stem.
        output_dir: directory that the returned path is placed in.

    Returns:
        ``output_dir`` joined with the input's base name, its last extension
        replaced by ``ext``.
    """
    filename = os.path.basename(infile)
    stem, _old_ext = os.path.splitext(filename)
    return os.path.join(output_dir, f"{stem}{ext}")


def info(_args: argparse.Namespace, scope_data: RigolWFM.wfm.Wfm, _infile: str) -> None:
    """Print a text summary describing a waveform file.

    Args:
        _args: parsed command line options (unused).
        scope_data: the loaded waveform; its ``describe()`` text is printed.
        _infile: path of the input file (unused).
    """
    print(scope_data.describe())
def csv(args: argparse.Namespace, scope_data: RigolWFM.wfm.Wfm, infile: str) -> None:
    """Create a file with comma separated values.

    Args:
        args: parsed command line options; ``output_dir`` and ``force`` are read.
        scope_data: the loaded waveform; its ``csv()`` text is written out.
        infile: path of the input file, used to derive the output name.
    """
    target = _output_path(infile, ".csv", args.output_dir)

    # Refuse to clobber an existing file unless --force was given.
    if not args.force and os.path.isfile(target):
        print(f"'{target}' exists, use --force to overwrite")
        return

    text = scope_data.csv()
    # The text is written as UTF-8 bytes in binary mode, so no newline
    # translation is applied.
    with open(target, "wb") as handle:
        handle.write(text.encode(encoding="utf-8"))
def npz(args: argparse.Namespace, scope_data: RigolWFM.wfm.Wfm, infile: str) -> None:
    """Create a NumPy `.npz` archive.

    Args:
        args: parsed command line options; ``output_dir`` and ``force`` are read.
        scope_data: the loaded waveform; its ``npz()`` method writes the file.
        infile: path of the input file, used to derive the output name.

    Exits with status 1 if ``scope_data.npz`` raises ``ValueError``.
    """
    target = _output_path(infile, ".npz", args.output_dir)

    # Refuse to clobber an existing file unless --force was given.
    if not args.force and os.path.isfile(target):
        print(f"'{target}' exists, use --force to overwrite")
        return

    try:
        scope_data.npz(target)
    except ValueError as err:
        print(f"wfmconvert error: {err}", file=sys.stderr)
        sys.exit(1)
def mat(args: argparse.Namespace, scope_data: RigolWFM.wfm.Wfm, infile: str) -> None:
    """Create a MATLAB `.mat` file.

    Args:
        args: parsed command line options; ``output_dir`` and ``force`` are read.
        scope_data: the loaded waveform; its ``mat()`` method writes the file.
        infile: path of the input file, used to derive the output name.

    Exits with status 1 if ``scope_data.mat`` raises ``ValueError``.
    """
    target = _output_path(infile, ".mat", args.output_dir)

    # Refuse to clobber an existing file unless --force was given.
    if not args.force and os.path.isfile(target):
        print(f"'{target}' exists, use --force to overwrite")
        return

    try:
        scope_data.mat(target)
    except ValueError as err:
        print(f"wfmconvert error: {err}", file=sys.stderr)
        sys.exit(1)
def wav(args: argparse.Namespace, scope_data: RigolWFM.wfm.Wfm, infile: str) -> None:
    """Create an audible .wav file for use in LTspice.

    Args:
        args: parsed command line options; ``output_dir``, ``force``,
            ``channel`` and ``scale`` are read.
        scope_data: the loaded waveform; its ``wav()`` method writes the file.
        infile: path of the input file, used to derive the output name.

    Exits with status 1 when no valid channel or more than two channels are
    selected, or when ``scope_data.wav`` raises ``ValueError``.
    """
    wav_name = _output_path(infile, ".wav", args.output_dir)
    if os.path.isfile(wav_name) and not args.force:
        print(f"'{wav_name}' exists, use --force to overwrite")
        return

    # ``args.channel`` is the raw command line string.  Keep only the digits
    # 1-4 and drop duplicates (preserving order) so that input such as "1,2"
    # or "11" neither raises ValueError in int() nor inflates the count.
    channel_digits = [int(c) for c in dict.fromkeys(args.channel) if c in "1234"]

    if not channel_digits:
        print(
            f"wfmconvert error: no valid channels in --channel {args.channel}.\n"
            "Use --channel 1 for mono or --channel 12 for stereo.",
            file=sys.stderr,
        )
        sys.exit(1)

    if len(channel_digits) > 2:
        print(
            f"wfmconvert error: wav supports at most 2 channels; got --channel {args.channel}.\n"
            "Use --channel 1 for mono or --channel 12 for stereo.",
            file=sys.stderr,
        )
        sys.exit(1)

    # A single channel is passed as an int, two channels as a list.
    channel: int | list[int] = channel_digits[0] if len(channel_digits) == 1 else channel_digits

    try:
        scope_data.wav(wav_name, channel=channel, scale=args.scale)
    except ValueError as e:
        print(f"wfmconvert error: {e}", file=sys.stderr)
        sys.exit(1)
def png(args: argparse.Namespace, scope_data: RigolWFM.wfm.Wfm, infile: str) -> None:
    """Save a PNG plot of the waveform.

    Args:
        args: parsed command line options; ``output_dir``, ``force`` and
            ``dpi`` are read.
        scope_data: the loaded waveform; its ``plot()`` method supplies the figure.
        infile: path of the input file, used to derive the output name.
    """
    png_name = _output_path(infile, ".png", args.output_dir)
    if os.path.isfile(png_name) and not args.force:
        print(f"'{png_name}' exists, use --force to overwrite")
        return

    fig = scope_data.plot()
    try:
        fig.savefig(png_name, dpi=args.dpi, bbox_inches="tight", facecolor="black")
    finally:
        # Always release the figure, even if saving fails, so that figures
        # do not pile up in pyplot's registry.
        plt.close(fig)
def sigrok(args: argparse.Namespace, scope_data: RigolWFM.wfm.Wfm, infile: str) -> bool:
    """Create a Sigrok (.sr) file by piping CSV text through `sigrok-cli`.

    Args:
        args: parsed command line options; ``output_dir`` and ``force`` are read.
        scope_data: the loaded waveform; ``sigrokcsv()`` supplies the CSV text.
        infile: path of the input file, used to derive the output name.

    Returns:
        True when `sigrok-cli` completed and left a non-empty output file.
        False otherwise; the reason is printed to stderr.
    """
    help_hint = "See https://sigrok.org/wiki/Sigrok-cli for more information."

    sigrok_name = _output_path(infile, ".sr", args.output_dir)
    if os.path.isfile(sigrok_name) and not args.force:
        print(f"'{sigrok_name}' exists, use --force to overwrite", file=sys.stderr)
        return False

    s = scope_data.sigrokcsv()
    sigrok_input_format = _sigrok_csv_input_format(scope_data)
    if not s or not sigrok_input_format:
        print("No analog or digital channels are available to export to sigrok.", file=sys.stderr)
        return False

    # Check if sigrok-cli is installed and accessible
    if not shutil.which("sigrok-cli"):
        print("sigrok-cli is not installed or not found in PATH.", file=sys.stderr)
        print(help_hint, file=sys.stderr)
        return False

    # sigrok-cli reports a warning about /dev/stdin not being a regular file,
    # but the conversion works fine.
    try:
        # check=True turns any non-zero exit status into CalledProcessError,
        # so no separate returncode test is needed afterwards.
        subprocess.run(
            [
                "sigrok-cli",
                "-I",
                sigrok_input_format,
                "-i",
                "/dev/stdin",
                "-o",
                sigrok_name,
            ],
            input=s,
            check=True,
            stderr=subprocess.PIPE,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"sigrok-cli failed with error: {e.stderr}", file=sys.stderr)
        print(help_hint, file=sys.stderr)
        return False
    except Exception as e:
        # Deliberately broad: any failure to launch or feed the external tool
        # is reported as a conversion failure instead of a traceback.
        print(f"An unexpected error occurred: {e}", file=sys.stderr)
        return False

    # A zero exit status is not trusted on its own; the output must exist
    # and be non-empty.
    if not os.path.isfile(sigrok_name) or os.path.getsize(sigrok_name) == 0:
        print(f"sigrok-cli did not produce a usable output file: '{sigrok_name}'", file=sys.stderr)
        print(help_hint, file=sys.stderr)
        return False

    return True
def _sigrok_csv_input_format(scope_data: RigolWFM.wfm.Wfm) -> str:
    """Return `sigrok-cli` CSV import options for the waveform's exported series.

    Args:
        scope_data: the loaded waveform; ``_csv_series()`` supplies the time
            values and the list of exported series.

    Returns:
        A colon-separated option string starting with ``csv``, or an empty
        string when there are no time values or no series.
    """
    times, series = scope_data._csv_series()
    if len(times) == 0 or not series:
        return ""

    # Collapse consecutive columns of the same kind into [code, count] runs:
    # "a" for analog series, "l" for everything else.
    runs: list[list] = []
    for _name, kind, _values in series:
        code = "a" if kind == "analog" else "l"
        if runs and runs[-1][0] == code:
            runs[-1][1] += 1
        else:
            runs.append([code, 1])

    # The first column is always the time column; a run of one has no count prefix.
    columns = ["t"]
    columns.extend(f"{n}{code}" if n > 1 else code for code, n in runs)

    options = ["csv", "start_line=1", "header=true"]
    if len(times) > 1:
        span = float(times[-1] - times[0])
        if span > 0:
            # Sample rate derived from the first and last time values.
            samplerate = (len(times) - 1) / span
            options.append(f"samplerate={format(samplerate, '.17g')}")
    options.append(f"column_formats={','.join(columns)}")

    return ":".join(options)


class _WfmParser(argparse.ArgumentParser):
    """ArgumentParser that gives a friendlier error when the action verb is omitted."""

    def error(self, message: str) -> NoReturn:
        """Report a parse error, replacing the invalid-action message with a hint."""
        verb_problem = "argument action: invalid choice" in message
        if verb_problem:
            friendly = "".join(
                [
                    "wfmconvert error: missing action verb.\n",
                    "Example: wfmconvert info DS1102E.wfm\n",
                    "Run 'wfmconvert --help' for more information.\n",
                ]
            )
            self.exit(2, friendly)
        super().error(message)
def _build_parser() -> argparse.ArgumentParser:
    """Create the `wfmconvert` command line parser with all options registered."""
    parser = _WfmParser(
        prog="wfmconvert",
        description="Convert oscilloscope waveform files to another format.",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog=textwrap.dedent("""\
            examples:
              wfmconvert info DS1102E.wfm
              wfmconvert csv DS1102E.wfm
              wfmconvert npz DS1102E.wfm
              wfmconvert mat DS1102E.wfm
              wfmconvert png DS1102E.wfm
              wfmconvert --output-dir /tmp csv *.wfm
              wfmconvert --channel 2 csv DS1102E.wfm
              wfmconvert --channel 124 sigrok DS1102E.wfm
              wfmconvert --channel 3 --scale scope wav DS1102E.wfm
              wfmconvert --channel 12 --scale scope wav DS1102E.wfm
              wfmconvert --model C info DS1042C-A.wfm
            """),
    )

    parser.add_argument(
        "--model",
        type=_normalize_model_choice,
        default="auto",
        metavar="MODEL",
        help=textwrap.dedent("""\
            oscilloscope model family (default: auto-detect from file).
            canonical values: auto, B, C, D, E, Z, 2, 4, 5, 6, 7, 8, DHO,
            Keysight, Siglent, SiglentOld, RohdeSchwarz, LeCroy, Tek, ISF, Yokogawa.
            aliases are also accepted.
            """),
    )

    parser.add_argument(
        "--output-dir",
        metavar="DIR",
        default=".",
        help="directory for output files (default: current working directory)",
    )

    parser.add_argument(
        "--force",
        action="store_true",
        default=False,
        help="overwrite existing output files",
    )

    parser.add_argument(
        "--dpi",
        type=int,
        default=300,
        help="resolution in dots per inch for PNG output (default: 300)",
    )

    parser.add_argument(
        "--scale",
        choices=["auto", "scope"],
        default="auto",
        help=textwrap.dedent("""\
            voltage scaling for WAV output (default: auto).
            auto: signal min/max → ±32767. Waveform shape is preserved;
              set Vpeak on the LTspice WAV source to half the signal's
              peak-to-peak voltage.
            scope: scope ±(4×V/div) range → ±32767. Zero volts stays at
              zero; set Vpeak = 4×V/div in LTspice.
            """),
    )

    parser.add_argument(
        "--channel",
        type=str,
        default="1234",
        help=textwrap.dedent("""\
            select channel(s) to process.
            `--channel 1` outputs only contents of the first channel.
            `--channel 34` outputs contents of channels 3 and 4.
            The default is `--channel 1234`.
            """),
    )

    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {RigolWFM.__version__}",
    )

    parser.add_argument(
        dest="action",
        choices=["csv", "info", "png", "wav", "sigrok", "npz", "mat"],
        help=textwrap.dedent("""\
            csv: convert to a file with comma separated values
            info: show the various scope settings for a waveform file
            npz: save waveform arrays in a NumPy `.npz` archive
            mat: save waveform arrays in a MATLAB `.mat` file
            png: save a waveform plot as a PNG image (use --dpi to set resolution)
            wav: convert to a WAV sound format file for use with Pulseview or LTspice.
            sigrok: convert to a sigrok file
            """),
    )

    parser.add_argument("infile", type=str, nargs="+", help="the waveform file(s) to be converted")

    return parser


def main() -> None:
    """Parse console command line arguments and convert each input file.

    Exits with status 1 on the first failure: a missing output directory, no
    valid channel, an unreadable or unparsable input file, or an action that
    returns ``False``.
    """
    args = _build_parser().parse_args()

    if not os.path.isdir(args.output_dir):
        print(f"wfmconvert error: output directory '{args.output_dir}' does not exist", file=sys.stderr)
        sys.exit(1)

    # strip anything that is not a possible channel number
    good = re.sub(r"[^1234]", "", args.channel)

    # remove duplicates keeping order
    selected = "".join(dict.fromkeys(good))

    if len(selected) == 0:
        # Errors go to stderr, like every other diagnostic in this module.
        print("\nwfmconvert error", file=sys.stderr)
        print("No valid channels were passed after --channel", file=sys.stderr)
        print("Channels are identified by number and must be a combination of 1, 2, 3, or 4", file=sys.stderr)
        print(f'You used "--channel {args.channel}"', file=sys.stderr)
        sys.exit(1)

    # Hand the sanitized selection to the actions as well, so an action that
    # reads ``args.channel`` sees the same channels as the waveform reader.
    args.channel = selected

    action_map = {"info": info, "csv": csv, "npz": npz, "mat": mat, "png": png, "wav": wav, "sigrok": sigrok}
    action = action_map[args.action]

    for filename in args.infile:
        try:
            model = args.model
            if model == "auto":
                model = RigolWFM.wfm.detect_model(filename)
                print(f"Detected model: {model}", file=sys.stderr)

            scope_data = RigolWFM.wfm.Wfm.from_file(filename, model, selected)
            result = action(args, scope_data, filename)
            # Only an explicit False signals failure; actions returning None succeed.
            if result is False:
                sys.exit(1)

        except FileNotFoundError:
            print(f"wfmconvert error: file not found: '{filename}'", file=sys.stderr)
            sys.exit(1)

        except RigolWFM.wfm.Read_WFM_Error as e:
            if isinstance(e.__cause__, FileNotFoundError):
                print(f"wfmconvert error: file not found: '{filename}'", file=sys.stderr)
            else:
                print(f"wfmconvert error: could not read '{filename}': {e}", file=sys.stderr)
            sys.exit(1)

        except RigolWFM.wfm.Unknown_Scope_Error as e:
            print(e, file=sys.stderr)
            sys.exit(1)

        except (RigolWFM.wfm.Parse_WFM_Error, ValueError) as e:
            if args.model == "auto":
                print(f"Could not detect or parse the scope model for '{filename}'.", file=sys.stderr)
            else:
                print(
                    f"File contents do not follow the selected oscilloscope model format '{args.model}'.",
                    file=sys.stderr,
                )
            print("To help with development, please report this error", file=sys.stderr)
            print("as an issue to https://github.com/scottprahl/RigolWFM\n", file=sys.stderr)
            print(e, file=sys.stderr)
            sys.exit(1)
# Run the command line interface when the module is executed as a script.
if __name__ == "__main__":
    main()