CodeX---Python控制各种电表和仪器
·
Python 控制 6221 / SR830 / 2410 / PCO.edge camera / Thorlabs rotation stage / N5227A
准备工作
安装VScode和Python3.11
下载安装好VSCode和Python3.11并将其添加至PATH
安装控制电表以及画图所需要的包
运行安装好相应包后pip install pyvisa pyvisa-py matplotlib numpypip install pco opencv-python flask pythonnet
识别电脑上的VISA接口
import pyvisa
rm = pyvisa.ResourceManager()
resources = rm.list_resources()
for addr in resources:
print("\nAddress:", addr)
try:
inst = rm.open_resource(addr)
inst.timeout = 2000
inst.write_termination = "\n"
inst.read_termination = "\n"
try:
print("IDN:", inst.query("*IDN?").strip())
except Exception as e:
print("IDN query failed:", e)
inst.close()
except Exception as e:
print("Open failed:", e)
rm.close()
记录相应仪器的GPIB接口
利用CodeX生成简单读取代码
2182实时读取
import time
import csv
from datetime import datetime
from collections import deque
import pyvisa
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
# 2182 的 VISA 地址,需要根据程序打印出的 VISA 设备列表修改
# 常见格式:GPIB0::7::INSTR
VISA_RESOURCE = "GPIB0::7::INSTR"
# 2182 测量通道
CHANNEL = 1
SAMPLE_INTERVAL = 1.0
NPLC = 1
# True 表示使用仪器自动量程
# False 表示向仪器写入固定量程 VOLTAGE_RANGE
AUTO_RANGE = True
VOLTAGE_RANGE = 0.1
WINDOW_SECONDS = 6000
ENABLE_CSV = True
CSV_PATH = "keithley_2182_voltage.csv"
def init_2182(inst):
# 设置 VISA 通信超时时间,单位 ms
inst.timeout = 10000
# 设置 SCPI 命令的结束符
inst.write_termination = "\n"
inst.read_termination = "\n"
# 重置仪器并清除错误队列
inst.write("*RST")
inst.write("*CLS")
# 设置 2182 为电压测量模式
inst.write(":SENS:FUNC 'VOLT'")
# 向仪器写入测量通道
inst.write(f":SENS:CHAN {CHANNEL}")
# 向仪器写入 NPLC 积分时间
# NPLC 越大,读数越稳,但速度越慢
inst.write(f":SENS:VOLT:NPLC {NPLC}")
# 设置 :READ? 只返回读数,便于直接转换成 float
inst.write(":FORM:ELEM READ")
if AUTO_RANGE:
# 向指定通道写入自动量程设置
inst.write(f":SENS:VOLT:CHAN{CHANNEL}:RANG:AUTO ON")
else:
# 向指定通道写入固定量程,单位 V
inst.write(f":SENS:VOLT:CHAN{CHANNEL}:RANG {VOLTAGE_RANGE}")
time.sleep(0.5)
def read_voltage_v(inst):
# 向 2182 发送读取命令 :READ?
# 仪器返回当前电压读数,单位 V
response = inst.query(":READ?").strip()
return float(response)
def main():
# 创建 VISA 资源管理器,用于识别和连接仪器
rm = pyvisa.ResourceManager()
# 读取当前电脑识别到的所有 VISA 仪器地址
print("可用 VISA 设备:")
resources = rm.list_resources()
if not resources:
print("没有检测到 VISA 设备,请检查 NI-VISA、连接线和仪器地址。")
else:
for res in resources:
print(" ", res)
print(f"\n正在连接:{VISA_RESOURCE}")
# 使用 VISA 地址打开 2182
inst = rm.open_resource(VISA_RESOURCE)
# 向 2182 写入初始化参数
init_2182(inst)
csv_file = None
writer = None
if ENABLE_CSV:
csv_file = open(CSV_PATH, "w", newline="", encoding="utf-8")
writer = csv.writer(csv_file)
writer.writerow(["timestamp", "elapsed_s", "voltage_V", "voltage_nV"])
t0 = time.time()
xs = deque()
ys_nv = deque()
fig, ax = plt.subplots()
line, = ax.plot([], [], linewidth=1.5)
ax.set_title("Keithley 2182 Real-time Voltage")
ax.set_xlabel("Time (s)")
ax.set_ylabel("Voltage (nV)")
ax.grid(True)
value_text = ax.text(
0.02,
0.95,
"",
transform=ax.transAxes,
va="top",
ha="left",
)
def update(_frame):
elapsed_s = time.time() - t0
try:
# 每次刷新图表时,从 2182 读取一次电压
voltage_v = read_voltage_v(inst)
except Exception as exc:
value_text.set_text(f"Read error: {exc}")
return line, value_text
voltage_nv = voltage_v * 1e9
xs.append(elapsed_s)
ys_nv.append(voltage_nv)
while xs and elapsed_s - xs[0] > WINDOW_SECONDS:
xs.popleft()
ys_nv.popleft()
line.set_data(xs, ys_nv)
if xs:
ax.set_xlim(
max(0, elapsed_s - WINDOW_SECONDS),
max(WINDOW_SECONDS, elapsed_s),
)
if ys_nv:
ymin = min(ys_nv)
ymax = max(ys_nv)
if ymin == ymax:
margin = max(abs(ymin) * 0.05, 1.0)
else:
margin = (ymax - ymin) * 0.15
ax.set_ylim(ymin - margin, ymax + margin)
now = datetime.now().isoformat(timespec="milliseconds")
value_text.set_text(f"{voltage_nv:.3f} nV")
if writer:
writer.writerow([
now,
f"{elapsed_s:.6f}",
f"{voltage_v:.12e}",
f"{voltage_nv:.6f}",
])
csv_file.flush()
print(
f"{now} | {elapsed_s:10.3f} s | "
f"{voltage_v:.12e} V | {voltage_nv:.3f} nV"
)
return line, value_text
def on_close(_event):
try:
# 让 2182 从远程控制返回本地面板控制
inst.write(":SYST:LOC")
except Exception:
pass
try:
# 关闭 2182 的 VISA 连接
inst.close()
except Exception:
pass
if csv_file:
csv_file.close()
# 关闭 VISA 资源管理器
rm.close()
print("已关闭连接。")
fig.canvas.mpl_connect("close_event", on_close)
ani = FuncAnimation(
fig,
update,
interval=int(SAMPLE_INTERVAL * 1000),
blit=False,
cache_frame_data=False,
)
plt.show()
if __name__ == "__main__":
main()
SR830实时读取
# sr830_realtime_plot.py
import time
from collections import deque
import pyvisa
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
# SR830 的 VISA 地址。
# GPIB0::10::INSTR 表示:GPIB0 总线,仪器地址为 10。
# 如果不知道地址,可以运行 list_visa_resources() 查看电脑识别到的 VISA 设备。
SR830_ADDRESS = "GPIB0::10::INSTR"
SAMPLE_INTERVAL = 0.2
MAX_POINTS = 500
def list_visa_resources():
rm = pyvisa.ResourceManager()
# 列出当前电脑通过 VISA 能识别到的所有仪器地址。
# 例如可能看到:GPIB0::10::INSTR、USB0::...::INSTR 等。
resources = rm.list_resources()
print("Detected VISA resources:")
for res in resources:
print(" ", res)
rm.close()
def connect_sr830():
rm = pyvisa.ResourceManager()
# 根据 SR830_ADDRESS 打开仪器连接。
sr830 = rm.open_resource(SR830_ADDRESS)
# 设置仪器通信超时时间,单位 ms。
sr830.timeout = 5000
# SR830 的 GPIB/串口命令通常以换行符结束。
sr830.write_termination = "\n"
sr830.read_termination = "\n"
# OUTX 1:指定 SR830 通过 GPIB 返回数据。
# OUTX 0 = RS232,OUTX 1 = GPIB。
sr830.write("OUTX 1")
# *IDN?:读取仪器身份信息,用来确认连接的是 SR830。
print("Connected:", sr830.query("*IDN?").strip())
return rm, sr830
def init_sr830(sr830):
# sr830.write(...) 是向仪器写入设置命令。
# 这些命令会直接改变 SR830 面板上的对应参数。
# FMOD:参考源。
# 0 = 外部参考,1 = 内部参考。
sr830.write("FMOD 0")
# FREQ:内部参考频率,单位 Hz。
# 只有 FMOD 1 内部参考时,这个值才决定参考频率。
sr830.write("FREQ 1000")
# SLVL:正弦参考输出幅值,单位 Vrms。
sr830.write("SLVL 0.5")
# ISRC:信号输入方式。
# 0 = A,1 = A-B,2 = I(1 MOhm),3 = I(100 MOhm)。
sr830.write("ISRC 1")
# IGND:输入屏蔽接地方式。
# 0 = Float,1 = Ground。
sr830.write("IGND 0")
# ICPL:输入耦合方式。
# 0 = AC,1 = DC。
sr830.write("ICPL 1")
# ILIN:工频陷波滤波。
# 0 = Off,1 = Line,2 = 2xLine,3 = Line + 2xLine。
sr830.write("ILIN 1")
# SENS:灵敏度,也就是输入满量程。
# 0=2 nV/fA, 1=5 nV/fA, 2=10 nV/fA, 3=20 nV/fA
# 4=50 nV/fA, 5=100 nV/fA, 6=200 nV/fA, 7=500 nV/fA
# 8=1 uV/pA, 9=2 uV/pA, 10=5 uV/pA, 11=10 uV/pA
# 12=20 uV/pA, 13=50 uV/pA, 14=100 uV/pA, 15=200 uV/pA
# 16=500 uV/pA, 17=1 mV/nA, 18=2 mV/nA, 19=5 mV/nA
# 20=10 mV/nA, 21=20 mV/nA, 22=50 mV/nA, 23=100 mV/nA
# 24=200 mV/nA, 25=500 mV/nA, 26=1 V/uA。
sr830.write("SENS 9")
# OFLT:时间常数。
# 0=10 us, 1=30 us, 2=100 us, 3=300 us
# 4=1 ms, 5=3 ms, 6=10 ms, 7=30 ms
# 8=100 ms, 9=300 ms, 10=1 s, 11=3 s
# 12=10 s, 13=30 s, 14=100 s, 15=300 s
# 16=1 ks, 17=3 ks, 18=10 ks, 19=30 ks。
sr830.write("OFLT 10")
# OFSL:低通滤波斜率。
# 0 = 6 dB/oct,1 = 12 dB/oct,2 = 18 dB/oct,3 = 24 dB/oct。
sr830.write("OFSL 1")
# RMOD:动态储备。
# 0 = High Reserve,1 = Normal,2 = Low Noise。
sr830.write("RMOD 1")
# PHAS:参考相位,单位 degree。
sr830.write("PHAS 0")
def parse_lia_status(status):
# LIAS? 返回的是 SR830 的状态字。
# 这里用二进制 bit 判断仪器是否 overload 或参考失锁。
status = int(status)
# bit 0:输入或动态储备 overload。
input_overload = bool(status & (1 << 0))
# bit 1:滤波器 overload。
filter_overload = bool(status & (1 << 1))
# bit 2:输出 overload。
output_overload = bool(status & (1 << 2))
# bit 3:参考信号 unlock。
reference_unlock = bool(status & (1 << 3))
overload = input_overload or filter_overload or output_overload
messages = []
if input_overload:
messages.append("INPUT/RESERVE")
if filter_overload:
messages.append("FILTER")
if output_overload:
messages.append("OUTPUT")
if reference_unlock:
messages.append("UNLOCK")
if not messages:
messages.append("OK")
return overload, ", ".join(messages)
def read_sr830(sr830):
# sr830.query(...) 是向仪器发送查询命令,并读取仪器返回值。
# SNAP? 3,4:一次读取 R 和 Theta。
# 3 = R,4 = Theta。
# SR830 返回格式类似:1.234E-6,45.67
data = sr830.query("SNAP? 3,4")
r, theta = map(float, data.strip().split(","))
# FREQ?:读取当前参考频率。
# 内部参考时返回设定频率;外部参考时返回检测到的外部参考频率。
freq = float(sr830.query("FREQ?").strip())
# LIAS?:读取 SR830 状态字,用于判断 overload、unlock 等仪器状态。
lia_status = int(sr830.query("LIAS?").strip())
overload, status_text = parse_lia_status(lia_status)
return r, theta, freq, overload, status_text
def main():
# 如果不知道仪器地址,可以先取消下一行注释查看可用 VISA 地址。
# list_visa_resources()
rm, sr830 = connect_sr830()
init_sr830(sr830)
t_data = deque(maxlen=MAX_POINTS)
r_data = deque(maxlen=MAX_POINTS)
theta_data = deque(maxlen=MAX_POINTS)
start_time = time.time()
fig, axes = plt.subplots(2, 1, figsize=(10, 7), sharex=True)
ax1, ax2 = axes
line_r, = ax1.plot([], [], label="R", color="tab:blue")
line_theta, = ax2.plot([], [], label="Theta", color="tab:red")
freq_text = ax1.text(
0.02,
0.95,
"Reference Frequency: -- Hz",
transform=ax1.transAxes,
va="top",
ha="left",
)
overload_text = ax1.text(
0.02,
0.87,
"Status: --",
transform=ax1.transAxes,
va="top",
ha="left",
color="green",
fontweight="bold",
)
ax1.set_ylabel("R / V")
ax2.set_ylabel("Theta / degree")
ax2.set_xlabel("Time / s")
ax1.grid(True)
ax2.grid(True)
ax1.legend(loc="upper right")
ax2.legend(loc="upper right")
def update(frame):
try:
# 每刷新一次图,就从 SR830 读取一次 R、Theta、参考频率和状态。
r, theta, freq, overload, status_text = read_sr830(sr830)
except Exception as e:
# 常见原因:GPIB 地址错误、仪器未开机、VISA 驱动异常、通信超时。
print("Read error:", e)
return line_r, line_theta, freq_text, overload_text
t = time.time() - start_time
t_data.append(t)
r_data.append(r)
theta_data.append(theta)
line_r.set_data(t_data, r_data)
line_theta.set_data(t_data, theta_data)
freq_text.set_text(f"Reference Frequency: {freq:.6g} Hz")
if overload:
overload_text.set_text(f"Status: OVERLOAD ({status_text})")
overload_text.set_color("red")
else:
overload_text.set_text(f"Status: {status_text}")
overload_text.set_color("green")
ax1.relim()
ax1.autoscale_view()
ax2.relim()
ax2.autoscale_view()
print(
f"t = {t:.2f} s, "
f"R = {r:.6e} V, "
f"Theta = {theta:.3f} deg, "
f"Freq = {freq:.6g} Hz, "
f"Status = {status_text}"
)
return line_r, line_theta, freq_text, overload_text
ani = FuncAnimation(
fig,
update,
interval=int(SAMPLE_INTERVAL * 1000),
blit=False,
)
try:
plt.tight_layout()
plt.show()
finally:
# 关闭 SR830 VISA 连接。
sr830.close()
rm.close()
if __name__ == "__main__":
main()
2410实时测电阻
import time
import csv
from datetime import datetime
from collections import deque
import pyvisa
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
# 2182 的 VISA 地址,需要根据程序打印出的 VISA 设备列表修改
# 常见格式:GPIB0::7::INSTR
VISA_RESOURCE = "GPIB0::7::INSTR"
# 2182 测量通道
CHANNEL = 1
SAMPLE_INTERVAL = 1.0
NPLC = 1
# True 表示使用仪器自动量程
# False 表示向仪器写入固定量程 VOLTAGE_RANGE
AUTO_RANGE = True
VOLTAGE_RANGE = 0.1
WINDOW_SECONDS = 6000
ENABLE_CSV = True
CSV_PATH = "keithley_2182_voltage.csv"
def init_2182(inst):
# 设置 VISA 通信超时时间,单位 ms
inst.timeout = 10000
# 设置 SCPI 命令的结束符
inst.write_termination = "\n"
inst.read_termination = "\n"
# 重置仪器并清除错误队列
inst.write("*RST")
inst.write("*CLS")
# 设置 2182 为电压测量模式
inst.write(":SENS:FUNC 'VOLT'")
# 向仪器写入测量通道
inst.write(f":SENS:CHAN {CHANNEL}")
# 向仪器写入 NPLC 积分时间
# NPLC 越大,读数越稳,但速度越慢
inst.write(f":SENS:VOLT:NPLC {NPLC}")
# 设置 :READ? 只返回读数,便于直接转换成 float
inst.write(":FORM:ELEM READ")
if AUTO_RANGE:
# 向指定通道写入自动量程设置
inst.write(f":SENS:VOLT:CHAN{CHANNEL}:RANG:AUTO ON")
else:
# 向指定通道写入固定量程,单位 V
inst.write(f":SENS:VOLT:CHAN{CHANNEL}:RANG {VOLTAGE_RANGE}")
time.sleep(0.5)
def read_voltage_v(inst):
# 向 2182 发送读取命令 :READ?
# 仪器返回当前电压读数,单位 V
response = inst.query(":READ?").strip()
return float(response)
def main():
# 创建 VISA 资源管理器,用于识别和连接仪器
rm = pyvisa.ResourceManager()
# 读取当前电脑识别到的所有 VISA 仪器地址
print("可用 VISA 设备:")
resources = rm.list_resources()
if not resources:
print("没有检测到 VISA 设备,请检查 NI-VISA、连接线和仪器地址。")
else:
for res in resources:
print(" ", res)
print(f"\n正在连接:{VISA_RESOURCE}")
# 使用 VISA 地址打开 2182
inst = rm.open_resource(VISA_RESOURCE)
# 向 2182 写入初始化参数
init_2182(inst)
csv_file = None
writer = None
if ENABLE_CSV:
csv_file = open(CSV_PATH, "w", newline="", encoding="utf-8")
writer = csv.writer(csv_file)
writer.writerow(["timestamp", "elapsed_s", "voltage_V", "voltage_nV"])
t0 = time.time()
xs = deque()
ys_nv = deque()
fig, ax = plt.subplots()
line, = ax.plot([], [], linewidth=1.5)
ax.set_title("Keithley 2182 Real-time Voltage")
ax.set_xlabel("Time (s)")
ax.set_ylabel("Voltage (nV)")
ax.grid(True)
value_text = ax.text(
0.02,
0.95,
"",
transform=ax.transAxes,
va="top",
ha="left",
)
def update(_frame):
elapsed_s = time.time() - t0
try:
# 每次刷新图表时,从 2182 读取一次电压
voltage_v = read_voltage_v(inst)
except Exception as exc:
value_text.set_text(f"Read error: {exc}")
return line, value_text
voltage_nv = voltage_v * 1e9
xs.append(elapsed_s)
ys_nv.append(voltage_nv)
while xs and elapsed_s - xs[0] > WINDOW_SECONDS:
xs.popleft()
ys_nv.popleft()
line.set_data(xs, ys_nv)
if xs:
ax.set_xlim(
max(0, elapsed_s - WINDOW_SECONDS),
max(WINDOW_SECONDS, elapsed_s),
)
if ys_nv:
ymin = min(ys_nv)
ymax = max(ys_nv)
if ymin == ymax:
margin = max(abs(ymin) * 0.05, 1.0)
else:
margin = (ymax - ymin) * 0.15
ax.set_ylim(ymin - margin, ymax + margin)
now = datetime.now().isoformat(timespec="milliseconds")
value_text.set_text(f"{voltage_nv:.3f} nV")
if writer:
writer.writerow([
now,
f"{elapsed_s:.6f}",
f"{voltage_v:.12e}",
f"{voltage_nv:.6f}",
])
csv_file.flush()
print(
f"{now} | {elapsed_s:10.3f} s | "
f"{voltage_v:.12e} V | {voltage_nv:.3f} nV"
)
return line, value_text
def on_close(_event):
try:
# 让 2182 从远程控制返回本地面板控制
inst.write(":SYST:LOC")
except Exception:
pass
try:
# 关闭 2182 的 VISA 连接
inst.close()
except Exception:
pass
if csv_file:
csv_file.close()
# 关闭 VISA 资源管理器
rm.close()
print("已关闭连接。")
fig.canvas.mpl_connect("close_event", on_close)
ani = FuncAnimation(
fig,
update,
interval=int(SAMPLE_INTERVAL * 1000),
blit=False,
cache_frame_data=False,
)
plt.show()
if __name__ == "__main__":
main()
PCO相机实时读取
import time
import cv2
import numpy as np
import pco
EXPOSURE_S = 0.010
RING_BUFFER_FRAMES = 10
VIDEO_WINDOW = "pco.edge live"
PLOT_WINDOW = "ROI intensity vs time"
zoom = 1.0
roi_start = None
roi_end = None
is_selecting_roi = False
roi = None
latest_display_shape = None
latest_image_shape = None
latest_offset = (0, 0)
latest_scale = 1.0
roi_times = []
roi_means = []
start_time = None
def mono16_to_preview(img_u16: np.ndarray) -> np.ndarray:
p1, p99 = np.percentile(img_u16, (1, 99.8))
img = np.clip((img_u16 - p1) * 255.0 / max(p99 - p1, 1), 0, 255)
return img.astype(np.uint8)
def image_to_window_xy(x_img, y_img):
ox, oy = latest_offset
return int(x_img * latest_scale + ox), int(y_img * latest_scale + oy)
def window_to_image_xy(x_win, y_win):
ox, oy = latest_offset
x_img = int((x_win - ox) / latest_scale)
y_img = int((y_win - oy) / latest_scale)
if latest_image_shape is None:
return 0, 0
h, w = latest_image_shape[:2]
x_img = np.clip(x_img, 0, w - 1)
y_img = np.clip(y_img, 0, h - 1)
return x_img, y_img
def mouse_callback(event, x, y, flags, param):
global zoom, roi_start, roi_end, is_selecting_roi, roi
global roi_times, roi_means, start_time
if latest_image_shape is None:
return
if event == cv2.EVENT_MOUSEWHEEL:
if flags > 0:
zoom = min(zoom * 1.2, 10.0)
else:
zoom = max(zoom / 1.2, 0.2)
elif event == cv2.EVENT_LBUTTONDOWN:
roi_start = window_to_image_xy(x, y)
roi_end = roi_start
is_selecting_roi = True
elif event == cv2.EVENT_MOUSEMOVE and is_selecting_roi:
roi_end = window_to_image_xy(x, y)
elif event == cv2.EVENT_LBUTTONUP:
roi_end = window_to_image_xy(x, y)
is_selecting_roi = False
x1, y1 = roi_start
x2, y2 = roi_end
x_min, x_max = sorted((x1, x2))
y_min, y_max = sorted((y1, y2))
if x_max > x_min and y_max > y_min:
roi = (x_min, y_min, x_max, y_max)
roi_times = []
roi_means = []
start_time = time.time()
else:
roi = None
roi_times = []
roi_means = []
start_time = None
def make_aspect_display(preview_u8: np.ndarray, window_w: int, window_h: int) -> np.ndarray:
global latest_offset, latest_scale, latest_display_shape, latest_image_shape
img_h, img_w = preview_u8.shape[:2]
latest_image_shape = preview_u8.shape
base_scale = min(window_w / img_w, window_h / img_h)
latest_scale = base_scale * zoom
disp_w = max(1, int(img_w * latest_scale))
disp_h = max(1, int(img_h * latest_scale))
resized = cv2.resize(preview_u8, (disp_w, disp_h), interpolation=cv2.INTER_AREA)
canvas = np.zeros((window_h, window_w), dtype=np.uint8)
ox = max(0, (window_w - disp_w) // 2)
oy = max(0, (window_h - disp_h) // 2)
src_x0 = max(0, -((window_w - disp_w) // 2))
src_y0 = max(0, -((window_h - disp_h) // 2))
dst_x0 = ox
dst_y0 = oy
copy_w = min(window_w - dst_x0, disp_w - src_x0)
copy_h = min(window_h - dst_y0, disp_h - src_y0)
if copy_w > 0 and copy_h > 0:
canvas[dst_y0:dst_y0 + copy_h, dst_x0:dst_x0 + copy_w] = resized[
src_y0:src_y0 + copy_h,
src_x0:src_x0 + copy_w,
]
latest_offset = ((window_w - disp_w) // 2, (window_h - disp_h) // 2)
latest_display_shape = canvas.shape
return cv2.cvtColor(canvas, cv2.COLOR_GRAY2BGR)
def draw_roi(display_bgr: np.ndarray, image_u16: np.ndarray):
active_roi = roi
if is_selecting_roi and roi_start is not None and roi_end is not None:
x1, y1 = roi_start
x2, y2 = roi_end
x_min, x_max = sorted((x1, x2))
y_min, y_max = sorted((y1, y2))
active_roi = (x_min, y_min, x_max, y_max)
if active_roi is None:
cv2.putText(
display_bgr,
"Drag mouse to select ROI | Wheel/+/- zoom | r reset | Close window to quit",
(20, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.65,
(0, 255, 255),
2,
cv2.LINE_AA,
)
return None
x1, y1, x2, y2 = active_roi
roi_width = x2 - x1
roi_height = y2 - y1
wx1, wy1 = image_to_window_xy(x1, y1)
wx2, wy2 = image_to_window_xy(x2, y2)
cv2.rectangle(display_bgr, (wx1, wy1), (wx2, wy2), (0, 255, 0), 2)
roi_img = image_u16[y1:y2, x1:x2]
if roi_img.size == 0:
return None
mean_val = float(np.mean(roi_img))
text = (
f"ROI x={x1}, y={y1}, "
f"w={roi_width}, h={roi_height} | "
f"mean={mean_val:.2f}"
)
print(text, end="\r")
cv2.putText(
display_bgr,
text,
(20, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.65,
(0, 255, 255),
2,
cv2.LINE_AA,
)
return mean_val
def make_intensity_plot(width=800, height=400):
plot = np.full((height, width, 3), 255, dtype=np.uint8)
margin_l = 70
margin_r = 20
margin_t = 25
margin_b = 55
cv2.rectangle(
plot,
(margin_l, margin_t),
(width - margin_r, height - margin_b),
(0, 0, 0),
1,
)
cv2.putText(
plot,
"ROI mean intensity vs time",
(margin_l, 18),
cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(0, 0, 0),
1,
cv2.LINE_AA,
)
if len(roi_times) < 2:
cv2.putText(
plot,
"Select ROI in video window",
(margin_l + 20, height // 2),
cv2.FONT_HERSHEY_SIMPLEX,
0.75,
(80, 80, 80),
2,
cv2.LINE_AA,
)
return plot
t = np.array(roi_times, dtype=np.float64)
y = np.array(roi_means, dtype=np.float64)
t_min, t_max = float(t[0]), float(t[-1])
y_min, y_max = float(np.min(y)), float(np.max(y))
if t_max <= t_min:
t_max = t_min + 1.0
if y_max <= y_min:
y_max = y_min + 1.0
x0 = margin_l
x1 = width - margin_r
y0 = height - margin_b
y1 = margin_t
xs = x0 + (t - t_min) / (t_max - t_min) * (x1 - x0)
ys = y0 - (y - y_min) / (y_max - y_min) * (y0 - y1)
points = np.column_stack((xs, ys)).astype(np.int32)
cv2.polylines(plot, [points], False, (0, 120, 255), 2, cv2.LINE_AA)
latest_text = f"mean={y[-1]:.2f}"
range_text = f"t={t_min:.1f}-{t_max:.1f}s"
cv2.putText(
plot,
latest_text,
(margin_l, height - 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(0, 80, 200),
2,
cv2.LINE_AA,
)
cv2.putText(
plot,
range_text,
(margin_l + 170, height - 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.55,
(0, 0, 0),
1,
cv2.LINE_AA,
)
return plot
def main():
global zoom, roi, roi_times, roi_means, start_time
with pco.Camera() as cam:
cam.default_configuration()
cam.exposure_time = EXPOSURE_S
cam.configuration = {
"timestamp": "off",
"metadata": "off",
# "roi": (1, 1, 1024, 1024),
}
cv2.namedWindow(VIDEO_WINDOW, cv2.WINDOW_NORMAL)
cv2.resizeWindow(VIDEO_WINDOW, 1500, 1200)
cv2.setMouseCallback(VIDEO_WINDOW, mouse_callback)
cv2.namedWindow(PLOT_WINDOW, cv2.WINDOW_NORMAL)
cv2.resizeWindow(PLOT_WINDOW, 800, 400)
cam.record(RING_BUFFER_FRAMES, "ring buffer")
try:
while True:
if cv2.getWindowProperty(VIDEO_WINDOW, cv2.WND_PROP_VISIBLE) < 1:
break
cam.wait_for_new_image()
image, _ = cam.image(0xFFFFFFFF)
preview = mono16_to_preview(image)
_, _, window_w, window_h = cv2.getWindowImageRect(VIDEO_WINDOW)
window_w = max(window_w, 320)
window_h = max(window_h, 240)
display = make_aspect_display(preview, window_w, window_h)
mean_val = draw_roi(display, image)
if roi is not None and mean_val is not None:
if start_time is None:
start_time = time.time()
roi_times.append(time.time() - start_time)
roi_means.append(mean_val)
if len(roi_times) > 2000:
roi_times = roi_times[-2000:]
roi_means = roi_means[-2000:]
cv2.imshow(VIDEO_WINDOW, display)
cv2.imshow(PLOT_WINDOW, make_intensity_plot())
key = cv2.waitKey(1) & 0xFF
if key in (ord("+"), ord("=")):
zoom = min(zoom * 1.2, 10.0)
elif key in (ord("-"), ord("_")):
zoom = max(zoom / 1.2, 0.2)
elif key == ord("r"):
roi = None
zoom = 1.0
roi_times = []
roi_means = []
start_time = None
finally:
cam.stop()
cv2.destroyAllWindows()
if __name__ == "__main__":
main()
N5227A实时读取
import time
import matplotlib.pyplot as plt
from matplotlib.widgets import RadioButtons
import pyvisa
VISA_ADDR = "GPIB0::16::INSTR"
INTERVAL_S = 1.0
# 图窗里直接显示 S 参数名
# S11: Port 1 reflection
# S21: Forward transmission, input Port 1, output Port 2
# S12: Reverse transmission, input Port 2, output Port 1
# S22: Port 2 reflection
SPARAM_TO_CHANNEL = {
"S11": 1,
"S21": 2,
"S12": 3,
"S22": 4,
}
CHANNEL_TO_SPARAM = {
1: "S11",
2: "S21",
3: "S12",
4: "S22",
}
class N5227ARealtimePlot:
def __init__(self):
self.vna = None
self.channel = 1
self.sparam = "S11"
self.measurement = None
self.parameter = None
self.freq = None
self.x = None
self.fig = None
self.ax_plot = None
self.ax_radio = None
self.line = None
self.radio = None
def open_vna(self):
rm = pyvisa.ResourceManager()
self.vna = rm.open_resource(VISA_ADDR)
self.vna.timeout = 20000
self.vna.write_termination = "\n"
self.vna.read_termination = "\n"
print("Connected:", self.vna.query("*IDN?").strip())
self.vna.write("FORM:DATA ASC")
self.vna.write("FORM:BORD NORM")
def parse_ascii_floats(self, response):
return [float(x) for x in response.strip().split(",") if x.strip()]
def get_measurements(self, channel):
response = self.vna.query(f"CALC{channel}:PAR:CAT?").strip().replace('"', "")
parts = [part.strip() for part in response.split(",") if part.strip()]
measurements = []
for i in range(0, len(parts) - 1, 2):
measurements.append((parts[i], parts[i + 1]))
return measurements
def create_measurement_if_needed(self, channel):
target_param = CHANNEL_TO_SPARAM[channel]
target_name = f"CH{channel}_{target_param}_1"
measurements = self.get_measurements(channel)
for name, parameter in measurements:
if parameter.upper() == target_param:
print(f"CH{channel} 已有 {target_param}: {name}")
return name, parameter
print(f"CH{channel} 没有 {target_param},正在自动创建 {target_name}")
self.vna.write(f"CALC{channel}:PAR:DEF:EXT '{target_name}','{target_param}'")
self.vna.write(f"CALC{channel}:PAR:SEL '{target_name}'")
self.vna.write(f"CALC{channel}:FORM MLOG")
return target_name, target_param
def setup_all_measurements(self):
for channel in [1, 2, 3, 4]:
try:
self.create_measurement_if_needed(channel)
except Exception as e:
print(f"CH{channel} 自动创建失败: {e}")
def read_frequency_axis(self, channel):
try:
freq = self.parse_ascii_floats(self.vna.query(f"SENS{channel}:X?"))
return freq if freq else None
except Exception:
return None
def select_sparam(self, sparam):
self.sparam = sparam
self.channel = SPARAM_TO_CHANNEL[sparam]
try:
self.measurement, self.parameter = self.create_measurement_if_needed(self.channel)
except Exception as e:
print(f"{sparam} 选择失败: {e}")
self.measurement = None
self.parameter = None
self.freq = None
self.x = None
self.line.set_data([], [])
self.ax_plot.set_title(f"N5227A {sparam}: no measurement")
self.fig.canvas.draw_idle()
return
self.vna.write(f"CALC{self.channel}:PAR:SEL '{self.measurement}'")
self.vna.write(f"CALC{self.channel}:FORM MLOG")
self.freq = self.read_frequency_axis(self.channel)
if self.freq is not None:
self.x = [f / 1e9 for f in self.freq]
self.ax_plot.set_xlabel("Frequency (GHz)")
else:
self.x = None
self.ax_plot.set_xlabel("Point index")
self.ax_plot.set_ylabel("Magnitude (dB)")
print(f"切换到 {sparam}: CH{self.channel}, {self.measurement}")
def read_trace(self):
self.vna.write(f"SENS{self.channel}:SWE:MODE SING")
self.vna.query("*OPC?")
data = self.vna.query(f"CALC{self.channel}:DATA? FDATA")
return self.parse_ascii_floats(data)
def on_sparam_selected(self, label):
self.select_sparam(label)
def setup_plot(self):
self.fig = plt.figure(figsize=(11, 6))
# 左边留给 S 参数选择按钮,右边留一点空间避免控件和 y 轴/图表重叠
self.ax_plot = self.fig.add_axes([0.30, 0.12, 0.65, 0.80])
self.ax_radio = self.fig.add_axes([0.06, 0.38, 0.14, 0.28])
self.line, = self.ax_plot.plot([], [], linewidth=1.5)
self.ax_plot.set_xlabel("Frequency (GHz)")
self.ax_plot.set_ylabel("Magnitude (dB)", labelpad=10)
self.ax_plot.grid(True, alpha=0.35)
# 按你的要求,图表里直接用 S11 / S12 / S21 / S22 命名
labels = ("S11", "S12", "S21", "S22")
self.radio = RadioButtons(self.ax_radio, labels, active=0)
self.radio.on_clicked(self.on_sparam_selected)
self.select_sparam("S11")
def update_plot(self):
if self.measurement is None:
return
try:
y = self.read_trace()
except Exception as e:
print(f"读取 {self.sparam} 失败: {e}")
return
if not y:
print("没有读到数据")
return
if self.x is None or len(self.x) != len(y):
x_plot = list(range(1, len(y) + 1))
self.ax_plot.set_xlabel("Point index")
else:
x_plot = self.x
self.ax_plot.set_xlabel("Frequency (GHz)")
self.line.set_data(x_plot, y)
self.ax_plot.relim()
self.ax_plot.autoscale_view()
title = (
f"N5227A {self.sparam} | "
f"CH{self.channel} | "
f"{self.measurement} | "
f"{time.strftime('%H:%M:%S')}"
)
self.ax_plot.set_title(title)
self.fig.canvas.draw_idle()
def run(self):
self.open_vna()
print("正在检查并自动创建 S11 / S21 / S12 / S22")
self.setup_all_measurements()
self.setup_plot()
print("在图窗左侧选择 S11 / S12 / S21 / S22。")
print("关闭图像窗口或在 VS Code 里停止运行即可结束。")
try:
while plt.fignum_exists(self.fig.number):
self.update_plot()
plt.pause(INTERVAL_S)
except KeyboardInterrupt:
print("已停止。")
finally:
if self.vna is not None:
self.vna.close()
if __name__ == "__main__":
app = N5227ARealtimePlot()
app.run()
LS336实时读取
import re
import time
import matplotlib.pyplot as plt
import pyvisa
VISA_ADDR = "GPIB0::12::INSTR"
INTERVAL_S = 0.2
FIELD_QUERY = "RDGFIELD?"
UNIT_QUERY = "UNIT?"
FALLBACK_FIELD_UNIT = "uT"
def open_lakeshore_475():
rm = pyvisa.ResourceManager()
inst = rm.open_resource(VISA_ADDR)
inst.timeout = 5000
inst.write_termination = "\n"
inst.read_termination = "\n"
inst.clear()
print("Connected:", inst.query("*IDN?").strip())
inst.clear()
return inst
def extract_number(response):
match = re.search(r"[-+]?\d+(\.\d+)?([Ee][-+]?\d+)?", response)
if not match:
raise ValueError(f"没有从返回值中解析到数字: {response!r}")
return float(match.group(0))
def normalize_unit_text(response):
text = response.strip().replace('"', "").replace("'", "")
upper = text.upper()
# 根据你的 475 实测:
# UNIT? -> 1,同时仪器面板显示 uT
unit_map = {
"1": "uT",
"2": "mT",
"3": "T",
"4": "G",
"5": "kG",
"6": "Oe",
"7": "kOe",
"8": "A/m",
"UT": "uT",
"µT": "uT",
"μT": "uT",
"MT": "mT",
"T": "T",
"G": "G",
"KG": "kG",
"OE": "Oe",
"KOE": "kOe",
"A/M": "A/m",
}
return unit_map.get(upper, text)
def read_field_unit(inst):
try:
response = inst.query(UNIT_QUERY).strip()
unit = normalize_unit_text(response)
print(f"Unit query {UNIT_QUERY} -> {response!r}, using unit: {unit}")
return unit
except Exception as e:
print(f"单位读取失败: {e}")
print(f"使用备用单位: {FALLBACK_FIELD_UNIT}")
return FALLBACK_FIELD_UNIT
def read_field(inst):
response = inst.query(FIELD_QUERY).strip()
field = extract_number(response)
return field, response
def main():
inst = open_lakeshore_475()
field_unit = read_field_unit(inst)
times = []
fields = []
start_time = time.time()
plt.ion()
fig, ax = plt.subplots(figsize=(9, 5))
line, = ax.plot([], [], linewidth=1.8)
ax.set_title("Lake Shore 475 Magnetic Field")
ax.set_xlabel("Time (s)")
ax.set_ylabel(f"Magnetic Field ({field_unit})")
ax.ticklabel_format(axis="y", style="plain", useOffset=False)
ax.grid(True, alpha=0.35)
print("正在实时读取 Lake Shore 475 磁场值")
print(f"当前图表 y 轴单位: {field_unit}")
print("关闭图像窗口或在 VS Code 里停止运行即可结束。")
try:
while plt.fignum_exists(fig.number):
t = time.time() - start_time
try:
field, raw_response = read_field(inst)
except Exception as e:
print(f"读取失败: {e}")
time.sleep(INTERVAL_S)
continue
times.append(t)
fields.append(field)
line.set_data(times, fields)
ax.relim()
ax.autoscale_view()
ax.set_xlim(0, max(10, t))
ax.set_title(
f"Lake Shore 475 Field | "
f"{field:.6g} {field_unit} | "
f"{time.strftime('%H:%M:%S')}"
)
fig.canvas.draw()
fig.canvas.flush_events()
plt.pause(INTERVAL_S)
print(
f"{time.strftime('%H:%M:%S')} "
f"Field: {field:.6g} {field_unit} Raw: {raw_response}"
)
except KeyboardInterrupt:
print("已停止。")
finally:
inst.close()
if __name__ == "__main__":
main()
LS475实时读取
import re
import time
import matplotlib.pyplot as plt
import pyvisa
VISA_ADDR = "GPIB0::12::INSTR"
INTERVAL_S = 0.2
# Lake Shore 475 读磁场常用命令
FIELD_QUERY = "RDGFIELD?"
def open_lakeshore_475():
rm = pyvisa.ResourceManager()
inst = rm.open_resource(VISA_ADDR)
inst.timeout = 5000
inst.write_termination = "\n"
inst.read_termination = "\n"
inst.clear()
idn = inst.query("*IDN?").strip()
print("Connected:", idn)
# 清掉可能残留在缓冲区里的返回值,避免下一次 float() 读到 IDN 字符串
inst.clear()
return inst
def extract_number(response):
match = re.search(r"[-+]?\d+(\.\d+)?([Ee][-+]?\d+)?", response)
if not match:
raise ValueError(f"没有从返回值中解析到数字: {response!r}")
return float(match.group(0))
def read_field(inst):
response = inst.query(FIELD_QUERY).strip()
return extract_number(response), response
def main():
inst = open_lakeshore_475()
times = []
fields = []
start_time = time.time()
plt.ion()
fig, ax = plt.subplots(figsize=(9, 5))
line, = ax.plot([], [], linewidth=1.8)
ax.set_title("Lake Shore 475 Magnetic Field")
ax.set_xlabel("Time (s)")
ax.set_ylabel("Magnetic Field")
ax.ticklabel_format(axis="y", style="plain", useOffset=False)
ax.grid(True, alpha=0.35)
print("正在实时读取 Lake Shore 475 磁场值")
print("关闭图像窗口或在 VS Code 里停止运行即可结束。")
try:
while plt.fignum_exists(fig.number):
t = time.time() - start_time
try:
field, raw_response = read_field(inst)
except Exception as e:
print(f"读取失败: {e}")
time.sleep(INTERVAL_S)
continue
times.append(t)
fields.append(field)
line.set_data(times, fields)
ax.relim()
ax.autoscale_view()
ax.set_xlim(0, max(10, t))
ax.set_title(
f"Lake Shore 475 Field | "
f"{field:.6g} | "
f"{time.strftime('%H:%M:%S')}"
)
fig.canvas.draw()
fig.canvas.flush_events()
plt.pause(INTERVAL_S)
print(
f"{time.strftime('%H:%M:%S')} "
f"Field: {field:.6g} Raw: {raw_response}"
)
except KeyboardInterrupt:
print("已停止。")
finally:
inst.close()
if __name__ == "__main__":
main()
利用CodeX生成简单输出代码
6221实时输出
SMB100A实时输出
import time
import tkinter as tk
from tkinter import ttk, messagebox
import pyvisa
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
VISA_ADDR = "GPIB0::28::INSTR"
INTERVAL_MS = 200
DEFAULT_FREQUENCY_GHZ = 1.0
DEFAULT_POWER_DBM = -20.0
class SMB100AApp:
def __init__(self, root):
self.root = root
self.root.title("R&S SMB100A Control")
self.sg = None
self.running = True
self.frequency_hz = DEFAULT_FREQUENCY_GHZ * 1e9
self.power_dbm = DEFAULT_POWER_DBM
self.output_on = False
self.start_time = time.time()
self.times = []
self.powers = []
self.last_button_state = None
self.open_instrument()
self.build_ui()
self.apply_settings()
self.update_loop()
def open_instrument(self):
rm = pyvisa.ResourceManager()
self.sg = rm.open_resource(VISA_ADDR)
self.sg.timeout = 5000
self.sg.write_termination = "\n"
self.sg.read_termination = "\n"
self.sg.clear()
idn = self.sg.query("*IDN?").strip()
self.sg.clear()
print("Connected:", idn)
def query_output_state(self):
response = self.sg.query("OUTP?").strip().upper()
return response in ["1", "ON"]
def query_frequency(self):
return float(self.sg.query("FREQ?").strip())
def query_power(self):
return float(self.sg.query("POW?").strip())
def set_frequency(self, frequency_hz):
if frequency_hz <= 0:
raise ValueError("Frequency must be > 0")
self.sg.write(f"FREQ {frequency_hz}")
self.frequency_hz = self.query_frequency()
def set_power(self, power_dbm):
self.sg.write(f"POW {power_dbm}DBM")
self.power_dbm = self.query_power()
def set_output(self, on):
self.sg.write("OUTP ON" if on else "OUTP OFF")
self.output_on = self.query_output_state()
def build_ui(self):
main = ttk.Frame(self.root, padding=10)
main.pack(fill=tk.BOTH, expand=True)
self.fig = Figure(figsize=(9, 5), dpi=100)
self.ax = self.fig.add_subplot(111)
self.line, = self.ax.plot([], [], linewidth=1.8)
self.ax.set_title("R&S SMB100A Output Monitor")
self.ax.set_xlabel("Time (s)")
self.ax.set_ylabel("Output Power (dBm, OFF = 0)")
self.ax.ticklabel_format(axis="y", style="plain", useOffset=False)
self.ax.grid(True, alpha=0.35)
self.canvas = FigureCanvasTkAgg(self.fig, master=main)
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
controls = ttk.Frame(main)
controls.pack(fill=tk.X, pady=(10, 0))
ttk.Label(controls, text="Frequency (GHz):").grid(row=0, column=0, padx=5)
self.freq_var = tk.StringVar(value=f"{DEFAULT_FREQUENCY_GHZ:g}")
self.freq_entry = ttk.Entry(controls, textvariable=self.freq_var, width=12)
self.freq_entry.grid(row=0, column=1, padx=5)
ttk.Label(controls, text="Power (dBm):").grid(row=0, column=2, padx=5)
self.power_var = tk.StringVar(value=f"{DEFAULT_POWER_DBM:g}")
self.power_entry = ttk.Entry(controls, textvariable=self.power_var, width=12)
self.power_entry.grid(row=0, column=3, padx=5)
self.apply_button = ttk.Button(
controls,
text="Apply",
command=self.apply_settings,
)
self.apply_button.grid(row=0, column=4, padx=8)
self.output_button = tk.Button(
controls,
text="Output OFF",
width=14,
command=self.toggle_output,
bg="lightcoral",
activebackground="lightcoral",
)
self.output_button.grid(row=0, column=5, padx=8)
self.status_var = tk.StringVar(value="")
self.status_label = ttk.Label(main, textvariable=self.status_var)
self.status_label.pack(anchor="w", pady=(8, 0))
self.freq_entry.bind("<Return>", lambda event: self.apply_settings())
self.power_entry.bind("<Return>", lambda event: self.apply_settings())
self.root.protocol("WM_DELETE_WINDOW", self.close)
def apply_settings(self):
try:
frequency_ghz = float(self.freq_var.get())
power_dbm = float(self.power_var.get())
self.set_frequency(frequency_ghz * 1e9)
self.set_power(power_dbm)
self.freq_var.set(f"{self.frequency_hz / 1e9:.9g}")
self.power_var.set(f"{self.power_dbm:.9g}")
print(
f"Set Frequency = {self.frequency_hz / 1e9:.9g} GHz, "
f"Power = {self.power_dbm:.9g} dBm"
)
except Exception as e:
messagebox.showerror("Input Error", str(e))
def toggle_output(self):
try:
self.set_output(not self.output_on)
self.update_output_button(force=True)
print(f"Output {'ON' if self.output_on else 'OFF'}")
except Exception as e:
messagebox.showerror("Output Error", str(e))
def update_output_button(self, force=False):
state = "ON" if self.output_on else "OFF"
if not force and state == self.last_button_state:
return
self.last_button_state = state
if self.output_on:
self.output_button.config(
text="Output ON",
bg="lightgreen",
activebackground="lightgreen",
)
else:
self.output_button.config(
text="Output OFF",
bg="lightcoral",
activebackground="lightcoral",
)
def update_loop(self):
if not self.running:
return
try:
self.output_on = self.query_output_state()
self.frequency_hz = self.query_frequency()
self.power_dbm = self.query_power()
t = time.time() - self.start_time
y_value = self.power_dbm if self.output_on else 0.0
self.times.append(t)
self.powers.append(y_value)
self.line.set_data(self.times, self.powers)
self.ax.relim()
self.ax.autoscale_view()
self.ax.set_xlim(0, max(10, t))
state = "ON" if self.output_on else "OFF"
self.ax.set_title(
f"R&S SMB100A | {state} | "
f"{self.frequency_hz / 1e9:.9g} GHz | "
f"{self.power_dbm:.9g} dBm | "
f"Plot={y_value:.9g}"
)
self.status_var.set(
f"Output: {state} "
f"Frequency: {self.frequency_hz / 1e9:.9g} GHz "
f"Power: {self.power_dbm:.9g} dBm"
)
self.update_output_button()
self.canvas.draw_idle()
except Exception as e:
self.status_var.set(f"Query failed: {e}")
print("Query failed:", e)
self.root.after(INTERVAL_MS, self.update_loop)
def close(self):
self.running = False
try:
if self.sg is not None:
self.sg.close()
finally:
self.root.destroy()
if __name__ == "__main__":
root = tk.Tk()
app = SMB100AApp(root)
root.mainloop()
注意事项
注意在生成代码后让CodeX搜索相关官方网站,确认代码对应读出的是仪器真实输出和测得的值
更多推荐




所有评论(0)