from labjack import ljm
import sys
import os
import traceback
from datetime import datetime
import time
from datetime import timedelta
import csv
import numpy as np
import logging

######################################################################################################################
# OPEN/CLOSE DEVICE, CONFIGURE STREAM, START/STOP STREAM


# Opens the first T7 found and configures stream-in and stream-out on it.
# Returns handle, scans_per_read, scan_list, stream_outs, translated
def configure_stream(list_of_in_channels, list_of_out_channels, scan_frequency,
                     stream_resolution_index, samples2write, waveforms):
    # Open first found LabJack
    handle = ljm.openS('T7', 'ANY', 'ANY')  # T7 device, any connection, any identifier
    scans_per_read = int(scan_frequency / 2)

    # Translate any FIO and EIO input channels to the virtual FIO_EIO_STATE register
    fio_eio_state = convert_name_to_address('FIO_EIO_STATE')
    ins = []
    for chnl in list_of_in_channels:
        if chnl.startswith('FIO') or chnl.startswith('EIO'):
            # Compare addresses (ins holds addresses, not names) so the
            # register is added only once
            if fio_eio_state not in ins:
                ins.append(fio_eio_state)
        else:
            ins.append(convert_name_to_address(chnl))

    # Collect output addresses: all FIO outputs share FIO_STATE,
    # each DAC has its own register
    fio_state = convert_name_to_address('FIO_STATE')
    outs = []
    for chnl in list_of_out_channels:
        if chnl.startswith('FIO'):
            if fio_state not in outs:
                outs.append(fio_state)
        elif chnl.startswith('DAC'):
            dac_address = convert_name_to_address(chnl)
            if dac_address not in outs:
                outs.append(dac_address)

    # Create stream outs. Expected form:
    # STREAM_OUTS = [
    #     {"target": 2500, "index": 0},  # 1000, #DAC0
    #     {"target": 1002, "index": 1},  # DAC1
    # ]
    stream_outs = create_stream_outs(outs)

    ################################################
    # debug for FIO
    # if FIO is in outs, set all FIOs as outputs
    # if FIO/EIO is in ins, set all FIOs and EIOs as inputs
    # 1 = output, 0 = input
    if fio_eio_state in ins:
        names = ['FIO_DIRECTION', 'EIO_DIRECTION']
        vals = [0, 0]
        ljm.eWriteNames(handle, len(names), names, vals)
    if fio_state in outs:
        name = 'FIO_DIRECTION'
        val = 255
        ljm.eWriteName(handle, name, val)
    # set fio 3 and fio 0 as out and rest will remain as in
    # name='FIO_DIRECTION'
    # val = 9
    # ljm.eWriteName(handle, name, val)
    ###################################################

    ########################
    # configure stream
    # Ensure triggered stream is disabled.
    ljm.eWriteName(handle, "STREAM_TRIGGER_INDEX", 0)
    # Enable internally-clocked stream.
    ljm.eWriteName(handle, "STREAM_CLOCK_SOURCE", 0)

    # All negative channels are single-ended, every analog input range is
    # +/-10 V, stream settling is 0 (default) and the stream resolution index
    # is the value passed in as stream_resolution_index.
    # create list of aNames and aValues
    aNames = ['AIN_ALL_NEGATIVE_CH', 'STREAM_SETTLING_US']
    aValues = [ljm.constants.GND, 0]
    for chnl in list_of_in_channels:
        if chnl.startswith('A'):
            # create a name such as AIN0_RANGE
            name = chnl + '_RANGE'
            aNames.append(name)
            aValues.append(10)
    # add resolution
    aNames.append('STREAM_RESOLUTION_INDEX')
    aValues.append(stream_resolution_index)
    # Write the analog inputs' negative channels (when applicable), ranges,
    # stream settling time and stream resolution configuration.
    numFrames = len(aNames)
    ljm.eWriteNames(handle, numFrames, aNames, aValues)
    #####################################

    #########################
    # Set up stream-out from file
    # read a file
    # first element of the result is the timestamps, second is the values
    all_read_data = read_signal_from_csv(waveforms[0])
    my_data_from_file = all_read_data[1]

    # Stays empty when there are no outputs, so the return below never
    # refers to an undefined name
    translated = []
    if convert_name_to_address('FIO_STATE') in outs:
        # translate for fio 0 and fio 3: 9 = 0b1001 sets both lines high
        translated = [0 if x == 0 else 9 for x in my_data_from_file]
    elif len(outs) > 0:
        # keep as is for analog out
        translated = my_data_from_file

    try:
        for stream_out in stream_outs:
            ljm.initializeAperiodicStreamOut(handle, stream_out["index"], stream_out["target"], scan_frequency)
            # Write some data to the buffer before the stream starts
            ljm.writeAperiodicStreamOut(handle, stream_out["index"], samples2write, translated)
        scan_list = make_scan_list(
            in_names=list_of_in_channels,
            stream_outs=stream_outs
        )
        scan_list_string = "ScanList: " + str(scan_list)
        print(scan_list_string)
        scans_per_read_string = "ScansPerRead: " + str(scans_per_read)
        print(scans_per_read_string)
        logging.info(scan_list_string)
        logging.info(scans_per_read_string)
    except Exception:
        # ljm.LJMError is a subclass of Exception, so one handler covers both
        stop_stream(handle)
        raise
    return handle, scans_per_read, scan_list, stream_outs, translated


# starts the stream
# returns scan_rate, start_time
def start_stream(handle, scans_per_read, scan_list, initial_scanRate_hz):
    try:
        print("Starting stream...")
        logging.info("Starting stream...")
        scan_rate = ljm.eStreamStart(handle, scans_per_read, len(scan_list), scan_list, initial_scanRate_hz)
        start_time = ljm.getHostTick()
        actual_scan_rate_string = 'actual scan rate: ' + str(scan_rate)
        print(actual_scan_rate_string)
        logging.info(actual_scan_rate_string)
    except Exception:
        # ljm.LJMError is a subclass of Exception, so one handler covers both
        stop_stream(handle)
        raise
    return scan_rate, start_time


# stops the stream and closes the device
def stop_stream(handle):
    print("\nStopping Stream")
    try:
        ljm.eStreamStop(handle)
    except ljm.LJMError as exception:
        # A stream that was never started is not an error here
        if exception.errorString != "STREAM_NOT_RUNNING":
            raise
    finally:
        # Close the handle even when eStreamStop fails with another error
        ljm.close(handle)


#####################################################################
# FUNCTIONS TO HELP CONFIGURE STREAM
def convert_name_to_address(name):
    # ljm.nameToAddress returns (address, type); only the address is needed
    return ljm.nameToAddress(name)[0]


def convert_names_to_addresses(names, length_limit=None):
    """Convert a list of names to a list of addresses using LJM.

    @param names: Names to be converted to addresses.
    @type names: iterable over str
    @param length_limit: Limit the number of names to read from the name
        array; also limits the size of the returned addresses.
    @type length_limit: int
    @return: The given names converted to addresses.
    @rtype: iterable over int
    """
    length = len(names)
    if length_limit:
        length = length_limit
    addresses_and_types = ljm.namesToAddresses(length, names)
    # ljm.namesToAddresses returns a tuple of a list of addresses and a list of
    # types. The list of addresses is indexed at 0 of that tuple.
    return addresses_and_types[0]


# convert stream outs to a specific list of dictionaries form
def create_stream_outs(outs):
    """
    STREAM_OUTS = [
        {
            "target": int register address that stream-out writeData will be sent to,
            "index": int STREAM_OUT# offset. 0 would generate names like
                "STREAM_OUT0_BUFFER_STATUS", etc.
        },
        ...
    ]
    """
    stream_outs = []
    # max 4 outs; any further targets are ignored
    for i, target in enumerate(outs[:4]):
        single_dict = {
            "target": target,
            "index": i
        }
        stream_outs.append(single_dict)
    return stream_outs


def make_scan_list(in_names, stream_outs):
    """Creates a list of integer addresses from in names and stream-out dicts."""
    in_addresses = []
    out_addresses = []
    if in_names:
        in_addresses = convert_names_to_addresses(in_names)
    for stream_out in stream_outs:
        # 4800 is the address of STREAM_OUT0; STREAM_OUT1 is 4801, and so on
        out_addresses.append(4800 + stream_out["index"])
    return in_addresses + out_addresses

######################################################################################################################


############################################################################################
# FILES RELATED TO STREAMING
# function to create csv file with digital signal
# first column time in ms
# second column 0 or 1 signal
def create_digital_out_file(file_name, save_path, duration_ms):
    path2save = os.path.join(save_path, file_name)
    with open(path2save, 'w', newline='') as csvfile:
        my_writer = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        # Excel did not recognize commas as separators, so this line is added at the top of the csv file
        # https://kb.paessler.com/en/topic/2293-i-have-trouble-opening-csv-files-with-microsoft-excel-is-there-a-quick-way-to-fix-this
        my_writer.writerow(["sep=,"])
        # change signal from low to high (or back) every nth row
        n = 100
        # write to csv; the toggle at i == 0 makes the first block high
        first_signal = 0
        for i in range(duration_ms):
            if i % n == 0:
                first_signal = 0 if first_signal == 1 else 1
            my_writer.writerow([i, first_signal])
    print('Done')


# function to read digital signal from csv file
# where first column is timestamp in ms, and second column is signal
# returns a 2-row numpy array, where the first row holds the timestamps
# and the second row holds the signal values
def read_signal_from_csv(path):
    read_list = []
    with open(path, newline='') as csvfile:
        my_reader = csv.reader(csvfile, delimiter=',')
        for row in my_reader:
            try:
                read_list.append([float(row[0]), float(row[1])])
            except (ValueError, IndexError):
                # skip the "sep=," header and any malformed or short rows
                continue
    transposed_arr = np.transpose(np.asarray(read_list))
    # print(transposed_arr)
    return transposed_arr


# writes all_data (a list of equally long columns) to csv, one row per sample,
# with a 1-based row index in the first column
def save_data_testt7(path, file_name, all_data):
    with open(os.path.join(path, file_name), 'w', newline='') as csvfile:
        dump_writer = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        # Excel did not recognize commas as separators, so this line is added at the top of the csv file
        # https://kb.paessler.com/en/topic/2293-i-have-trouble-opening-csv-files-with-microsoft-excel-is-there-a-quick-way-to-fix-this
        dump_writer.writerow(["sep=,"])
        for i in range(len(all_data[0])):
            my_row = [i + 1]
            for j in range(len(all_data)):
                my_row.append(all_data[j][i])
            dump_writer.writerow(my_row)


# function to create csv file with a sine signal
# first column sample index, second column signal value
def create_sine_out(file_name, save_path, duration):
    freq = 20
    amp = 0.5
    # named t so that it does not shadow the imported time module
    t = np.linspace(0, 2, duration)
    signal = amp * np.sin(2 * np.pi * freq * t)
    path2save = os.path.join(save_path, file_name)
    with open(path2save, 'w', newline='') as csvfile:
        my_writer = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        # Excel did not recognize commas as separators, so this line is added at the top of the csv file
        # https://kb.paessler.com/en/topic/2293-i-have-trouble-opening-csv-files-with-microsoft-excel-is-there-a-quick-way-to-fix-this
        my_writer.writerow(["sep=,"])
        # write to csv
        for i in range(duration):
            my_writer.writerow([i, signal[i]])
    print('Done')
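

############################################################################################
# EXAMPLE (illustrative sketch, not called anywhere in this module)
# configure_stream replaces every nonzero waveform sample with 9 when FIO_STATE
# is the stream-out target. The sketch below shows where that constant comes
# from, assuming channel names of the form FIO0..FIO7 and one bit per FIO line
# in FIO_STATE. fio_bitmask and translate_digital_samples are hypothetical
# helper names introduced only for this example; they need no LabJack hardware.
def fio_bitmask(channel_names):
    """Return a bitmask with one bit set per named FIO line."""
    mask = 0
    for name in channel_names:
        if not name.startswith('FIO'):
            raise ValueError('not an FIO channel: ' + name)
        # FIO0 is bit 0, FIO3 is bit 3, and so on
        mask |= 2 ** int(name[3:])
    return mask


def translate_digital_samples(samples, channel_names):
    """Map each sample to 0 (all lines low) or the bitmask (named lines high)."""
    mask = fio_bitmask(channel_names)
    return [0 if x == 0 else mask for x in samples]


# Usage: fio_bitmask(['FIO0', 'FIO3']) evaluates to 9 (0b1001), the constant
# hard-coded in configure_stream; a different set of names would give a
# different mask without touching the translation loop.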