Help: LabJackPython.NullHandleException: Couldn't open device. Please check that the device you are trying to open is connected. | LabJack
 

Help: LabJackPython.NullHandleException: Couldn't open device. Please check that the device you are trying to open is connected.

4 posts / 0 new
Last post
questions
questions's picture
Help: LabJackPython.NullHandleException: Couldn't open device. Please check that the device you are trying to open is connected.

I'm trying to connect the an ECG to a heartbeat discrimination task so that it collects data in the background during the trials (without interfering with a connected PPG) and ultimately produces a data excel form parallel to the hardware collection (so we can compare PPG and ECG data collected). In hopes of doing so, I created an ECG interface code to pair with the heartbeat task code. However, when I try to run the code I keep getting this response:

 

Last login: Fri Mar  2 11:25:57 on ttys000

[email protected]-iMac ~ > cd '/Users/researchassistant/Google Drive/RetroInteroPro/code/InteroceptiveTasks/HeartbeatDiscriminationTask/' && '/usr/bin/pythonw'  '/Users/researchassistant/Google Drive/RetroInteroPro/code/InteroceptiveTasks/HeartbeatDiscriminationTask/HeartbeatDiscriminationTaskKelli.py'  && echo Exit status: $? && exit 1

Traceback (most recent call last):

  File "/Users/researchassistant/Google Drive/RetroInteroPro/code/InteroceptiveTasks/HeartbeatDiscriminationTask/HeartbeatDiscriminationTaskKelli.py", line 124, in <module>

    TaskManager()

  File "/Users/researchassistant/Google Drive/RetroInteroPro/code/InteroceptiveTasks/HeartbeatDiscriminationTask/HeartbeatDiscriminationTaskKelli.py", line 52, in __init__

    self.ecg_interface = ECGInterface(self) #connect to ecg for data read

  File "/Users/researchassistant/Google Drive/RetroInteroPro/code/InteroceptiveTasks/HeartbeatDiscriminationTask/ECGInterface.py", line 74, in __init__

    self.device = u3.U3()

  File "build/bdist.macosx-10.12-intel/egg/u3.py", line 102, in __init__

  File "build/bdist.macosx-10.12-intel/egg/u3.py", line 137, in open

  File "build/bdist.macosx-10.12-intel/egg/LabJackPython.py", line 607, in open

  File "build/bdist.macosx-10.12-intel/egg/LabJackPython.py", line 1352, in openLabJack

  File "build/bdist.macosx-10.12-intel/egg/LabJackPython.py", line 1248, in _openLabJackUsingExodriver

LabJackPython.NullHandleException: Couldn't open device. Please check that the device you are trying to open is connected.

[email protected]-iMac HeartbeatDiscriminationTask > 

 

 

I've done a handful of research on how to rewrite the code, but everything I seem to do keeps producing the same response. If you could give me any guidance on how to edit the code that would be great. I included my modified codes below. Thank you so much! Good luck and congrats on all your graduate applications by the way!

 

Best,

 

 

Kelli

 

Modified HeartbeatTaskCode:

 

#! /usr/bin/evn python

 

import sys

import os

 

from random import shuffle

import gobject

 

gobject.threads_init()

 

from TaskGUI import *

from HardwareInterface import *

from AudioOutput import *

from ECGInterface import *

 

class TaskManager:

    def __init__(self):

        #SET CONSTANTS

        self.LEVELS = {0:0, 1:0.5} #delay levels in s (0 = synchronous, 1 = asynchronous)

        self.TRIAL_NUMBER = 25 #number of trials per level

 

        #GUI constants#TODO add and make prettier

        #self.START_WIDTH = 

        #self.START_HEIGHT =

 

        #hardware constants

        #before running, make sure BIOPAC is outputting a negative value from Analog output 0 to the LabJack ground so negative values don't get excluded

        self.LABJACK_PORT = 0 #port of labjack to look for signal from BIOPAC, black wire should be inserted here

        self.DIFF_PORT = 1 #to get negative values we have to compare to a null value, red wire should be inserted here

        self.ECG_PORT = 3 #to get ECG data values during task run, yellow

 

        self.TRIAL_TONES = 20 #number of tones per trial

        self.TERMINATE_TRIAL = 60 #if no tones after this time terminate process and adjust ear monitor

        self.SAMPLING_INTERVAL = 0.01 #interval between sampling voltage to avoid concurrent recording (sec)

 

        #audio output constants

        self.BITRATE = 16000 #not entirely sure what this is

        self.FREQUENCY = 800 #tone pitch, in Hz I think

        self.TONE_DURATION = 0.05 #length tone should be played for

 

        #file names

        self.RESPONSE_FILE_NAME = "heartbeat_responses.csv" #subject responses appended here for all subjects

        self.HARDWARE_FILE_EXTENSION = "_heartrate.csv" #this will be added to subject ID for file containing subject ear 

        self.ECG_FILE_EXTENSION = "_ecgdata.csv" #this will be added to the subject ID for file with subject ECG data

 

#heartrate information during each trial

 

        #INITIALIZE INTERFACES

        self.user_interface = TaskGUI(self) #build main window with data input

        self.hardware_interface = HardwareInterface(self) #connect to hardware for heartbeat data input

        self.audio_control = AudioOutput(self.BITRATE) #controls output sounds

        self.ecg_interface = ECGInterface(self) #connect to ecg for data read

        

        self.user_interface.render() #show GUI

 

    #OUTPUT CONTROL

 

    def open_output_files(self, output_name, subject_id):

        self.subject_id = subject_id

        hardware_output_name = self.subject_id + self.HARDWARE_FILE_EXTENSION

        ecg_output_name = self.subject_id + self.ECG_FILE_EXTENSION

 

        #open output files

        self.response_file = open(output_name, 'a')#open for appending so that previous results won't be deleted

        self.hardware_output = open(hardware_output_name, 'w') #will overwrite any previous contents, so be careful

        self.ecg_output = open(ecg_output_name, 'w') #will overwrite any previous contents as well

 

        #add headers

        if os.stat(output_name).st_size == 0: #checking if file is empty

            self.response_file.write("trial,subject_id,condition:0 =syn;1= asyn,subject_response,confidence\n")

 

        self.hardware_output.write("trial, time, voltage, slope, pulse\n")

 

        self.ecg_output.write("trial, time, voltage, slope\n")

 

    def write_trial_response(self, subject_response, confidence):

        self.response_file.write(str(self.trial_number) + "," + self.subject_id + "," + str(self.level) + "," +  str(subject_response) + "," + str(confidence) + "\n")

 

    def write_hardware_log(self, time, voltage, slope, peak):

        self.hardware_output.write(str(self.trial_number) + "," + str(time) + "," + str(voltage) + "," + str(slope) + "," + str(peak) + "\n")

 

    def write_ecg_log(self, time, voltage, slope):

        self.hardware_output.write(str(self.trial_number) + "," + str(time) + "," + str(voltage) + "," + str(slope) + "\n")

 

   #RUN TRIALS 

 

    def determine_trial_order(self, number_trials):

        order = []

        for level in self.LEVELS:

            order += [level] * number_trials

        shuffle(order)

        return(order)

 

    def run_trials(self, number_trials):

        order = self.determine_trial_order(number_trials)

 

        self.trial_number = 1#keep track of which trial we're on

 

        for trial in order:

            self.level = trial

            print str(self.trial_number) + ": " + str(self.level)

            self.run_trial() #display trial page and play tones

            self.user_interface.render() #will return when answer choice selected

            self.trial_number += 1

 

        self.user_interface.finished_task() #display final page

        self.user_interface.render()

 

        self.user_interface.stop_rendering() #quit all GUI activity

 

    def run_trial(self):

        self.user_interface.display_trial(self.trial_number) #set up display

        Thread(target=self.hardware_interface.run_trial).start() #play audio threaded so that we can display GUI simultaneously

    

    def trial_finished(self):

        time.sleep(1)#to let the last of the tones finish playing (may have delay)

        self.user_interface.get_response() #display accuracy and confidence questions

        self.user_interface.render()   

 

    def play_tone(self):

        time.sleep(self.level)

        self.audio_control.play_tone(self.FREQUENCY, self.TONE_DURATION)

 

TaskManager()

 

ECGInterface Code:

 

#! /usr/bin/evn python

 

#This package controls communication with the hardware

#in our heartbeat discrimination task.

#-----------------------------------------------------

#we are using a LabJack U3-LV data acquisition unit to 

#digitize analog output from our BIOPAC PPGED without 

#purchasing their API

 

import sys

import os

 

import time

from threading import Thread

 

try:

    import u3 #LabJack python package

except ImportError:

    print "Before running, you must install the following:\n \

            EXODRIVER\n \

            Mac installer - https://labjack.com/sites/default/files/2013/05/Exodriver_NativeUSB_Setup.zip* \n \

            Build from source -  https://labjack.com/support/software/installers/exodriver/mac-and-linux/in-depth-build-instructions \n \

            \n \

            LABJACKPYTHON\n \

            GitHub - https://github.com/labjack/LabJackPython*\n \

            command line - pip install labjackpython\n \

            \n \

            *preferred method"

    sys.exit()

 

class TimePoint:

    def __init__(self, time, voltage):

        self.time = time

        self.voltage = voltage

 

    def get_slope(self, previous_point):

        return (self.voltage - previous_point.voltage)/(self.time - previous_point.time)

 

    def print_trial(self):

        return (str(self.time) + "," + str(self.voltage) + ",")

 

class Trial:

    def __init__(self, device, task_manager):

        self.device = device

        self.task_manager = task_manager

 

        self.start_time = time.time()

        start_voltage = self.device.getAIN(self.task_manager.ECG_PORT, self.task_manager.DIFF_PORT)#get differential voltage

 

        self.previous_point = TimePoint(0, start_voltage)#first data point

 

    def if_peak(self): #not very good when time points are very close together

        peak = False

        current_time = time.time() - self.start_time

 

        current_point = TimePoint(current_time, self.device.getAIN(self.task_manager.ECG_PORT))

        slope = current_point.get_slope(self.previous_point)

 

        self.task_manager.write_hardware_log(current_point.time, current_point.voltage, slope)

        self.previous_point = current_point

 

    def if_foot(self):

        current_time = time.time() - self.start_time

        current_point = TimePoint(current_time, self.device.getAIN(self.task_manager.ECG_PORT, self.task_manager.DIFF_PORT))

        slope = current_point.get_slope(self.previous_point)

 

        self.task_manager.write_hardware_log(current_point.time, current_point.voltage, slope, pulse)

        self.previous_point = current_point

 

class ECGInterface:

 

    def __init__(self, task_manager):

        self.task_manager = task_manager #class that controls all elements and I/O

        

        self.device = u3.U3()

        self.device.configU3(FIOAnalog = 15)

    

    def run_trial(self):

        self.trial = Trial(self.device, self.task_manager)

        tones_emitted = 0

        while(tones_emitted < self.task_manager.TRIAL_TONES and time.time() - self.trial.start_time < self.task_manager.TERMINATE_TRIAL):

            time.sleep(self.task_manager.SAMPLING_INTERVAL)

 

            if(self.trial.if_foot()):                

                tones_emitted += 1

                Thread(target=self.task_manager.play_tone).start()

        

        self.task_manager.trial_finished() 

LabJack Support
labjack support's picture
That error would indicate you

That error would indicate you can't get a USB handle to the U3, due to it being not connected to the system or it being inaccessible. Your "self.device = u3.U3()" call looks fine in that it is trying to open the first found U3.

Make sure no other processes/threads have the U3 open already. The U3 can only be opened by one process at a time, and other processes will be unable to find it until the process with the claimed U3 closes or the application closes the handle with a call like:

self.device.close()

So try the following:

1. Close other applications using the U3.

2. Disconnect and reconnect your U3 over USB. Make sure the LED on the U3 is on after reconnecting which means it is enumerated.

3. Try your application and see if it can find the U3. If it can find it, close the application and move on to 4.

4. Add a U3 close call to your code when it is done with the U3. Also, make sure your application isn't staying open with the U3 claimed, and another instance/process of the application doesn't try to claim it as well.

questions
questions's picture
hi! thank you, i was able to

hi! thank you, i was able to get it connected by combining my interfaces. I am now trying to have my ECG collect background data at the same sampling interval as my PPG without interfering with the program. How should i proceed?

 

#! /usr/bin/evn python

#This package controls communication with the hardware
#in our heartbeat discrimination task.
#-----------------------------------------------------
#we are using a LabJack U3-LV data acquisition unit to 
#digitize analog output from our BIOPAC PPGED without 
#purchasing their API

import sys
import os

import time
from threading import Thread

try:
    import u3 #LabJack python package
except ImportError:
    print "Before running, you must install the following:\n \
            EXODRIVER\n \
            Mac installer - https://labjack.com/sites/default/files/2013/05/Exodriver_NativeUSB_Setu... \n \
            Build from source -  https://labjack.com/support/software/installers/exodriver/mac-and-linux/... \n \
            \n \
            LABJACKPYTHON\n \
            GitHub - https://github.com/labjack/LabJackPython*\n \
            command line - pip install labjackpython\n \
            \n \
            *preferred method"
    sys.exit()

from AudioOutput import *

class TimePoint:
    def __init__(self, time, voltage):
        self.time = time
        self.voltage = voltage

    def get_slope(self, previous_point):
        return (self.voltage - previous_point.voltage)/(self.time - previous_point.time)

    def print_trial(self):
        return (str(self.time) + "," + str(self.voltage) + ",")

class Trial:
    def __init__(self, device, task_manager):
        self.device = device
        self.task_manager = task_manager

        self.start_time = time.time()
        start_voltage = self.device.getAIN(self.task_manager.LABJACK_PORT, self.task_manager.DIFF_PORT)#get differential voltage

        self.previous_point = TimePoint(0, start_voltage)#first data point

        self.tone_outputted = True #must be true at first to avoid outputting at first data point

    #is the PPG signal currently at a peak, calculated with single input data
    def if_peak(self): #not very good when time points are very close together
        peak = False
        current_time = time.time() - self.start_time

        current_point = TimePoint(current_time, self.device.getAIN(self.task_manager.LABJACK_PORT))
        slope = current_point.get_slope(self.previous_point)

        if(slope < 1 and self.tone_outputted == False):
            peak = True
            self.tone_outputted = True
        elif(slope > 1 and self.previous_point.voltage < 0.5): #rising again
            self.tone_outputted = False

        self.task_manager.write_hardware_log(current_point.time, current_point.voltage, slope, peak)
        self.previous_point = current_point

        return peak

    def if_foot(self):
        current_time = time.time() - self.start_time
        current_point = TimePoint(current_time, self.device.getAIN(self.task_manager.LABJACK_PORT, self.task_manager.DIFF_PORT))
        slope = current_point.get_slope(self.previous_point)

        pulse = False

        if(current_point.voltage < 0 and slope > 0 and self.tone_outputted == False):
            pulse = True
            self.tone_outputted = True
        elif(current_point.voltage > 0 and self.tone_outputted):
            self.tone_outputted = False

        self.task_manager.write_hardware_log(current_point.time, current_point.voltage, slope, pulse)
        self.previous_point = current_point

        return pulse

class ECG_Trial:
#also collect ECG data during trial
    def __init__(self, device, task_manager):
        self.device = device
        self.task_manager = task_manager

        self.start_time = time.time()
        start_voltage = self.device.getAIN(self.task_manager.ECG_PORT, self.task_manager.DIFF_PORT)#get differential voltage

        self.previous_point = TimePoint(0, start_voltage)#first data point

        current_time = time.time() - self.start_time
        current_point = TimePoint(current_time, self.device.getAIN(self.task_manager.ECG_PORT))
        slope = current_point.get_slope(self.previous_point)

        self.task_manager.write_ecg_log(current_point.time, current_point.voltage, slope)
        self.previous_point = current_point


class HardwareInterface2:
    def __init__(self, task_manager):
        self.task_manager = task_manager #class that controls all elements and I/O
        
        self.device = u3.U3()
        self.device.configU3(FIOAnalog = 15)
        
    def __del__(self):
        self.device.close()
    
    def run_trial(self):
        self.trial = ECG_Trial(self.device, self.task_manager)
    self.task_manager.SAMPLING_INTERVAL = Trial(self.task_manager.SAMPLING_INTERVAL)
    self.trial = Trial(self.device, self.task_manager)
        tones_emitted = 0
        while(tones_emitted < self.task_manager.TRIAL_TONES and time.time() - self.trial.start_time < self.task_manager.TERMINATE_TRIAL):
            time.sleep(self.task_manager.SAMPLING_INTERVAL)

            if(self.trial.if_foot()):                
                tones_emitted += 1
                Thread(target=self.task_manager.play_tone).start()
        
        self.task_manager.trial_finished() #let the task manager know to display the questions
 

LabJack Support
labjack support's picture
It looks like

It looks like HardwareInterface2.run_trial is controlling the ECG (ECG_Trial) and PPG (Trial) sampling. I'm not exactly sure what code you don't want interfering, but if certain code is blocking other code and you don't want it to, you should look into using a separate thread for that code so the other code will also run. Look into putting the ECG and PPG sampling code in a thread different then your main program's thread.

If that doesn't help, please provide the chunk of code where interfering is happening. Describe which part of the code is interfering with the other code, and how you would prefer it to run.