Python from a notebook

There are three Python entry points into a galois-edge daemon, listed from the highest-level wrapper to the lowest-level interface:

| Layer | When to use it | What it looks like |
| --- | --- | --- |
| galois SDK | New code that wants typed objects, NumPy decoding for waveforms, context-managed streams and sweeps, and one API that works locally and through the cloud. | galois.Edge.connect("lab-pi:50051") |
| pyvisa-galois backend | You already have PyVISA scripts. Change one line and they work against the daemon. | pyvisa.ResourceManager("@galois") |
| Raw gRPC stubs | Building tooling, generating clients in another language, or you don’t want a Python wrapper at all. | from galois_edge import edge_pb2, edge_pb2_grpc |

The galois package is the canonical Python wrapper around galois.edge.v1.EdgeDaemonService. It offers sync and async surfaces in one package, NumPy decoding for VectorData, friendly exceptions, and cloud-routed access through the Galois Cloud HTTP API.

pip install galois

import galois

with galois.Edge.connect("lab-pi.tail-1234.ts.net:50051") as edge:
    print(edge.ping())
    for inst in edge.instruments():
        print(inst.id, inst.manufacturer, inst.model)
    dmm = edge.instrument("USB0::0x2A8D::0x0101::MY54505555::INSTR")
    print(dmm.query("*IDN?"))
    print(dmm.execute("measure_voltage"))

Edge.connect accepts an auth_token= kwarg — pass the daemon’s INBOUND_AUTH_TOKEN value when the daemon was started with one. When the daemon has no inbound token configured, the kwarg is silently ignored and the network is the auth boundary.

edge = galois.Edge.connect(
    "lab-pi:50051",
    auth_token="glc_internal_xxxxxxxxxxxxxxxx",
)

The channel ships with sensible keepalive defaults so a session that sits idle on a Tailscale path for hours doesn’t silently drop.
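
Clients that build their own channel from the raw stubs do not inherit these defaults. A minimal sketch of comparable settings, using standard gRPC channel-argument keys; the values below are illustrative assumptions, not the defaults the SDK actually ships with:

```python
# Standard gRPC channel arguments. The values are illustrative assumptions,
# not the defaults used by the galois SDK.
KEEPALIVE_OPTIONS = [
    ("grpc.keepalive_time_ms", 30_000),          # send a keepalive ping every 30 s
    ("grpc.keepalive_timeout_ms", 10_000),       # wait up to 10 s for the ping ack
    ("grpc.keepalive_permit_without_calls", 1),  # ping even when no RPC is in flight
]

# Pass the list when creating a raw channel, for example:
#   channel = grpc.insecure_channel("lab-pi:50051", options=KEEPALIVE_OPTIONS)
```

Very short ping intervals can be rejected by a gRPC server's keepalive enforcement, so it is worth staying conservative unless the daemon's limits are known.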

import os, galois

cloud = galois.Cloud.connect(
    "https://cloud.galoislabs.ai",
    token=os.environ["GALOIS_AUTH_TOKEN"],
)
edge = cloud.edge("rigol-rack-1")
with edge:
    dmm = edge.instruments()[0]
    print(dmm.query("*IDN?"))

Cloud-routed mode covers the v1 supported subset: instruments, instrument, query/write/read, execute, capabilities, and proxy_sdk_call. Anything else — streaming, sweeps, sequences, scan, status, webcam_snapshot, profile management — raises EdgeUnsupportedError until the cloud backend gains the relevant endpoint.
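
Code that has to run both locally and through the cloud can catch that error and degrade to polling. The sketch below is one possible pattern, not part of the SDK: collect is a hypothetical helper, and the local EdgeUnsupportedError class is a stand-in so the example runs on its own (real code would import it from galois).

```python
import time


class EdgeUnsupportedError(Exception):
    """Local stand-in for galois.EdgeUnsupportedError so the sketch is self-contained."""


def collect(instrument, command, count, interval_s=0.2):
    """Collect `count` readings, preferring a stream and falling back to polling."""
    try:
        with instrument.stream(command, interval_ms=int(interval_s * 1000)) as s:
            readings = []
            for point in s:
                readings.append(point.value)
                if len(readings) >= count:
                    break
            return readings
    except EdgeUnsupportedError:
        # Streaming is outside the cloud-routed subset: poll execute() instead.
        # The readings are whatever execute() returns for this command.
        readings = []
        for _ in range(count):
            readings.append(instrument.execute(command))
            time.sleep(interval_s)
        return readings
```

Polling through execute() costs one round trip per reading, so the achievable rate will be lower than a daemon-side stream.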

with smu.stream("measure_current", interval_ms=200, timeout_ms=10_000) as s:
    for point in s:
        print(point.timestamp_ms, point.value, point.unit)
        if point.value > 0.5:
            break  # context manager calls StopStream on exit

For waveform-shaped returns, point.waveform is a Waveform dataclass with y and x as numpy.ndarray:

import matplotlib.pyplot as plt

with scope.stream("get_waveform", interval_ms=500) as s:
    for point in s:
        wf = point.waveform
        plt.plot(wf.x, wf.y)
        plt.xlabel(wf.x_unit)
        plt.ylabel(wf.y_unit)
        break  # plot only the first trace

import time

sweep = magnet.start_sweep("set_field", target_value=2.5, sweep_rate=0.1)
while sweep.status().status not in ("completed", "error", "aborted"):
    print(sweep.status().current_value)
    time.sleep(1)

# Or:
sweep.wait(poll_interval=1.0)  # blocks until terminal
sweep.stop()    # park at current value (status: holding)
sweep.cancel()  # abort to safe state (status: stopped)

Sweep.stop() holds at the current value; Sweep.cancel() runs the profile’s abort SCPI. The sweep continues on the daemon if your client drops.

Every method on Edge has an async twin on AsyncEdge: the names are the same, the methods are awaitable, and iterators become async iterators.

import asyncio, galois

async def main():
    async with await galois.AsyncEdge.connect("lab-pi:50051") as edge:
        dmm = await edge.instrument_by_class("dmm")
        async with dmm.stream("measure_voltage") as s:
            async for p in s:
                print(p.timestamp_ms, p.value)
                if p.value > 1.0:
                    break

asyncio.run(main())

AsyncCloud, AsyncCloudEdge, AsyncInstrument, AsyncStream, and AsyncSweep mirror the sync surface 1:1. EdgeError and its subclasses cover both surfaces — there is no parallel async exception hierarchy.

All operations raise from a single hierarchy:

from galois import (
    EdgeError, EdgeConnectionError, EdgeTimeoutError,
    EdgeUnsupportedError, InstrumentError, ProfileError, SweepError,
)

try:
    smu.execute("measure_current")
except EdgeTimeoutError as e:
    ...
except EdgeError as e:
    print(e.status, e.message, getattr(e, "scpi_command", None))

gRPC status codes map onto the hierarchy as follows: UNAVAILABLE → EdgeConnectionError, DEADLINE_EXCEEDED → EdgeTimeoutError, UNIMPLEMENTED → EdgeUnsupportedError. Per-RPC failures map to InstrumentError / ProfileError / SweepError and carry scpi_command and instrument_id where relevant.
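
To make the status mapping concrete, here is a small sketch of a lookup from gRPC status names to exception classes. The classes are local stand-ins for the ones in the galois package, error_for_status is a hypothetical helper, and the fallback to the base EdgeError for unlisted codes is an assumption rather than documented behaviour:

```python
class EdgeError(Exception):
    """Stand-in base class; the real one lives in the galois package."""


class EdgeConnectionError(EdgeError):
    pass


class EdgeTimeoutError(EdgeError):
    pass


class EdgeUnsupportedError(EdgeError):
    pass


# gRPC status name -> exception class, following the mapping described above.
STATUS_TO_ERROR = {
    "UNAVAILABLE": EdgeConnectionError,
    "DEADLINE_EXCEEDED": EdgeTimeoutError,
    "UNIMPLEMENTED": EdgeUnsupportedError,
}


def error_for_status(status_name, message):
    """Build the exception for a gRPC status name (assumed fallback: EdgeError)."""
    cls = STATUS_TO_ERROR.get(status_name, EdgeError)
    return cls(f"{status_name}: {message}")
```

Because every class derives from EdgeError, a single except EdgeError clause placed after the more specific handlers catches anything the lookup produces.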

See PyVISA Backend for the full reference. The 30-second version:

import pyvisa
rm = pyvisa.ResourceManager("@galois")
print(rm.list_resources())
dmm = rm.open_resource("USB0::0x2A8D::0x0101::MY54505555::INSTR")
print(dmm.query("*IDN?"))
print(float(dmm.query("MEAS:VOLT:DC?")))
dmm.close()

When a profile matches the instrument, the typed proxy is auto-attached to the resource — call profile commands directly:

smu = rm.open_resource("GPIB0::24::INSTR")
smu.set_voltage(voltage=1.5) # profile-typed, keyword-only
current = smu.measure_current()
smu.query("*IDN?") # PyVISA standard — always works

PyVISA’s own methods always win on name collision.
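
One common way to get this precedence in Python is to delegate through __getattr__, which is only consulted after normal attribute lookup fails. The sketch below illustrates that mechanism with hypothetical stand-in classes; it is not necessarily how pyvisa-galois implements it:

```python
class ProfileProxy:
    """Hypothetical stand-in for the typed proxy generated from a profile."""

    def set_voltage(self, *, voltage):
        return f"profile set_voltage({voltage})"

    def query(self, message):
        # Collides with the resource's own query(); never reached via the resource.
        return "profile query"


class Resource:
    """Hypothetical stand-in for a PyVISA resource with a proxy attached."""

    def __init__(self, proxy):
        self._proxy = proxy

    def query(self, message):
        return f"visa query({message})"

    def __getattr__(self, name):
        # Called only when normal lookup fails, so methods defined on the
        # resource itself always take precedence over profile commands.
        return getattr(self._proxy, name)


smu = Resource(ProfileProxy())
print(smu.query("*IDN?"))            # resolved on the resource
print(smu.set_voltage(voltage=1.5))  # falls through to the profile proxy
```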

The daemon’s gRPC server is the canonical contract. Generated Python stubs ship inside the engine package and can be vendored into your environment, or regenerated from proto/edge/v1/edge.proto. With the daemon running on localhost:50051:

import grpc
from galois_edge import edge_pb2, edge_pb2_grpc

channel = grpc.insecure_channel("localhost:50051")
stub = edge_pb2_grpc.EdgeDaemonServiceStub(channel)

# Health
ping = stub.Ping(edge_pb2.PingRequest())

# Inventory
resp = stub.ListInstruments(edge_pb2.ListInstrumentsRequest())
for inst in resp.instruments:
    print(inst.id, inst.manufacturer, inst.model, inst.address)

# Raw SCPI
out = stub.SendCommand(edge_pb2.SendCommandRequest(
    instrument_id="GPIB0::24::INSTR",
    scpi_command="*IDN?",
))
print(out.response)

When the daemon was started with INBOUND_AUTH_TOKEN, attach the bearer token as call metadata:

auth = (("authorization", "Bearer glc_internal_xxxx"),)
out = stub.SendCommand(req, metadata=auth)  # req: any request message, e.g. a SendCommandRequest
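
To avoid hard-coding the token in every call, the metadata can be built by a small helper. This is a sketch under assumptions: bearer_metadata is a hypothetical helper, and GALOIS_EDGE_TOKEN is an environment variable name invented for this example, not one the daemon or SDK defines.

```python
import os


def bearer_metadata(token=None):
    """Return gRPC call metadata carrying a bearer token, or None when no token is set."""
    # GALOIS_EDGE_TOKEN is a hypothetical variable name chosen for this sketch.
    token = token or os.environ.get("GALOIS_EDGE_TOKEN")
    if not token:
        return None  # metadata=None is the gRPC default, so calls stay unauthenticated
    return (("authorization", f"Bearer {token}"),)
```

A call then looks like stub.SendCommand(req, metadata=bearer_metadata()), which works unchanged against daemons started with or without an inbound token.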

req = edge_pb2.ExecuteCommandRequest(
    instrument_id="GPIB0::24::INSTR",
    command_name="set_voltage",
    parameters={"value": "1.5"},
)
out = stub.ExecuteCommand(req)
print(out.success, out.scpi_command, out.data)

The daemon validates parameters against the YAML profile, applies value mappings, and returns the SCPI it actually sent. For commands marked requires_sweep in the profile, ExecuteCommand returns an error — use StartSweep instead.

req = edge_pb2.StreamMeasurementRequest(
    stream_id="dmm-volt",
    instrument_id="USB0::0x2A8D::0x0101::MY54505555::INSTR",
    command_name="measure_voltage",
    interval_ms=200,
    timeout_ms=10_000,
)
for point in stub.StreamMeasurement(req):
    print(point.timestamp_ms, point.value, point.unit)

Minimum interval_ms is 100. Stop early with StopStream(stream_id=...).
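
How the daemon reacts to a smaller interval (rejecting it or clamping it) is not stated here, so a client may prefer to check the value before sending. A minimal sketch, where stream_request_fields is a hypothetical helper that returns keyword arguments for StreamMeasurementRequest:

```python
MIN_INTERVAL_MS = 100  # minimum stated in the text above


def stream_request_fields(stream_id, instrument_id, command_name,
                          interval_ms, timeout_ms=10_000):
    """Validate the interval locally and return fields for StreamMeasurementRequest."""
    if interval_ms < MIN_INTERVAL_MS:
        raise ValueError(
            f"interval_ms={interval_ms} is below the minimum of {MIN_INTERVAL_MS}"
        )
    return {
        "stream_id": stream_id,
        "instrument_id": instrument_id,
        "command_name": command_name,
        "interval_ms": interval_ms,
        "timeout_ms": timeout_ms,
    }
```

The result can be unpacked straight into the message: edge_pb2.StreamMeasurementRequest(**stream_request_fields("dmm-volt", instrument_id, "measure_voltage", 200)).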

For waveform-shaped returns (oscilloscope traces), point.vector_data carries raw bytes plus dtype, x-axis start/increment, and units:

import numpy as np
v = point.vector_data
y = np.frombuffer(v.y_data, dtype=v.y_dtype)
x = v.x_start + np.arange(v.y_length) * v.x_increment
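
The decoding can be tried offline with a stand-in message. In the sketch below, SimpleNamespace plays the role of point.vector_data, and the "<f4" dtype string (little-endian float32) is an assumption about what y_dtype contains:

```python
from types import SimpleNamespace

import numpy as np

# Build a fake vector_data payload: four float32 samples starting at -1 ms,
# spaced 0.5 ms apart. Field names follow the snippet above.
samples = np.array([0.0, 0.5, 1.0, 0.5], dtype="<f4")
v = SimpleNamespace(
    y_data=samples.tobytes(),
    y_dtype="<f4",          # assumed: a NumPy-compatible dtype string
    y_length=len(samples),
    x_start=-1e-3,
    x_increment=5e-4,
)

y = np.frombuffer(v.y_data, dtype=v.y_dtype)           # zero-copy view of the bytes
x = v.x_start + np.arange(v.y_length) * v.x_increment  # reconstructed time axis

print(x.shape, y.shape)
```

np.frombuffer returns a read-only view over the original bytes; call y.copy() before modifying the samples in place.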

For commands the profile marks requires_sweep: true (magnets, temperature controllers), the regular execute path is rejected. Drive them through the sweep RPCs:

import time

sweep = stub.StartSweep(edge_pb2.StartSweepRequest(
    instrument_id="magnet-1",
    command_name="set_field",
    target_value=2.5,
    sweep_rate=0.1,
))
while True:
    status = stub.GetSweepStatus(edge_pb2.GetSweepStatusRequest(sweep_id=sweep.sweep_id))
    print(status.current_value, status.status)
    if status.status in ("completed", "error", "aborted"):
        break
    time.sleep(1)  # pause between polls instead of spinning

# To abort early:
stub.StopSweep(edge_pb2.StopSweepRequest(sweep_id=sweep.sweep_id))

The sweep runs autonomously on the daemon — your client can drop offline and the ramp continues.
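
Since the ramp does not depend on the client, a polling loop can safely give up after a deadline without affecting the sweep. A minimal sketch of such a loop, where wait_for_sweep is a hypothetical helper that takes any zero-argument callable returning the current status string:

```python
import time

TERMINAL = ("completed", "error", "aborted")  # terminal statuses used above


def wait_for_sweep(get_status, poll_interval=1.0, timeout=600.0):
    """Poll get_status() until it returns a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"sweep still {status!r} after {timeout} s")
        time.sleep(poll_interval)
```

With the raw stubs, get_status could be lambda: stub.GetSweepStatus(edge_pb2.GetSweepStatusRequest(sweep_id=sweep.sweep_id)).status. A client-side timeout only stops the waiting; call StopSweep if the ramp itself should end.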

For instruments without a profile but with a vendor Python SDK installed on the edge — Quantum Design PPMS, Zurich Instruments MFLI, etc. — ProxySDKCall invokes a method on the edge daemon’s interpreter:

from google.protobuf import struct_pb2
req = edge_pb2.ProxySDKCallRequest(
    instrument_id="ppms-1",
    module="MultiPyVu",
    method="set_temperature",
    args=[
        struct_pb2.Value(number_value=100.0),
        struct_pb2.Value(number_value=5.0),
    ],
)
out = stub.ProxySDKCall(req)

Args and return values are JSON-serialised, so anything google.protobuf.Value carries (numbers, strings, bools, lists, dicts) is supported.
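
A quick client-side check can catch arguments that will not survive that serialisation, such as raw bytes or sets. The sketch below uses the standard json module as an approximation; json_safe is a hypothetical helper, and the daemon's actual encoder may differ in edge cases:

```python
import json


def json_safe(value):
    """Return True if `value` survives strict JSON serialisation."""
    try:
        # allow_nan=False rejects NaN and infinities, which are not valid JSON.
        json.dumps(value, allow_nan=False)
    except (TypeError, ValueError):
        return False
    return True


print(json_safe([100.0, "fast", True, {"limits": [1, 2]}]))
print(json_safe(b"raw bytes"))
```

Values that fail the check need converting first, for example turning a NumPy array into a list with tolist().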

The galois.Cloud class above is the typed entry point. For unwrapped HTTP calls — language-agnostic, useful for non-Python tooling — the cloud backend exposes the daemon API at <backend>/api/v1/edges/<edge_id>/... with JWT authentication. That surface lives in the cloud backend repo, not the daemon, and is documented separately.