+import argparse
+import datetime
+import os.path
+import PyQt4.QtCore as QtCore
+import PyQt4.QtGui as QtGui
+import re
+import sigrok.core as sr
+import sys
+import textwrap
+
+default_drivers = [('demo', {'analog_channels': 1})]
+default_loglevel = sr.LogLevel.WARN
+
+def format_unit(u):
+ units = {
+ sr.Unit.VOLT: 'V',
+ sr.Unit.AMPERE: 'A',
+ sr.Unit.OHM: u'\u03A9',
+ sr.Unit.FARAD: 'F',
+ sr.Unit.KELVIN: 'K',
+ sr.Unit.CELSIUS: u'\u00B0C',
+ sr.Unit.FAHRENHEIT: u'\u00B0F',
+ sr.Unit.HERTZ: 'Hz',
+ sr.Unit.PERCENTAGE: '%',
+ # sr.Unit.BOOLEAN
+ sr.Unit.SECOND: 's',
+ sr.Unit.SIEMENS: 'S',
+ sr.Unit.DECIBEL_MW: 'dBu',
+ sr.Unit.DECIBEL_VOLT: 'dBV',
+ # sr.Unit.UNITLESS
+ sr.Unit.DECIBEL_SPL: 'dB',
+ # sr.Unit.CONCENTRATION
+ sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm',
+ sr.Unit.VOLT_AMPERE: 'VA',
+ sr.Unit.WATT: 'W',
+ sr.Unit.WATT_HOUR: 'Wh',
+ sr.Unit.METER_SECOND: 'm/s',
+ sr.Unit.HECTOPASCAL: 'hPa',
+ sr.Unit.HUMIDITY_293K: '%rF',
+ sr.Unit.DEGREE: u'\u00B0',
+ sr.Unit.HENRY: 'H'
+ }
+
+ return units.get(u, '')
+
+class SamplingThread(QtCore.QObject):
+ '''A class that handles the reception of sigrok packets in the background.'''
+
+ class Worker(QtCore.QObject):
+ '''Helper class that does the actual work in another thread.'''
+
+ '''Signal emitted when new data arrived.'''
+ measured = QtCore.pyqtSignal(object, object)
+
+ def __init__(self, drivers, loglevel):
+ super(self.__class__, self).__init__()
+
+ self.context = sr.Context_create()
+ self.context.log_level = loglevel
+
+ self.sr_pkg_version = self.context.package_version
+ self.sr_lib_version = self.context.lib_version
+
+ self.devices = []
+ for name, options in drivers:
+ try:
+ dr = self.context.drivers[name]
+ self.devices.append(dr.scan(**options)[0])
+ except:
+ print('error getting device for driver "{}", skipping'.format(name))
+
+ if not self.devices:
+ print('no devices found')
+
+ def start_sampling(self):
+ self.session = self.context.create_session()
+ for dev in self.devices:
+ self.session.add_device(dev)
+ dev.open()
+ self.session.add_datafeed_callback(self.callback)
+ self.session.start()
+ self.session.run()
+
+ def stop_sampling(self):
+ self.session.stop()
+
+ def callback(self, device, packet):
+ if packet.type == sr.PacketType.ANALOG:
+ data = packet.payload.data
+ unit_str = format_unit(packet.payload.unit)
+ mqflags, mqflags_str = packet.payload.mq_flags, ""
+
+ if sr.QuantityFlag.AC in mqflags:
+ mqflags_str = "AC"
+ elif sr.QuantityFlag.DC in mqflags:
+ mqflags_str = "DC"
+
+ for i in range(packet.payload.num_samples):
+ dev = "%s %s" % (device.vendor, device.model)
+ mag_str = "%f" % data[0][i]
+ val = ' '.join([mag_str, unit_str, mqflags_str])
+
+ self.measured.emit(dev, val)
+
+ # wait a short time so that in any case we don't flood the GUI
+ # with new data (for example if the demo device is used)
+ self.thread().msleep(100)
+
+ # signal used to start the worker across threads
+ _start_signal = QtCore.pyqtSignal()
+
+ def __init__(self, drivers, loglevel):
+ super(self.__class__, self).__init__()
+
+ self.worker = self.Worker(drivers, loglevel)
+ self.thread = QtCore.QThread()
+ self.worker.moveToThread(self.thread)
+
+ self._start_signal.connect(self.worker.start_sampling)
+
+ self.measured = self.worker.measured
+
+ self.thread.start()
+
+ def start(self):
+ '''Starts sampling'''
+ self._start_signal.emit()
+
+ def stop(self):
+ '''Stops sampling and the background thread.'''
+ self.worker.stop_sampling()
+ self.thread.quit()
+ # the timeout is needed when the demo device is used, because it
+ # produces so much outstanding data that quitting takes a long time
+ self.thread.wait(500)
+
+ def sr_pkg_version(self):
+ '''Returns the version number of the libsigrok package.'''
+ return self.worker.sr_pkg_version