国家仪器长期监测系统

2024-09-27 00:20:43 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在通过NI开发的nidaqmx模块构建一些脚本例程。 我使用2个NI PXI 44-98(32通道)采集卡

我们希望开发一个长时间(10小时)的监测实验,采样率为200k Hz

目前,我正在努力克服python的固有限制和Nidaqmx模块的逻辑

到目前为止,我已经用有限数量的传感器编写了一个连续采集程序

import nidaqmx
from nidaqmx.constants import AcquisitionType
import matplotlib.pyplot as plt
import numpy as np

sample_time = 600  # units = seconds
s_freq = 200000
num_samples = sample_time*s_freq
dt = 1/s_freq
print('go acquisition !')
with nidaqmx.Task() as task:
    task.ai_channels.add_ai_accel_chan("PXI1Slot2_3/ai1:3",
                                       sensitivity=10000.0,
                                       max_val=1,
                                       min_val=-1)
    task.timing.cfg_samp_clk_timing(s_freq,
                                   sample_mode = AcquisitionType.CONTINUOUS)
    data = task.read(number_of_samples_per_channel=num_samples, timeout = nidaqmx.constants.WAIT_INFINITELY)
print('I do it right !')

但使用这个非常简单的例程,我无法记录监视>;10分钟。python的内存不足,无法使用它。这对我来说是完全合乎逻辑的

我在NI网站上查看缓冲区逻辑,但我不清楚如何在这里实现它

我不明白我如何才能适应这个小例程,即每X MB任务记录的数据写入磁盘,同时仍然监视并清空“data”目录以避免溢出,我在stackoverflow上没有看到一些正确的答案

如果你已经遇到了这个问题,你有了解决方案,我很感兴趣

谢谢你的阅读


Tags: 模块sampleimporttasktimeas逻辑例程
1条回答
网友
1楼 · 发布于 2024-09-27 00:20:43

我不知道你是否解决了你的问题,或者停止了尝试,不管怎样,这都是一个基本的解决方案。我使用以太网控制的NI 9234卡,并使用csv写入存储数据:

# This works for a NI9234 card
import nidaqmx
from nidaqmx.constants import AcquisitionType
from nidaqmx import stream_readers
import numpy as np
import csv
import time
from datetime import datetime

sample_rate = 2048                              # Usually sample rate goes 2048 Hz, 2560 Hz, 3200 Hz, 5120 Hz, 6400 Hz (at least for a NI9234 card) 
samples_to_acq = 2048                           # At least for vibration matters, Samples to acq go from 2048, 4096, 8192
wait_time = samples_to_acq/sample_rate          # Data Acquisition Time in s, very important for the NI task function, and since we have 2048 on both sides, time is 1 s
cont_mode = AcquisitionType.CONTINUOUS              # There is also FINITE for sporadic measurements
iterations = 10
    
with nidaqmx.Task() as task:
    now = datetime.now()
    military = now.strftime('%H:%M:%S')     # Just recording the time like 20:32:54 instead of 8:32:54 pm
    first_header = ['Some title in here']
    second_header = [f'T. Captura: {military}']

# Create accelerometer channel and configure sample clock. current_excit_val=0.002 enables IEPE, velocity measured in ips and accel in m/s2, 
# both prox and tachometer can be used with a simple voltage channel.
# For more info about channels: https://nidaqmx-python.readthedocs.io/en/latest/ai_channel_collection.html

# Two accelerometer channels
task.ai_channels.add_ai_accel_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai0", sensitivity = 100, current_excit_val = 0.002)
task.ai_channels.add_ai_accel_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai1", sensitivity = 100, current_excit_val = 0.002)

# Two voltage channels
task.ai_channels.add_ai_voltage_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai2")
task.ai_channels.add_ai_voltage_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai3")

total_wait_time = wait_time * iterations                         # We will only take 10 measurements, 10 s for this example
samples_to_acq_new = samples_to_acq * iterations                 # Also multiply by 10 to keep the same ratio, it should be 

# Sets source of sample clock, its rate, and number of samples to aquire = buffer size
task.timing.cfg_samp_clk_timing(sample_rate, sample_mode = cont_mode, samps_per_chan = samples_to_acq_new)               
  
start = time.time()
print ('Starting task...')         # Just to keep a control in your console

# Saving the 4 channels to a csv file. This file is overwritten everytime the program is executed.
# It should appear in the same folder that your program is located. 
with open('data_10times.csv', 'w', newline = '') as f:
    writer = csv.writer(f)
    writer.writerow(first_header)
    writer.writerow(second_header)
    
    # adding some blank spaces in btw
    writer.writerow('')
    writer.writerow('')        
    
    x = np.linspace(0, total_wait_time, samples_to_acq_new)       # Your x axis (ms), starts from 0, final time is total_wait_time, equally divided by the number of samples you'll capture
    
    data = np.ndarray((4, samples_to_acq_new), dtype = np.float64)  #Creates an array, 4 columns each one of 20480 rows
    nidaqmx.stream_readers.AnalogMultiChannelReader(task.in_stream).read_many_sample(data, samples_to_acq_new, timeout = 14) # it should't take that long for this example, check out time for other exercises               
    
    for value in range(len(x)):
        writer.writerow([x[value], data[0][value], data[1][value], data[2][value], data[3][value]])

elapsed_time = (time.time() - start)
print (f'done in {elapsed_time}')

所以。。。5个月后,但我希望这会有所帮助

相关问题 更多 >

    热门问题