1. 下载 DAQ 的驱动(NI-DAQmx driver):https://knowledge.ni.com/KnowledgeArticleDetails?id=kA00Z0000019Pf1SAE&l=zh-CN
2. 从 GitHub 下载相应的库,并阅读使用说明(README)。
按照说明完成配置后,使用 Python 相应命令实现数据的连续采集:
#!/usr/bin/env python3
# Continuous four-channel acquisition from an NI DAQ device with live plots.
#
# The script reads analog inputs ai0..ai3 through NI-DAQmx, keeps both the raw
# samples and a band-pass filtered copy, refreshes two pyqtgraph plots while
# the task is running, and finally stores the recordings as MATLAB .mat files
# once the user presses Enter.
import sys
from pyqtgraph.Qt import QtGui, QtCore
import pyqtgraph as pg
import pprint
import nidaqmx
from nidaqmx.constants import AcquisitionType, TaskMode
from nidaqmx.errors import DaqError
import numpy as np
from scipy.io import savemat
from scipy import signal
from time import perf_counter, sleep
import datetime
import os

# Dictionary keys (also the variable names written to the .mat files), in the
# same order as the rows returned by the task (ai0..ai3).
CHANNELS = ('ch1', 'ch2', 'ch3', 'ch4')
# One pen colour per channel, shared by the raw and the filtered plot.
PENS = ((255, 0, 0), (0, 255, 0), (0, 0, 255), (127, 127, 0))
SAMPLE_RATE_HZ = 200     # sample clock rate per channel
SAMPLES_PER_EVENT = 200  # the callback fires once per this many samples

pp = pprint.PrettyPrinter(indent=4)  # pretty-printer for ad-hoc debugging

system = nidaqmx.system.System.local()
try:
    # Index 1 picks the second device reported by the driver; change it to
    # match the position of the DAQ on this machine.
    dev = system.devices.device_names[1]
except Exception:
    # Deliberately broad (missing driver, no device, too few devices), but no
    # longer a bare ``except`` that would also swallow KeyboardInterrupt.
    print("check Nidaqmx driver or python version3.x or DO YOU connect the DAQ?")
    sys.exit()
else:
    print(dev)

# GUI setup. mkQApp() returns the QApplication, creating it when necessary,
# and works across pyqtgraph / Qt binding versions.
app = pg.mkQApp()
win = pg.GraphicsLayoutWidget(show=True, title="Basic plotting examples")
win.resize(1000, 600)
win.setWindowTitle('pyqtgraph example: Plotting')
# Enable antialiasing for prettier plots
pg.setConfigOptions(antialias=True)
p1 = win.addPlot(title="mmg updating plot")

# 8th-order Butterworth band-pass. The cut-offs are normalised to the Nyquist
# frequency, i.e. 0.01..0.2 corresponds to 1..20 Hz at a 200 Hz sample rate.
# NOTE(review): high-order (b, a) filters can be numerically fragile; consider
# second-order sections if the filtered traces look wrong.
b, a = signal.butter(8, [0.01, 0.2], 'bandpass')
filtered_data = {name: [] for name in CHANNELS}

# Configure the task, then collect data into per-channel dictionaries.
with nidaqmx.Task() as task:
    task.ai_channels.add_ai_voltage_chan(dev + "/ai0:3")
    # Sample continuously at SAMPLE_RATE_HZ (200 Hz) until the task is stopped.
    task.timing.cfg_samp_clk_timing(
        rate=SAMPLE_RATE_HZ, sample_mode=AcquisitionType.CONTINUOUS)

    # Raw samples per channel; a mutable dict so the callback can append to it.
    non_local_var = {name: [] for name in CHANNELS}
    w_start = 0  # index of the first sample currently shown in the plots
    window = 30  # number of samples the view advances on every refresh

    def callback(task_handle, every_n_samples_event_type,
                 number_of_samples, callback_data):
        """Store one batch of acquired samples, raw and band-pass filtered.

        Registered below as the "every N samples" event handler. It reads
        ``number_of_samples`` samples per channel from the task, appends them
        to ``non_local_var`` and appends the zero-phase filtered version of
        the same batch to ``filtered_data``.

        Returns:
            0, as in the original handler (presumably the status code the
            driver expects from event callbacks).
        """
        # One row per channel, one column per sample.
        samples = np.array(
            task.read(number_of_samples_per_channel=number_of_samples))
        # Each batch is filtered independently, along the sample axis.
        data_ = signal.filtfilt(b, a, samples)
        for row, name in enumerate(CHANNELS):
            non_local_var[name].extend(samples[row, :])
            filtered_data[name].extend(data_[row, :])
        return 0

    task.register_every_n_samples_acquired_into_buffer_event(
        SAMPLES_PER_EVENT, callback)
    task.start()

    # Upper plot: raw signal, one coloured curve per channel.
    raw_curves = [
        p1.plot(non_local_var[name], pen=pen,
                name="channel{0}".format(number))
        for number, (name, pen) in enumerate(zip(CHANNELS, PENS), start=1)
    ]
    p1.setXRange(0, 500)  # visible x range, in samples
    start_time = perf_counter()

    # Lower plot: filtered signal, same colours.
    win.nextRow()
    p2 = win.addPlot()
    filtered_curves = [
        p2.plot(filtered_data[name], pen=pen,
                name="channel{0}".format(number))
        for number, (name, pen) in enumerate(zip(CHANNELS, PENS), start=1)
    ]
    p2.setXRange(0, 500)

    def update():
        """Redraw all curves from ``w_start`` onwards, then advance the view."""
        global w_start
        for name, raw_curve, filtered_curve in zip(
                CHANNELS, raw_curves, filtered_curves):
            # Slice to the end of the list so the newest sample is included.
            raw_curve.setData(non_local_var[name][w_start:])
            filtered_curve.setData(filtered_data[name][w_start:])
        w_start = w_start + window

    timer = QtCore.QTimer()
    timer.timeout.connect(update)
    sleep(0.5)  # give the acquisition a head start before plotting
    print(perf_counter() - start_time)
    timer.start(1000)  # call update() every 1000 ms
    # NOTE(review): no Qt event loop is started explicitly; refreshing the
    # plots while blocked in input() presumably relies on the Qt binding's
    # interactive input hook - confirm on the target setup.
    input('Running task. Press Enter to stop and see number of '
          'accumulated samples.\n')
    timer.stop()
    # Only stop here: leaving the ``with`` block closes the task, so an
    # explicit close() would close it twice.
    task.stop()

print(len(non_local_var['ch1']))

today = datetime.date.today()
date = str(today)
dir_name = input('input a dir where you want to save the file:\n')
try:
    os.chdir(dir_name)
except OSError as e:
    # Missing path, not a directory, no permission...: report the problem and
    # fall back to the current working directory instead of losing the data.
    print(e)
filename = input('input a saving name:\n')
# Raw data is optional and goes to <date>_<name>.mat; the filtered data is
# always written, to <date>_<name>_filtered.mat.
if input("save original data? y/n\n") == 'y':
    savemat('{0}_{1}.mat'.format(date, filename), non_local_var)
savemat('{0}_{1}_filtered.mat'.format(date, filename), filtered_data)
