diff --git a/python/nidaq.py b/python/nidaq.py new file mode 100644 index 0000000..5716717 --- /dev/null +++ b/python/nidaq.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Thu Oct 19 17:11:56 2023 +https://github.com/ni/nidaqmx-python/issues/162#issuecomment-1083659663 +@author: sekisushai +""" + +import numpy as np +import nidaqmx as ni +from nidaqmx.constants import TaskMode +from scipy import signal +from nidaqmx.constants import WAIT_INFINITELY + +def query_devices(): + """Queries all the device information connected to the local system.""" + local = ni.system.System.local() + for device in local.devices: + print(f"Device Name: {device.name}, Product Type: {device.product_type}") + print("Input channels:", [chan.name for chan in device.ai_physical_chans]) + print("Output channels:", [chan.name for chan in device.ao_physical_chans]) +#%% +def sine_sweep( + fs=44100, # Sampling frequency + dur = 5, # Sweep duration + silence_s = 1, # Silence duration before sweep + silence_e = 3, # Silence duration after sweep + f1 = 100, # Starting frequency + f2 = 20000, # Ending frequency + amp = 0.8 # Sweep amplitude + ): + N = dur*fs - int(fs*silence_s) - int(fs*silence_e) # Number of samples + + w1 = 2*np.pi*f1/fs # start of sweep in rad/sample + w2 = 2*np.pi*f2/fs # end of sweep in rad/sample + + sinsweep = np.zeros(N) + t = np.arange(N)/(N-1) + lw = np.log(w2/w1) + + sinsweep = amp * np.sin(w1*(N-1)/lw * (np.exp(t*lw)-1)) + raw_sinsweep = sinsweep.copy() + + # Find the last zero crossing to avoid the need for fadeout + flip_sweep = np.flipud(sinsweep) + + err = 1 + ii = 0 + + while err > 0.001: + err = np.abs(flip_sweep[ii]) + ii += 1 + + flip_sweep[0:ii] = 0 + sinsweep = np.flipud(flip_sweep) + + # the convolutional inverse + envelope = (w2/w1)**(-t) # Holters2009, Eq.(9) + invfilter = np.flipud(raw_sinsweep)*envelope + scaling = np.pi*N*(w1/w2-1)/(2*(w2-w1)*np.log(w1/w2))*(w2-w1)/np.pi; # Holters2009, Eq.10 + invfilter = invfilter/amp**2/scaling + + # fade-in window (actually set to 0). Fade out removed because causes ringing - cropping at zero cross instead + taperStart = signal.tukey(N,0) + taperWindow = np.ones(N) + taperWindow[0:int(N/2)] = taperStart[0:int(N/2)] + sinsweep = sinsweep*taperWindow + + # Complete signal with silence at the begining and at the end + N_tot = N + int(fs*silence_s) + int(fs*silence_e) + zeroStart = np.zeros(fs*silence_s) + zeroEnd = np.zeros(fs*silence_e) + sinsweep = np.concatenate((zeroStart, sinsweep, zeroEnd), axis=None) + return sinsweep + +def playrec( + outdata, sr, input_mapping=['cDAQ1Mod2/ai1'], + output_mapping=['cDAQ1Mod1/ao0'] +): + nsamples = outdata.shape[0] + with ni.Task() as read_task, ni.Task() as write_task: + for o in output_mapping: + aochan = write_task.ao_channels.add_ao_voltage_chan(o) + aochan.ao_max = 3 + aochan.ao_min = -3 + read_task.ai_channels.add_ai_voltage_chan('cDAQ1Mod2/ai0') + for i in input_mapping: + #aichan = read_task.ai_channels.add_ai_microphone_chan(i) + aichan = read_task.ai_channels.add_teds_ai_microphone_chan(i) + #aichan.ai_min = -10 + #aichan.ai_max = 10 + + for task in (read_task, write_task): + task.timing.cfg_samp_clk_timing(rate=sr, + samps_per_chan=nsamples) + read_task.control(TaskMode.TASK_RESERVE) + write_task.triggers.start_trigger.cfg_dig_edge_start_trig(read_task.triggers.start_trigger.term) + write_task.write(outdata, auto_start=False) + write_task.start() + indata = read_task.read(nsamples, timeout=WAIT_INFINITELY) + + return indata + +if __name__ == "__main__": + query_devices() + + sr = 44100 + duration = 8 + t = np.linspace(0, duration, sr*duration, endpoint=False) + sig = sine_sweep(sr,duration) + + indata = playrec(sig, sr) \ No newline at end of file