Hello,
I am streaming an analog input from my LabJack U3-HV with a Raspberry Pi 3b using Python.
The code uses multiprocessing, with one process dedicated to getting data from the LabJack and another one to processing and plotting the data. I have made the data collection loop as lean as possible to minimize the probability of a buffer overflow. The main process runs a PyQt GUI that plots the data.
What I have found is that up to sampling rates of 2997 samples/s everything works well, but above that I start having many packets with errors. At 2998 and 2999 some packets have no errors, but by 3000 samples/s all 48 packets have an error. The error code is always 55 (STREAM_SCAN_OVERLAP).
When I just transfer data without doing anything to it (using the same streaming code, just not from a GUI program), I can reach rates up to 40000 samples/s before hitting packets with error 59 (which means that the FIFO buffer was overrun).
I tried with a Raspberry Pi 4, which is faster, but the problem seems to be the same, so it does not look just like a speed issue.
I don't have easy access to a Windows machine, but the fact that the non-GUI code seems to work well makes me think it is not a hardware issue.
Updated: If I set the resolution to 3, I can get up to 40000 samples/s! I thought that resolution = 0 would auto-select given the sample rate, but it seems it does not. So choosing the right resolution seems important to avoid the overlap errors.
Thanks!
Pablo B.
Can you share how you handle the input reading with the loop and multiprocessing? I am trying to do something similar using, if possible, a function that is event-based rather than loop-based.
I am using something more loop based, rather than event based, but here go the main parts of the code, in case you find them useful.
I am using two processes, one that is continuously reading data from the LJ, and pushing it into a pipe. Then, from another process I have a loop that polls the pipe to see whether there is new data and, in the affirmative case, reads it. There is another pipe, which is used to stop the streaming process.
Note that I took this from working code, but I removed some parts (which are not relevant to the basic mechanism), so it may not work just as it is (Note that parts of the code were taken from the LabJack site!).
    def streamer(self):
        # Counters used below. Their setup was among the parts removed from
        # the original code, so these starting values are an assumption.
        count = 0
        last_pop = 0
        self.LJ.streamStart()
        # Flush the control pipe.
        while self.cr_conn.poll():
            self.cr_conn.recv()
        # Get data as long as the control pipe has not sent a negative value.
        while last_pop > -1:
            # Assumed reconstruction of a removed step: pick up the latest
            # value sent by the main process through the control pipe.
            if self.cr_conn.poll():
                last_pop = self.cr_conn.recv()
            # Initialize an empty string for raw StreamData bytes.
            # Raw data bytes from StreamData are returned in byte string format.
            rawDataString = b""  # For Python 3
            # Reset the missed samples counter to zero.
            missed = 0
            # Run a loop to read samples, and append them to the raw data.
            try:
                # Process each request returned by the LabJack.
                status = "OK"
                for r in self.LJ.streamData(False):
                    if r is not None:
                        if r['errors'] != 0:
                            status = "({time:.3f}) ERROR: {errors}".format(time=my_timer(), errors=r['errors'])
                        if r['numPackets'] != self.LJ.packetsPerRequest:
                            status = "({time:.3f}) UNDERFLOW: {npackets}".format(time=my_timer(), npackets=r['numPackets'])
                        if r['missed'] != 0:
                            missed += r['missed']
                        # Get the raw data.
                        rawDataString += r['result']
                        break
                    else:
                        # Got no data back from our read.
                        # This only happens if your stream isn't faster than
                        # the USB read timeout, ~1 sec.
                        status = "({time:.3f}) TIMEOUT: No data".format(time=my_timer())
            except Exception as e:
                print("Stopping:", repr(e))
            finally:
                # Send data through the data pipe.
                count += 1
                pending = count - last_pop - 1
                self.ds_conn.send((rawDataString, missed, status))
        # Once we're out of the loop, clean up.
        self.LJ.streamStop()  # Stop the stream.
        # Flush the control pipe again (just in case).
        while self.cr_conn.poll():
            self.cr_conn.recv()

The reader (running in the main process) gets the raw data from the pipe, converts it into voltages, and returns them to whoever called it.
    def get_stream_data(self):
        (rawDataString, m, s) = self.dr_conn.recv()
        # Check for packet errors. The error information is stored as a
        # dictionary, with the error number and how many packets appeared with it.
        # StreamData packets are 64 bytes and byte 11 is the error code.
        # Iterate through the error code bytes and count each code found.
        err_dict = {}
        for err in rawDataString[11::64]:
            try:
                err_dict[err] += 1
            except KeyError:
                err_dict[err] = 1
        # Convert all raw data bytes to their proper voltages using the
        # right input calibration.
        V = self.LJ.processStreamData(rawDataString)[self.chanid]
        # Update instance values.
        self.tot_missed += m
        self.total_samples += self.samples
        return (V, m, repr(err_dict))

There is a function to check whether the data pipe has any data waiting:
    def data_available(self, timeout=1):
        """ Returns true if the receiving pipe has data. """
        return self.dr_conn.poll(timeout)

To start the streaming, we create a Process object:
    # Set up the streaming process.
    self.streaming_process = mp.Process(target=self.streamer)
    # Start the streaming.
    self.streaming_process.start()

To read, first create the instance of the Streamer class that has the above methods, and call the get_stream_data() method in a loop while there is data available.
    streamer = Streamer()
    streamer.start_streaming()
    while streamer.data_available() is True:
        (V, m, s) = streamer.get_stream_data()

I hope this is somewhat useful...
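The error-counting step in get_stream_data() can be tried without a LabJack attached. The following is only a minimal sketch on made-up packets: count_packet_errors and make_fake_packet are hypothetical names, the 64-byte packet size and byte-11 offset are the ones mentioned in the code comments above, and it assumes that error code 0 means "no error", so those packets are left out of the result.

```python
from collections import Counter

PACKET_SIZE = 64     # bytes per StreamData packet, as stated above
ERROR_OFFSET = 11    # zero-based index of the error code byte


def count_packet_errors(raw):
    """Return {error_code: packet_count}, skipping packets with code 0.

    Assumption: code 0 means the packet had no error.
    """
    # Slicing a bytes object yields one int per packet in Python 3.
    codes = raw[ERROR_OFFSET::PACKET_SIZE]
    return {code: n for code, n in Counter(codes).items() if code != 0}


def make_fake_packet(error_code):
    """Build an all-zero fake packet with only the error byte set."""
    packet = bytearray(PACKET_SIZE)
    packet[ERROR_OFFSET] = error_code
    return bytes(packet)


# Five fake packets: two clean, two with error 55, one with error 59.
raw = b"".join(make_fake_packet(c) for c in (0, 55, 55, 0, 59))
errors = count_packet_errors(raw)
print(errors)  # {55: 2, 59: 1}
```

With real data, the same function could be called on rawDataString before converting it to voltages.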
Making it event-based could be tricky! Mostly because you do not want your streaming process to call a callback; you want that one focused on getting data off the LJ. It's probably doable, but above my current expertise I would guess.
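One possible direction, sketched here without any LabJack hardware: leave the streaming process as it is, and add a small listener thread in the main process that waits on the data pipe and calls a callback for each block that arrives. PipeListener, fake_streamer and on_data are made-up names for illustration, the None sentinel is an assumption of this example, and a plain thread stands in for the streaming process so that the example runs on its own.

```python
import threading
from multiprocessing import Pipe


class PipeListener(threading.Thread):
    """Wait on the receiving end of a pipe and pass each item to a callback."""

    def __init__(self, conn, callback, poll_timeout=0.1):
        super().__init__(daemon=True)
        self.conn = conn
        self.callback = callback
        self.poll_timeout = poll_timeout
        self._stop_event = threading.Event()

    def run(self):
        while not self._stop_event.is_set():
            # poll() with a timeout lets the thread notice stop() promptly.
            if self.conn.poll(self.poll_timeout):
                try:
                    item = self.conn.recv()
                except EOFError:
                    break  # The sending end was closed.
                self.callback(item)

    def stop(self):
        self._stop_event.set()


def fake_streamer(conn, n_blocks):
    """Stand-in for the streaming process: send a few fake data blocks."""
    for _ in range(n_blocks):
        conn.send((b"\x00" * 64, 0, "OK"))  # (raw data, missed, status)
    conn.send(None)  # Hypothetical sentinel meaning "no more data".


received = []
done = threading.Event()


def on_data(item):
    """Callback run by the listener thread for every item received."""
    if item is None:
        done.set()
    else:
        received.append(item)


# Pipe(duplex=False) returns (receiving end, sending end).
dr_conn, ds_conn = Pipe(duplex=False)
listener = PipeListener(dr_conn, on_data)
listener.start()

producer = threading.Thread(target=fake_streamer, args=(ds_conn, 3))
producer.start()
producer.join()

done.wait(timeout=5)
listener.stop()
listener.join(timeout=5)
print(len(received))
```

Note that the callback runs in the listener thread, not in the GUI thread, so in a PyQt program it would probably have to hand the data over with a signal rather than touch widgets directly.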
Pablo B.