import u3 import sys from datetime import datetime import traceback import numpy as np import pandas as pd class EFT_Devices(): ''' Define and initialize devices that are contained in EFT Labjack - u3_LV ''' def __init__(self,DUTs = "DUT1"): # initialize the u3_instrument parent class. # super(EFT_Devices,self).__init__(**args) # initialize the Power instrument subclass object. self.__initialized = False self.__error = None self.DUTs = DUTs # self.__log = logger.getLogger(level='info') # self.__log.info("Creating PFTP Power Switch Instrument") # Labjack Channels: # Hall sensor channels self.HALL_1_HA = "FIO0" self.HALL_1_HB = "FIO1" self.HALL_1_HC = "FIO3" self.HALL_2_HA = "FIO4" self.HALL_2_HB = "FIO5" self.HALL_2_HC = "FIO6" # Motor Encoder channels self.ENC_1_CHAN_Ap = "counter0" self.ENC_2_CHAN_Ap = "counter1" #Data streaming information self.ScanRate = 1600 # scan frequency of stream mode in Hz self.ScanResolution = 3 # scan resolution # MAX_REQUESTS is the number of packets to be read. self.MAX_REQUESTS = 27 # Maximum number of request to be issued ( Each request contains 48 packet, each packet contains 25 samples) # Check if labjack is connected (Note: Assuming only one u3 is connected try: # Initialize the labjack. self._u3 = u3.U3() # Apply proper calibration readings self._u3.getCalibrationData() # set IO pins to digital if self.DUTs == "DUT1": self._u3.configIO(FIOAnalog=0, EIOAnalog=0, EnableCounter0=True, TimerCounterPinOffset=8) elif self.DUTs == 'DUT2': self._u3.configIO(FIOAnalog=0, EIOAnalog=0, EnableCounter0=True, TimerCounterPinOffset=9) elif self.DUTs == "TestBothDUTs": self._u3.configIO(FIOAnalog=0, EIOAnalog=0, EnableCounter0=True, EnableCounter1=True,TimerCounterPinOffset=8) # configure all IO pins (FIO,EIO,CIO) as inputs self._u3.getFeedback(u3.PortDirWrite(Direction=[0, 0, 0])) self.__initialized = True except Exception as exc: self.__error = str(exc) # self.__log.exception(" Labjack Init Failed (%s)" % str(exc)) """ PUBLIC METHODS # """ def LJU3StreamData(self): self.HallData_EncoderData = [] self.NumberOfDUTs = self.DUTs self.DUT1 = "DUT1" self.DUT2 = "DUT2" self.TestBothDUTs = "TestBothDUTs" info = self._u3.configU3() print("Opened the following Labjack \n\nDevice type: {}\nSerial number: {}\nLJ Firmware Version: {}\nLocal ID: {}\n".format( info["DeviceName"], info["SerialNumber"], info["FirmwareVersion"],info["LocalID"])) self.DUTCounter0 = False self.DUTCounter1 = False self.DUTCounter0_1 = False self.IOToScan = [] if self.NumberOfDUTs == self.DUT1: self.IOToScan = [ self.HALL_1_HA, self.HALL_1_HB, self.HALL_1_HC, self.ENC_1_CHAN_Ap, ] self.DataframeHeader = ["HALL_1_HA", "HALL_1_HB", "HALL_1_HC", "ENC_1_CHAN_Ap", ] self.ALLDIO = [193,194, 210, 224] self.NChannels = [31] * len(self.ALLDIO) self.DUTCounter0 = True elif self.NumberOfDUTs == self.DUT2: self.IOToScan = [ self.HALL_2_HA, self.HALL_2_HB, self.HALL_2_HC, self.ENC_2_CHAN_Ap, ] self.DataframeHeader = ["HALL_2_HA", "HALL_2_HB", "HALL_2_HC", "ENC_2_CHAN_Ap", ] self.ALLDIO = [193,194, 211, 224] self.NChannels = [31] * len(self.ALLDIO) self.DUTCounter1 = True elif self.NumberOfDUTs == self.TestBothDUTs: self.IOToScan = [ self.HALL_1_HA, self.HALL_1_HB, self.HALL_1_HC, self.ENC_1_CHAN_Ap, self.HALL_2_HA, self.HALL_2_HB, self.HALL_2_HC, self.ENC_2_CHAN_Ap ] self.DataframeHeader = ["HALL_1_HA", "HALL_1_HB", "HALL_1_HC", "ENC_1_CHAN_Ap", "HALL_2_HA", "HALL_2_HB", "HALL_2_HC", "ENC_2_CHAN_Ap" ] self.ALLDIO = [193,194, 210, 224, 211, 224] self.NChannels = [31] * len(self.ALLDIO) self.DUTCounter0_1 = True else: print("No DUT detected. Please install DUT.") # self.ALLDIO = [193,194] # self.NChannels = [31] * len(self.ALLDIO) print ("Configuring U3 Stream") self._u3.streamConfig(NumChannels=len(self.ALLDIO), PChannels=self.ALLDIO, NChannels=self.NChannels, Resolution=self.ScanResolution, ScanFrequency=self.ScanRate ) if self._u3 is None: print(""" Labjack has not been configured. Please configure labjack for streaming.\n Exiting...""") sys.exit(0) try: # When streaming, negative channels and ranges can be configured for # individual analog inputs, but the stream has only one settling time and # resolution. missed = 0 dataCount = 0 packetCount = 0 print ("Data scan rate is %i" % self.ScanRate) print ("Performing %i stream reading" % self.MAX_REQUESTS) print ("\nstarting data stream\n") self._u3.streamStart() TimeStart = datetime.now() # print (TimeStart) for r in self._u3.streamData(): if r is not None: # Our stop condition if dataCount >= self.MAX_REQUESTS: break if r['errors'] != 0: print ("Error: %s ; " % r['errors'], datetime.now()) if r['numPackets'] != self._u3.packetsPerRequest: print ("----- UNDERFLOW : %s : " % r['numPackets'], datetime.now()) if r['missed'] != 0: missed += r['missed'] print ("+++ Missed ", r['missed']) # retrieve samples/readings for each channel for k in range(len(r["AIN193"])): fio = [(r["AIN193"][k][0] >> j) & 1 for j in range(8)] eio = [(r["AIN193"][k][1] >> j) & 1 for j in range(8)] cio = [(r["AIN194"][k][0] >> j) & 1 for j in range(8)] if self.DUTCounter0: counter0 = (r["AIN210"][k] + r["AIN224"][k]) << 16 # counter0 = (r["AIN210"][k] << 16) if self.DUTCounter1: counter1 = r["AIN211"][k] + (r["AIN224"][k] << 16) if self.DUTCounter0_1: counter0 = r["AIN210"][k] + (r["AIN224"][k * 2] << 16) counter1 = r["AIN211"][k] + (r["AIN224"][k * 2 + 1] << 16) temp = [] for IO in self.IOToScan: if 'FIO' in IO: FIO = fio[int(IO[-1])] temp.append(FIO) if 'EIO' in IO: EIO = eio[int(IO[-1])] temp.append(EIO) if 'CIO' in IO: CIO = cio[int(IO[-1])] temp.append(CIO) if 'counter0' in IO: temp.append(counter0) if 'counter1' in IO: temp.append(counter1) self.HallData_EncoderData.append(temp) dataCount += 1 # print ("number of request : %s"%dataCount) packetCount += r['numPackets'] else: print ("No data", datetime.now()) except: print ("".join(i for i in traceback.format_exc())) finally: stop = datetime.now() self._u3.streamStop() print ("stream stopped.") self._u3.close() sampleTotal = packetCount * self._u3.streamSamplesPerPacket scanTotal = sampleTotal / len(self.ALLDIO) # sampleTotal / NumChannels print ("%s requests with %s packets per request with %s samples per packet = %s samples total for %s channels." % ( dataCount, (float(packetCount) / dataCount), self._u3.streamSamplesPerPacket, sampleTotal, len(self.ALLDIO))) print ("%s samples were lost due to errors." % missed) sampleTotal -= missed print ("Adjusted number of samples = %s" % sampleTotal) runTime = (stop - TimeStart).seconds + float((stop - TimeStart).microseconds) / 1000000 print ("The experiment took %s seconds." % runTime) print ("Scan Rate : %s scans / %s seconds = %s Hz" % (scanTotal, runTime, float(scanTotal) / runTime)) print ("Sample Rate : %s samples / %s seconds = %s Hz" % (sampleTotal, runTime, float(sampleTotal) / runTime)) # data = np.array(self.HallData_EncoderData) data = pd.DataFrame(data=self.HallData_EncoderData, columns=self.DataframeHeader) # data = self._u3.processStreamData(data) # print (np.shape(data)) return data # # U3_LV = EFT_Devices() # info = U3_LV._u3.configU3() # Data = U3_LV.LJU3StreamData() # print np.shape(Data) # print (Data)