I'm trying to connect the an ECG to a heartbeat discrimination task so that it collects data in the background during the trials (without interfering with a connected PPG) and ultimately produces a data excel form parallel to the hardware collection (so we can compare PPG and ECG data collected). In hopes of doing so, I created an ECG interface code to pair with the heartbeat task code. However, when I try to run the code I keep getting this response:
Last login: Fri Mar 2 11:25:57 on ttys000
[email protected]-iMac ~ > cd '/Users/researchassistant/Google Drive/RetroInteroPro/code/InteroceptiveTasks/HeartbeatDiscriminationTask/' && '/usr/bin/pythonw' '/Users/researchassistant/Google Drive/RetroInteroPro/code/InteroceptiveTasks/HeartbeatDiscriminationTask/HeartbeatDiscriminationTaskKelli.py' && echo Exit status: $? && exit 1
Traceback (most recent call last):
File "/Users/researchassistant/Google Drive/RetroInteroPro/code/InteroceptiveTasks/HeartbeatDiscriminationTask/HeartbeatDiscriminationTaskKelli.py", line 124, in <module>
TaskManager()
File "/Users/researchassistant/Google Drive/RetroInteroPro/code/InteroceptiveTasks/HeartbeatDiscriminationTask/HeartbeatDiscriminationTaskKelli.py", line 52, in __init__
self.ecg_interface = ECGInterface(self) #connect to ecg for data read
File "/Users/researchassistant/Google Drive/RetroInteroPro/code/InteroceptiveTasks/HeartbeatDiscriminationTask/ECGInterface.py", line 74, in __init__
self.device = u3.U3()
File "build/bdist.macosx-10.12-intel/egg/u3.py", line 102, in __init__
File "build/bdist.macosx-10.12-intel/egg/u3.py", line 137, in open
File "build/bdist.macosx-10.12-intel/egg/LabJackPython.py", line 607, in open
File "build/bdist.macosx-10.12-intel/egg/LabJackPython.py", line 1352, in openLabJack
File "build/bdist.macosx-10.12-intel/egg/LabJackPython.py", line 1248, in _openLabJackUsingExodriver
LabJackPython.NullHandleException: Couldn't open device. Please check that the device you are trying to open is connected.
[email protected]-iMac HeartbeatDiscriminationTask >
I've done a handful of research on how to rewrite the code, but everything I seem to do keeps producing the same response. If you could give me any guidance on how to edit the code that would be great. I included my modified codes below. Thank you so much! Good luck and congrats on all your graduate applications by the way!
Best,
Kelli
Modified HeartbeatTaskCode:
#! /usr/bin/evn python
import sys
import os
from random import shuffle
import gobject
gobject.threads_init()
from TaskGUI import *
from HardwareInterface import *
from AudioOutput import *
from ECGInterface import *
class TaskManager:
def __init__(self):
#SET CONSTANTS
self.LEVELS = {0:0, 1:0.5} #delay levels in s (0 = synchronous, 1 = asynchronous)
self.TRIAL_NUMBER = 25 #number of trials per level
#GUI constants#TODO add and make prettier
#self.START_WIDTH =
#self.START_HEIGHT =
#hardware constants
#before running, make sure BIOPAC is outputting a negative value from Analog output 0 to the LabJack ground so negative values don't get excluded
self.LABJACK_PORT = 0 #port of labjack to look for signal from BIOPAC, black wire should be inserted here
self.DIFF_PORT = 1 #to get negative values we have to compare to a null value, red wire should be inserted here
self.ECG_PORT = 3 #to get ECG data values during task run, yellow
self.TRIAL_TONES = 20 #number of tones per trial
self.TERMINATE_TRIAL = 60 #if no tones after this time terminate process and adjust ear monitor
self.SAMPLING_INTERVAL = 0.01 #interval between sampling voltage to avoid concurrent recording (sec)
#audio output constants
self.BITRATE = 16000 #not entirely sure what this is
self.FREQUENCY = 800 #tone pitch, in Hz I think
self.TONE_DURATION = 0.05 #length tone should be played for
#file names
self.RESPONSE_FILE_NAME = "heartbeat_responses.csv" #subject responses appended here for all subjects
self.HARDWARE_FILE_EXTENSION = "_heartrate.csv" #this will be added to subject ID for file containing subject ear
self.ECG_FILE_EXTENSION = "_ecgdata.csv" #this will be added to the subject ID for file with subject ECG data
#heartrate information during each trial
#INITIALIZE INTERFACES
self.user_interface = TaskGUI(self) #build main window with data input
self.hardware_interface = HardwareInterface(self) #connect to hardware for heartbeat data input
self.audio_control = AudioOutput(self.BITRATE) #controls output sounds
self.ecg_interface = ECGInterface(self) #connect to ecg for data read
self.user_interface.render() #show GUI
#OUTPUT CONTROL
def open_output_files(self, output_name, subject_id):
self.subject_id = subject_id
hardware_output_name = self.subject_id + self.HARDWARE_FILE_EXTENSION
ecg_output_name = self.subject_id + self.ECG_FILE_EXTENSION
#open output files
self.response_file = open(output_name, 'a')#open for appending so that previous results won't be deleted
self.hardware_output = open(hardware_output_name, 'w') #will overwrite any previous contents, so be careful
self.ecg_output = open(ecg_output_name, 'w') #will overwrite any previous contents as well
#add headers
if os.stat(output_name).st_size == 0: #checking if file is empty
self.response_file.write("trial,subject_id,condition:0 =syn;1= asyn,subject_response,confidence\n")
self.hardware_output.write("trial, time, voltage, slope, pulse\n")
self.ecg_output.write("trial, time, voltage, slope\n")
def write_trial_response(self, subject_response, confidence):
self.response_file.write(str(self.trial_number) + "," + self.subject_id + "," + str(self.level) + "," + str(subject_response) + "," + str(confidence) + "\n")
def write_hardware_log(self, time, voltage, slope, peak):
self.hardware_output.write(str(self.trial_number) + "," + str(time) + "," + str(voltage) + "," + str(slope) + "," + str(peak) + "\n")
def write_ecg_log(self, time, voltage, slope):
self.hardware_output.write(str(self.trial_number) + "," + str(time) + "," + str(voltage) + "," + str(slope) + "\n")
#RUN TRIALS
def determine_trial_order(self, number_trials):
order = []
for level in self.LEVELS:
order += [level] * number_trials
shuffle(order)
return(order)
def run_trials(self, number_trials):
order = self.determine_trial_order(number_trials)
self.trial_number = 1#keep track of which trial we're on
for trial in order:
self.level = trial
print str(self.trial_number) + ": " + str(self.level)
self.run_trial() #display trial page and play tones
self.user_interface.render() #will return when answer choice selected
self.trial_number += 1
self.user_interface.finished_task() #display final page
self.user_interface.render()
self.user_interface.stop_rendering() #quit all GUI activity
def run_trial(self):
self.user_interface.display_trial(self.trial_number) #set up display
Thread(target=self.hardware_interface.run_trial).start() #play audio threaded so that we can display GUI simultaneously
def trial_finished(self):
time.sleep(1)#to let the last of the tones finish playing (may have delay)
self.user_interface.get_response() #display accuracy and confidence questions
self.user_interface.render()
def play_tone(self):
time.sleep(self.level)
self.audio_control.play_tone(self.FREQUENCY, self.TONE_DURATION)
TaskManager()
ECGInterface Code:
#! /usr/bin/evn python
#This package controls communication with the hardware
#in our heartbeat discrimination task.
#-----------------------------------------------------
#we are using a LabJack U3-LV data acquisition unit to
#digitize analog output from our BIOPAC PPGED without
#purchasing their API
import sys
import os
import time
from threading import Thread
try:
import u3 #LabJack python package
except ImportError:
print "Before running, you must install the following:\n \
EXODRIVER\n \
Mac installer - https://labjack.com/sites/default/files/2013/05/Exodriver_NativeUSB_Setup.zip* \n \
Build from source - https://labjack.com/support/software/installers/exodriver/mac-and-linux/in-depth-build-instructions \n \
\n \
LABJACKPYTHON\n \
GitHub - https://github.com/labjack/LabJackPython*\n \
command line - pip install labjackpython\n \
\n \
*preferred method"
sys.exit()
class TimePoint:
def __init__(self, time, voltage):
self.time = time
self.voltage = voltage
def get_slope(self, previous_point):
return (self.voltage - previous_point.voltage)/(self.time - previous_point.time)
def print_trial(self):
return (str(self.time) + "," + str(self.voltage) + ",")
class Trial:
def __init__(self, device, task_manager):
self.device = device
self.task_manager = task_manager
self.start_time = time.time()
start_voltage = self.device.getAIN(self.task_manager.ECG_PORT, self.task_manager.DIFF_PORT)#get differential voltage
self.previous_point = TimePoint(0, start_voltage)#first data point
def if_peak(self): #not very good when time points are very close together
peak = False
current_time = time.time() - self.start_time
current_point = TimePoint(current_time, self.device.getAIN(self.task_manager.ECG_PORT))
slope = current_point.get_slope(self.previous_point)
self.task_manager.write_hardware_log(current_point.time, current_point.voltage, slope)
self.previous_point = current_point
def if_foot(self):
current_time = time.time() - self.start_time
current_point = TimePoint(current_time, self.device.getAIN(self.task_manager.ECG_PORT, self.task_manager.DIFF_PORT))
slope = current_point.get_slope(self.previous_point)
self.task_manager.write_hardware_log(current_point.time, current_point.voltage, slope, pulse)
self.previous_point = current_point
class ECGInterface:
def __init__(self, task_manager):
self.task_manager = task_manager #class that controls all elements and I/O
self.device = u3.U3()
self.device.configU3(FIOAnalog = 15)
def run_trial(self):
self.trial = Trial(self.device, self.task_manager)
tones_emitted = 0
while(tones_emitted < self.task_manager.TRIAL_TONES and time.time() - self.trial.start_time < self.task_manager.TERMINATE_TRIAL):
time.sleep(self.task_manager.SAMPLING_INTERVAL)
if(self.trial.if_foot()):
tones_emitted += 1
Thread(target=self.task_manager.play_tone).start()
self.task_manager.trial_finished()
That error would indicate you can't get a USB handle to the U3, due to it being not connected to the system or it being inaccessible. Your "self.device = u3.U3()" call looks fine in that it is trying to open the first found U3.
Make sure no other processes/threads have the U3 open already. The U3 can only be opened by one process at a time, and other processes will be unable to find it until the process with the claimed U3 closes or the application closes the handle with a call like:
self.device.close()
So try the following:
1. Close other applications using the U3.
2. Disconnect and reconnect your U3 over USB. Make sure the LED on the U3 is on after reconnecting which means it is enumerated.
3. Try your application and see if it can find the U3. If it can find it, close the application and move on to 4.
4. Add a U3 close call to your code when it is done with the U3. Also, make sure your application isn't staying open with the U3 claimed, and another instance/process of the application doesn't try to claim it as well.
hi! thank you, i was able to get it connected by combining my interfaces. I am now trying to have my ECG collect background data at the same sampling interval as my PPG without interfering with the program. How should i proceed?
#! /usr/bin/evn python
#This package controls communication with the hardware
#in our heartbeat discrimination task.
#-----------------------------------------------------
#we are using a LabJack U3-LV data acquisition unit to
#digitize analog output from our BIOPAC PPGED without
#purchasing their API
import sys
import os
import time
from threading import Thread
try:
import u3 #LabJack python package
except ImportError:
print "Before running, you must install the following:\n \
EXODRIVER\n \
Mac installer - https://labjack.com/sites/default/files/2013/05/Exodriver_NativeUSB_Setu... \n \
Build from source - https://labjack.com/support/software/installers/exodriver/mac-and-linux/... \n \
\n \
LABJACKPYTHON\n \
GitHub - https://github.com/labjack/LabJackPython*\n \
command line - pip install labjackpython\n \
\n \
*preferred method"
sys.exit()
from AudioOutput import *
class TimePoint:
def __init__(self, time, voltage):
self.time = time
self.voltage = voltage
def get_slope(self, previous_point):
return (self.voltage - previous_point.voltage)/(self.time - previous_point.time)
def print_trial(self):
return (str(self.time) + "," + str(self.voltage) + ",")
class Trial:
def __init__(self, device, task_manager):
self.device = device
self.task_manager = task_manager
self.start_time = time.time()
start_voltage = self.device.getAIN(self.task_manager.LABJACK_PORT, self.task_manager.DIFF_PORT)#get differential voltage
self.previous_point = TimePoint(0, start_voltage)#first data point
self.tone_outputted = True #must be true at first to avoid outputting at first data point
#is the PPG signal currently at a peak, calculated with single input data
def if_peak(self): #not very good when time points are very close together
peak = False
current_time = time.time() - self.start_time
current_point = TimePoint(current_time, self.device.getAIN(self.task_manager.LABJACK_PORT))
slope = current_point.get_slope(self.previous_point)
if(slope < 1 and self.tone_outputted == False):
peak = True
self.tone_outputted = True
elif(slope > 1 and self.previous_point.voltage < 0.5): #rising again
self.tone_outputted = False
self.task_manager.write_hardware_log(current_point.time, current_point.voltage, slope, peak)
self.previous_point = current_point
return peak
def if_foot(self):
current_time = time.time() - self.start_time
current_point = TimePoint(current_time, self.device.getAIN(self.task_manager.LABJACK_PORT, self.task_manager.DIFF_PORT))
slope = current_point.get_slope(self.previous_point)
pulse = False
if(current_point.voltage < 0 and slope > 0 and self.tone_outputted == False):
pulse = True
self.tone_outputted = True
elif(current_point.voltage > 0 and self.tone_outputted):
self.tone_outputted = False
self.task_manager.write_hardware_log(current_point.time, current_point.voltage, slope, pulse)
self.previous_point = current_point
return pulse
class ECG_Trial:
#also collect ECG data during trial
def __init__(self, device, task_manager):
self.device = device
self.task_manager = task_manager
self.start_time = time.time()
start_voltage = self.device.getAIN(self.task_manager.ECG_PORT, self.task_manager.DIFF_PORT)#get differential voltage
self.previous_point = TimePoint(0, start_voltage)#first data point
current_time = time.time() - self.start_time
current_point = TimePoint(current_time, self.device.getAIN(self.task_manager.ECG_PORT))
slope = current_point.get_slope(self.previous_point)
self.task_manager.write_ecg_log(current_point.time, current_point.voltage, slope)
self.previous_point = current_point
class HardwareInterface2:
def __init__(self, task_manager):
self.task_manager = task_manager #class that controls all elements and I/O
self.device = u3.U3()
self.device.configU3(FIOAnalog = 15)
def __del__(self):
self.device.close()
def run_trial(self):
self.trial = ECG_Trial(self.device, self.task_manager)
self.task_manager.SAMPLING_INTERVAL = Trial(self.task_manager.SAMPLING_INTERVAL)
self.trial = Trial(self.device, self.task_manager)
tones_emitted = 0
while(tones_emitted < self.task_manager.TRIAL_TONES and time.time() - self.trial.start_time < self.task_manager.TERMINATE_TRIAL):
time.sleep(self.task_manager.SAMPLING_INTERVAL)
if(self.trial.if_foot()):
tones_emitted += 1
Thread(target=self.task_manager.play_tone).start()
self.task_manager.trial_finished() #let the task manager know to display the questions
It looks like HardwareInterface2.run_trial is controlling the ECG (ECG_Trial) and PPG (Trial) sampling. I'm not exactly sure what code you don't want interfering, but if certain code is blocking other code and you don't want it to, you should look into using a separate thread for that code so the other code will also run. Look into putting the ECG and PPG sampling code in a thread different then your main program's thread.
If that doesn't help, please provide the chunk of code where interfering is happening. Describe which part of the code is interfering with the other code, and how you would prefer it to run.