This commit is contained in:
Pierre Lecomte 2024-05-17 17:42:38 +02:00
parent 1d9bb73ca7
commit 04f51d2afb

112
python/nidaq.py Normal file
View File

@ -0,0 +1,112 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Oct 19 17:11:56 2023
https://github.com/ni/nidaqmx-python/issues/162#issuecomment-1083659663
@author: sekisushai
"""
import numpy as np
import nidaqmx as ni
from nidaqmx.constants import TaskMode
from scipy import signal
from nidaqmx.constants import WAIT_INFINITELY
def query_devices():
"""Queries all the device information connected to the local system."""
local = ni.system.System.local()
for device in local.devices:
print(f"Device Name: {device.name}, Product Type: {device.product_type}")
print("Input channels:", [chan.name for chan in device.ai_physical_chans])
print("Output channels:", [chan.name for chan in device.ao_physical_chans])
#%%
def sine_sweep(
fs=44100, # Sampling frequency
dur = 5, # Sweep duration
silence_s = 1, # Silence duration before sweep
silence_e = 3, # Silence duration after sweep
f1 = 100, # Starting frequency
f2 = 20000, # Ending frequency
amp = 0.8 # Sweep amplitude
):
N = dur*fs - int(fs*silence_s) - int(fs*silence_e) # Number of samples
w1 = 2*np.pi*f1/fs # start of sweep in rad/sample
w2 = 2*np.pi*f2/fs # end of sweep in rad/sample
sinsweep = np.zeros(N)
t = np.arange(N)/(N-1)
lw = np.log(w2/w1)
sinsweep = amp * np.sin(w1*(N-1)/lw * (np.exp(t*lw)-1))
raw_sinsweep = sinsweep.copy()
# Find the last zero crossing to avoid the need for fadeout
flip_sweep = np.flipud(sinsweep)
err = 1
ii = 0
while err > 0.001:
err = np.abs(flip_sweep[ii])
ii += 1
flip_sweep[0:ii] = 0
sinsweep = np.flipud(flip_sweep)
# the convolutional inverse
envelope = (w2/w1)**(-t) # Holters2009, Eq.(9)
invfilter = np.flipud(raw_sinsweep)*envelope
scaling = np.pi*N*(w1/w2-1)/(2*(w2-w1)*np.log(w1/w2))*(w2-w1)/np.pi; # Holters2009, Eq.10
invfilter = invfilter/amp**2/scaling
# fade-in window (actually set to 0). Fade out removed because causes ringing - cropping at zero cross instead
taperStart = signal.tukey(N,0)
taperWindow = np.ones(N)
taperWindow[0:int(N/2)] = taperStart[0:int(N/2)]
sinsweep = sinsweep*taperWindow
# Complete signal with silence at the begining and at the end
N_tot = N + int(fs*silence_s) + int(fs*silence_e)
zeroStart = np.zeros(fs*silence_s)
zeroEnd = np.zeros(fs*silence_e)
sinsweep = np.concatenate((zeroStart, sinsweep, zeroEnd), axis=None)
return sinsweep
def playrec(
outdata, sr, input_mapping=['cDAQ1Mod2/ai1'],
output_mapping=['cDAQ1Mod1/ao0']
):
nsamples = outdata.shape[0]
with ni.Task() as read_task, ni.Task() as write_task:
for o in output_mapping:
aochan = write_task.ao_channels.add_ao_voltage_chan(o)
aochan.ao_max = 3
aochan.ao_min = -3
read_task.ai_channels.add_ai_voltage_chan('cDAQ1Mod2/ai0')
for i in input_mapping:
#aichan = read_task.ai_channels.add_ai_microphone_chan(i)
aichan = read_task.ai_channels.add_teds_ai_microphone_chan(i)
#aichan.ai_min = -10
#aichan.ai_max = 10
for task in (read_task, write_task):
task.timing.cfg_samp_clk_timing(rate=sr,
samps_per_chan=nsamples)
read_task.control(TaskMode.TASK_RESERVE)
write_task.triggers.start_trigger.cfg_dig_edge_start_trig(read_task.triggers.start_trigger.term)
write_task.write(outdata, auto_start=False)
write_task.start()
indata = read_task.read(nsamples, timeout=WAIT_INFINITELY)
return indata
if __name__ == "__main__":
query_devices()
sr = 44100
duration = 8
t = np.linspace(0, duration, sr*duration, endpoint=False)
sig = sine_sweep(sr,duration)
indata = playrec(sig, sr)