#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Play an exponential sine sweep on an NI-DAQmx analog output while recording
the analog inputs in sync.

Created on Thu Oct 19 17:11:56 2023

https://github.com/ni/nidaqmx-python/issues/162#issuecomment-1083659663

@author: sekisushai
"""

import nidaqmx as ni
import numpy as np
from nidaqmx.constants import WAIT_INFINITELY, TaskMode
from scipy import signal


def query_devices():
    """Print the name, product type and AI/AO physical channels of every
    device reported by the local NI-DAQmx system."""
    local = ni.system.System.local()
    for device in local.devices:
        print(f"Device Name: {device.name}, Product Type: {device.product_type}")
        print("Input channels:", [chan.name for chan in device.ai_physical_chans])
        print("Output channels:", [chan.name for chan in device.ao_physical_chans])


#%%
def sine_sweep(
        fs=44100,           # Sampling frequency
        dur=5,              # Total signal duration (sweep + both silences)
        silence_s=1,        # Silence duration before sweep
        silence_e=3,        # Silence duration after sweep
        f1=100,             # Starting frequency
        f2=20000,           # Ending frequency
        amp=0.8,            # Sweep amplitude
        fade_in_alpha=0.0,  # Tukey alpha of the fade-in (0 = no fade-in)
        ):
    """Generate an exponential sine sweep padded with leading/trailing silence.

    The returned signal is ``round(dur * fs)`` samples long: ``silence_s``
    worth of zeros, the sweep from ``f1`` to ``f2``, then ``silence_e`` worth
    of zeros.  ``dur`` is the duration of the *whole* signal, so the sweep
    itself lasts ``dur - silence_s - silence_e``.

    Instead of a fade-out (which caused ringing), the tail of the sweep is
    zeroed from the last sample whose magnitude is within 0.001 of zero.
    Note that this tolerance is absolute, not relative to ``amp``.

    Parameters
    ----------
    fs : sampling frequency (same time unit as the durations, e.g. Hz / s).
    dur, silence_s, silence_e : durations; ints or floats are accepted and
        converted to whole sample counts by rounding.
    f1, f2 : start and end frequency of the sweep; must be positive and
        different from each other.
    amp : peak amplitude of the sweep.
    fade_in_alpha : ``alpha`` of the Tukey window whose first half is applied
        as a fade-in.  The default 0 yields a rectangular window, i.e. the
        sweep is left untouched.

    Returns
    -------
    numpy.ndarray
        1-D float array containing silence, sweep, silence.

    Raises
    ------
    ValueError
        If the frequencies are not positive and distinct, or if the silences
        leave fewer than two samples for the sweep.
    """
    if f1 <= 0 or f2 <= 0 or f1 == f2:
        # log(w2/w1) would be undefined or zero and the sweep would be NaN.
        raise ValueError("f1 and f2 must be positive and different")

    # Convert every duration to an integer sample count once, so float
    # arguments cannot leak into array sizes.
    n_start = int(round(fs * silence_s))
    n_end = int(round(fs * silence_e))
    n_sweep = int(round(dur * fs)) - n_start - n_end
    if n_sweep < 2:
        raise ValueError(
            "dur must exceed silence_s + silence_e by at least two samples"
        )

    w1 = 2 * np.pi * f1 / fs  # start of sweep in rad/sample
    w2 = 2 * np.pi * f2 / fs  # end of sweep in rad/sample

    t = np.arange(n_sweep) / (n_sweep - 1)  # normalised time, 0..1 inclusive
    lw = np.log(w2 / w1)
    sweep = amp * np.sin(w1 * (n_sweep - 1) / lw * (np.exp(t * lw) - 1))

    # Crop at the last (near) zero crossing to avoid the need for a fade-out:
    # everything from the last sample with |x| <= 0.001 onwards is zeroed.
    near_zero = np.flatnonzero(np.abs(sweep) <= 0.001)
    if near_zero.size:
        sweep[near_zero[-1]:] = 0

    # Fade-in only: the first half of a Tukey window, second half left at 1.
    half = n_sweep // 2
    taper = np.ones(n_sweep)
    taper[:half] = signal.windows.tukey(n_sweep, fade_in_alpha)[:half]
    sweep = sweep * taper

    # Complete signal with silence at the beginning and at the end.
    return np.concatenate(
        (np.zeros(n_start), sweep, np.zeros(n_end)), axis=None
    )


def playrec(
        outdata,
        sr,
        input_mapping=('cDAQ1Mod2/ai1',),
        output_mapping=('cDAQ1Mod1/ao0',),
        ):
    """Play ``outdata`` on the analog outputs and record the inputs in sync.

    Parameters
    ----------
    outdata : samples to play.  1-D for a single output channel; for several
        output channels the last axis is taken as the sample axis
        (NOTE(review): nidaqmx expects ``(n_channels, n_samples)`` -- confirm
        against the ``Task.write`` documentation).
    sr : sample clock rate applied to both the read and the write task.
    input_mapping : physical channels added as TEDS microphone inputs.
    output_mapping : physical channels added as voltage outputs, limited to
        the range -3..3.

    Returns
    -------
    The value returned by ``read_task.read`` for ``nsamples`` samples per
    channel.  The hard-coded voltage channel ``cDAQ1Mod2/ai0`` is added to the
    read task first, followed by the ``input_mapping`` channels.
    """
    # Samples per channel: the last axis, which for 1-D data is the only one.
    nsamples = np.asarray(outdata).shape[-1]

    with ni.Task() as read_task, ni.Task() as write_task:
        for out_name in output_mapping:
            aochan = write_task.ao_channels.add_ao_voltage_chan(out_name)
            aochan.ao_max = 3
            aochan.ao_min = -3

        # Fixed plain-voltage input, always recorded in addition to the
        # microphone channels (presumably a reference/loopback -- confirm).
        read_task.ai_channels.add_ai_voltage_chan('cDAQ1Mod2/ai0')

        # NOTE: the TEDS variant is used here; the original author left
        # ``add_ai_microphone_chan`` and explicit ai_min/ai_max = -10/10 as
        # commented-out alternatives.
        for in_name in input_mapping:
            read_task.ai_channels.add_teds_ai_microphone_chan(in_name)

        for task in (read_task, write_task):
            task.timing.cfg_samp_clk_timing(rate=sr, samps_per_chan=nsamples)

        # Reserve the read task before asking for its start-trigger terminal,
        # then arm the write task on that terminal so that the output begins
        # when the acquisition starts.
        read_task.control(TaskMode.TASK_RESERVE)
        write_task.triggers.start_trigger.cfg_dig_edge_start_trig(
            read_task.triggers.start_trigger.term
        )

        write_task.write(outdata, auto_start=False)
        # The write task is started first and then waits for its trigger; the
        # read task is never started explicitly, so the read() call below is
        # what begins the acquisition (presumably via DAQmx's implicit start
        # on read -- confirm).
        write_task.start()
        indata = read_task.read(nsamples, timeout=WAIT_INFINITELY)

    return indata


if __name__ == "__main__":
    query_devices()
    sr = 44100
    duration = 8
    sig = sine_sweep(sr, duration)
    indata = playrec(sig, sr)