import os import sys import time import importlib import queue import threading from datetime import datetime def log(logMessage): print(f'{datetime.now().strftime("%H:%M:%S")}: {logMessage}') class ReadResult(): def __init__(self, result = None, error = None): self.data = None self.deviceScanBacklog = None self.ljmScanBacklog = None self.error = error if result is not None: self.data = result[0] self.deviceScanBacklog = result[1] self.ljmScanBacklog = result[2] class LabjackAcquisition(): DEFAULT_QUEUE_GET_TIMEOUT = 3.0 DEFAULT_QUEUE_PUT_TIMEOUT = 1.0 def __init__(self): self.ljm = None self.handle = None self.reconnectTimeout = 5 self.dataQueue = queue.Queue() self.workerThread = None self.isShutdown = False self.labjackAddress = '10.6.0.102' self.channelNames = ["AIN1", "AIN3", "AIN5", "AIN7", "AIN9", "AIN11", "AIN0", "AIN2", "AIN4", "AIN6", "AIN8", "AIN10", "EIO0"] self.samplesPerChannel = 4 self.sampleFrequencyInHz = 200 self.maxReconnectTimeout = 300 def Initialize(self): '''Sets up the collection mode context as well as testing if the Labjack libraries are present''' # Do a quick import test here/fail fast. Throws an Import exception if the module isn't installed self.ljm = importlib.import_module('labjack.ljm') self.reconnectTimeout = 3 def Connect(self): if self.isShutdown: return False '''Open the Labjack and set up the connection parameters''' log("Opening labjack at {}...".format(self.labjackAddress)) try: self.CloseConnection() self.handle = self.ljm.openS("T7", "TCP", self.labjackAddress) self.reconnectTimeout = 3 except Exception as e: log("Could not open the labjack device at address {}. Reason: {}".format(self.labjackAddress, e)) return False info = self.ljm.getHandleInfo(self.handle) log("Opened LabJack with device type: %i, connection type: %i, serial number: %i, address: %s:%i" % (info[0], info[1], info[2], self.ljm.numberToIP(info[3]), info[4])) # Ensure triggered stream is disabled. self.ljm.eWriteName(self.handle, "STREAM_TRIGGER_INDEX", 0) # Enabling internally-clocked stream. self.ljm.eWriteName(self.handle, "STREAM_CLOCK_SOURCE", 0) self.ljm.eWriteName(self.handle, "AIN_ALL_RANGE", 0) # All negative channels are single-ended, AIN0 range is +/-10 V, # stream settling is 0 (default) and stream resolution index # is 0 (default). aNames = ["AIN_ALL_NEGATIVE_CH", "STREAM_RESOLUTION_INDEX", "FIO1", "FIO3", "FIO5", "FIO7"] aValues = [self.ljm.constants.GND, 0, 0, 0, 0, 0] # Write the analog inputs' negative channels (when applicable), ranges, # stream settling time and stream resolution configuration. numFrames = len(aNames) self.ljm.eWriteNames(self.handle, numFrames, aNames, aValues) return self.IsConnected() def _WorkLoop(self): '''An infinite loop checking the data queue for both data frames and errors. Data received gets added to a context, which will release complete data frames when they are collected. Once collected, the frame is published.''' numAddresses = len(self.channelNames) aScanList = self.ljm.namesToAddresses(numAddresses, self.channelNames)[0] while not self.isShutdown: # Connect / reconnect if we need to if not self.IsConnected(): try: if self.Connect(): self.ljm.eStreamStart(self.handle, self.samplesPerChannel, numAddresses, aScanList, int(self.sampleFrequencyInHz)) # Set up our callback. It is a static method that just turns around and tells # us that it was called self.ljm.setStreamCallback(self.handle, self.OnData) except: self.CloseConnection() if self.IsConnected(): try: # Block waiting on some data readResult = self.dataQueue.get(block=True, timeout=self.DEFAULT_QUEUE_GET_TIMEOUT) # None means it is time to quit if readResult is None: break # If the callback got an error for some reason, close, log, sleep. The reconnect will # happen at the top of the loop if readResult.error is not None: self.CloseConnection() log('An error occurred when attempting to read from the Labjack :' + str(readResult.error)) else: pass #got data except queue.Empty: log('Timed out waiting for data') self.CloseConnection() else: log("Will attempt to reconnect after {} seconds.".format(self.reconnectTimeout)) time.sleep(self.reconnectTimeout) self.reconnectTimeout = min(self.maxReconnectTimeout, 2 * self.reconnectTimeout) def Run(self): self.workerThread = threading.Thread(target=self._WorkLoop, name='Acquisition') self.workerThread.start() self.workerThread.join() def OnData(self, handle): '''Called from the Labjack thread, we are just here to call read - the other side of the queue can handle everything about it, including errors''' data = None try: readResult = self.ljm.eStreamRead(self.handle) data = ReadResult(readResult) #log('Data points (doubles2) read = {}, library scan backlog = {}'.format(len(data.data), data.ljmScanBacklog)) except self.ljm.LJMError as e: data = ReadResult(error = e) try: self.dataQueue.put(data, block=True, timeout=self.DEFAULT_QUEUE_PUT_TIMEOUT) except queue.Full: log('Timed out trying to put new data in the queue; dropping') def IsConnected(self): return self.handle is not None def CloseConnection(self): '''Properly closes handles and sets the internal state to uninitialized''' if self.handle is not None: log("Closing labjack at {}...".format(self.labjackAddress)) try: self.ljm.eStreamStop(self.handle) except: pass try: self.ljm.close(self.handle) finally: self.handle = None log("Labjack at {} closed".format(self.labjackAddress)) def Shutdown(self): '''Closes the Labjack connection and the platform connector objects''' self.isShutdown = True self.dataQueue.put(None, block=True, timeout=self.DEFAULT_QUEUE_PUT_TIMEOUT) self.CloseConnection() self.workerThread.join() if __name__ == "__main__": acq = None acq = LabjackAcquisition() if acq is not None: try: acq.Initialize() acq.Run() except KeyboardInterrupt: log('Run aborted by user.') except Exception as e: log("{}: {}".format(type(e).__name__, e)) log(traceback.format_exc()) finally: if acq is not None: try: acq.Shutdown() except: pass log('Exited')