当前位置:   article > 正文

Python的DAQ数据采集_py nidaq

py nidaq

1.DAQ的driver下载 https://knowledge.ni.com/KnowledgeArticleDetails?id=kA00Z0000019Pf1SAE&l=zh-CN

2. 从 GitHub 下载 nidaqmx-python,并阅读其使用说明(README.rst):

https://github.com/zsl10100808/nidaqmx-python/blob/master/README.rst

按照操作配置,使用python 相应命令实现数据连续采集

3.python程序

  #!/usr/bin/env python3
  """Stream four NI-DAQmx analog-input channels and plot them live.

  The upper plot shows the raw samples and the lower plot shows a band-pass
  filtered copy.  Pressing Enter in the console stops the acquisition; the
  recorded data can then be saved as MATLAB ``.mat`` files.
  """
  import os
  import sys
  from pyqtgraph.Qt import QtGui, QtCore
  import pyqtgraph as pg
  import pprint
  import nidaqmx
  from nidaqmx.constants import AcquisitionType, TaskMode
  from nidaqmx.errors import DaqError
  import numpy as np
  from scipy.io import savemat
  from scipy import signal
  from time import sleep
  import datetime

  # Dictionary keys (also the variable names inside the saved .mat files) and
  # the pen colour used for each channel, in ai0..ai3 order.
  CHANNELS = ('ch1', 'ch2', 'ch3', 'ch4')
  PENS = ((255, 0, 0), (0, 255, 0), (0, 0, 255), (127, 127, 0))
  SAMPLE_RATE = 200        # samples per second, per channel
  SAMPLES_PER_EVENT = 200  # samples per channel delivered to each callback

  pp = pprint.PrettyPrinter(indent=4)  # print setup
  system = nidaqmx.system.System.local()
  try:
      # Index 1 picks the second device reported by the driver; change it to
      # match the device number on your machine.
      dev = system.devices.device_names[1]
      print(dev)
  except Exception:
      # ``Exception`` instead of a bare ``except`` so that Ctrl-C and
      # SystemExit are not swallowed here.
      print("check Nidaqmx driver or python version3.x or DO YOU connect the DAQ?")
      sys.exit()

  # ---- plot window ---------------------------------------------------------
  app = QtGui.QApplication([])  # GUI
  win = pg.GraphicsLayoutWidget(show=True, title="Basic plotting examples")
  win.resize(1000, 600)
  win.setWindowTitle('pyqtgraph example: Plotting')
  # Enable antialiasing for prettier plots
  pg.setConfigOptions(antialias=True)
  p1 = win.addPlot(title="mmg updating plot")

  # 8th-order Butterworth band-pass.  The cut-offs are fractions of the
  # Nyquist frequency, i.e. 1 Hz - 20 Hz at a 200 S/s sample rate.
  b, a = signal.butter(8, [0.01, 0.2], 'bandpass')
  filtered_data = {name: [] for name in CHANNELS}

  # ---- acquisition ---------------------------------------------------------
  # The task is closed automatically when the ``with`` block is left, so no
  # explicit ``task.close()`` is needed (a second close only triggers a
  # warning from nidaqmx).
  with nidaqmx.Task() as task:
      task.ai_channels.add_ai_voltage_chan(dev + "/ai0:3")
      # 200 S/s per channel, acquired continuously.
      task.timing.cfg_samp_clk_timing(
          rate=SAMPLE_RATE, sample_mode=AcquisitionType.CONTINUOUS)

      non_local_var = {name: [] for name in CHANNELS}  # raw samples
      w_start = 0   # index of the first sample currently drawn
      window = 30   # how far the drawn window advances on every timer tick

      def callback(task_handle, every_n_samples_event_type,
                   number_of_samples, callback_data):
          """Read the newly acquired samples and store raw and filtered copies.

          Invoked by NI-DAQmx each time ``number_of_samples`` samples per
          channel are available.  Must return 0.
          """
          # One row per channel, one column per sample.
          samples = np.array(
              task.read(number_of_samples_per_channel=number_of_samples))
          # Zero-phase filtering along the sample axis of this chunk.
          data_ = signal.filtfilt(b, a, samples)
          for row, name in enumerate(CHANNELS):
              non_local_var[name].extend(samples[row, :])
              filtered_data[name].extend(data_[row, :])
          return 0

      task.register_every_n_samples_acquired_into_buffer_event(
          SAMPLES_PER_EVENT, callback)
      task.start()

      # Upper plot: raw data, one curve per channel.
      raw_curves = [
          p1.plot(non_local_var[name], pen=pen,
                  name="channel{0}".format(number))
          for number, (name, pen) in enumerate(zip(CHANNELS, PENS), start=1)
      ]
      p1.setXRange(0, 500)  # data range
      start_time = pg.ptime.time()

      # Lower plot: filtered data, same colours.
      win.nextRow()
      p2 = win.addPlot()
      filtered_curves = [
          p2.plot(filtered_data[name], pen=pen,
                  name="channel{0}".format(number))
          for number, (name, pen) in enumerate(zip(CHANNELS, PENS), start=1)
      ]
      p2.setXRange(0, 500)

      def update():
          """Redraw all curves from ``w_start`` on, then advance the window."""
          global w_start
          # Slice to the end of the list so the newest sample is drawn too.
          for name, curve in zip(CHANNELS, raw_curves):
              curve.setData(non_local_var[name][w_start:])
          for name, curve in zip(CHANNELS, filtered_curves):
              curve.setData(filtered_data[name][w_start:])
          w_start = w_start + window

      timer = QtCore.QTimer()
      timer.timeout.connect(update)
      sleep(0.5)  # give the task a moment before the first redraw
      print(pg.ptime.time() - start_time)
      timer.start(1000)  # call update() every 1000 ms

      # Block here until the user presses Enter; leaving the ``with`` block
      # afterwards closes the task.
      input('Running task. Press Enter to stop and see number of '
            'accumulated samples.\n')

  # ---- save ----------------------------------------------------------------
  print(len(non_local_var['ch1']))
  today = datetime.date.today()
  date = str(today)
  try:
      dir_name = input('input a dir where you want to save the file:\n')
      os.chdir(dir_name)
  except OSError as e:
      # Missing directory, not a directory, no permission, ...: report it and
      # fall back to saving in the current working directory.
      print(e)
  filename = input('input a saving name:\n')
  # Raw data is optional and goes to "<date>_<name>.mat"; the filtered data is
  # always written to "<date>_<name>_filtered.mat".
  if input("save original data? y/n\n") == 'y':
      savemat('{0}_{1}.mat'.format(date, filename), non_local_var)
  savemat('{0}_{1}_filtered.mat'.format(date, filename), filtered_data)

4.完成数据采集

在命令行中按下回车键即可停止采集并关闭 task。

5.数据的实时显示

上面显示原始数据

下面显示滤波后的数据

注意:

1.下载好必须的python库

2.设置好update频率(太快无法显示)

3.通过定时器实现了update

4.可以通过thread线程优化

该程序可在 Windows 上运行,同时也适用于树莓派和 Ubuntu。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/979181
推荐阅读
相关标签
  

闽ICP备14008679号