import argparse
import datetime
import os.path
-import PyQt4.QtCore as QtCore
-import PyQt4.QtGui as QtGui
import re
import sigrok.core as sr
import sys
default_drivers = [('demo', {'analog_channels': 1})]
default_loglevel = sr.LogLevel.WARN
-def format_unit(u):
- units = {
- sr.Unit.VOLT: 'V',
- sr.Unit.AMPERE: 'A',
- sr.Unit.OHM: u'\u03A9',
- sr.Unit.FARAD: 'F',
- sr.Unit.KELVIN: 'K',
- sr.Unit.CELSIUS: u'\u00B0C',
- sr.Unit.FAHRENHEIT: u'\u00B0F',
- sr.Unit.HERTZ: 'Hz',
- sr.Unit.PERCENTAGE: '%',
- # sr.Unit.BOOLEAN
- sr.Unit.SECOND: 's',
- sr.Unit.SIEMENS: 'S',
- sr.Unit.DECIBEL_MW: 'dBu',
- sr.Unit.DECIBEL_VOLT: 'dBV',
- # sr.Unit.UNITLESS
- sr.Unit.DECIBEL_SPL: 'dB',
- # sr.Unit.CONCENTRATION
- sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm',
- sr.Unit.VOLT_AMPERE: 'VA',
- sr.Unit.WATT: 'W',
- sr.Unit.WATT_HOUR: 'Wh',
- sr.Unit.METER_SECOND: 'm/s',
- sr.Unit.HECTOPASCAL: 'hPa',
- sr.Unit.HUMIDITY_293K: '%rF',
- sr.Unit.DEGREE: u'\u00B0',
- sr.Unit.HENRY: 'H'
+def parse_cli():
+ parser = argparse.ArgumentParser(
+ description='Simple sigrok GUI for multimeters and dataloggers.',
+ epilog=textwrap.dedent('''\
+ The DRIVER string is the same as for sigrok-cli(1).
+
+ examples:
+
+ %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0
+
+ %(prog)s --driver uni-t-ut61e:conn=1a86.e008
+ '''),
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+
+ parser.add_argument('-d', '--driver',
+ action='append',
+ help='The driver to use')
+ parser.add_argument('-l', '--loglevel',
+ type=int,
+ help='Set loglevel (5 is most verbose)')
+ parser.add_argument('--pyside',
+ action='store_true',
+ default=False,
+ help='Force use of PySide (default is to use PyQt4)')
+ args = parser.parse_args()
+
+ result = {
+ 'drivers': default_drivers,
+ 'loglevel': default_loglevel,
+ 'pyside': args.pyside
}
- return units.get(u, '')
+ if args.driver:
+ result['drivers'] = []
+ for d in args.driver:
+ m = re.match('(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)', d)
+ if not m:
+ sys.exit('error parsing option "{}"'.format(d))
+
+ opts = m.group('opts').split(':')[1:]
+ opts = [tuple(kv.split('=')) for kv in opts]
+ opts = dict(opts)
+
+ result['drivers'].append((m.group('name'), opts))
+
+ if args.loglevel != None:
+ try:
+ result['loglevel'] = sr.LogLevel.get(args.loglevel)
+ except:
+ sys.exit('error: invalid log level')
+
+ return result
+
+if __name__ == '__main__':
+ # The command line parsing and import of the Qt modules is done here,
+ # so that the modules are imported before the classes below derive
+ # from classes therein. The rest of the main function is at the bottom.
+
+ args = parse_cli()
+
+ if args['pyside']:
+ from PySide import QtCore, QtGui
+ else:
+ try:
+ # Use version 2 API in all cases, because that's what PySide uses.
+ import sip
+ sip.setapi('QVariant', 2)
+ sip.setapi('QDate', 2)
+ sip.setapi('QDateTime', 2)
+ sip.setapi('QString', 2)
+ sip.setapi('QTextStream', 2)
+ sip.setapi('QTime', 2)
+ sip.setapi('QUrl', 2)
+ sip.setapi('QVariant', 2)
+
+ from PyQt4 import QtCore, QtGui
+
+ # Add PySide compatible names.
+ QtCore.Signal = QtCore.pyqtSignal
+ QtCore.Slot = QtCore.pyqtSlot
+ except:
+ sys.stderr.write('import of PyQt4 failed, using PySide\n')
+ from PySide import QtCore, QtGui
class SamplingThread(QtCore.QObject):
'''A class that handles the reception of sigrok packets in the background.'''
'''Helper class that does the actual work in another thread.'''
'''Signal emitted when new data arrived.'''
- measured = QtCore.pyqtSignal(object, object)
+ measured = QtCore.Signal(object, object)
+
+ '''Signal emmited in case of an error.'''
+ error = QtCore.Signal(str)
def __init__(self, drivers, loglevel):
super(self.__class__, self).__init__()
+ self.sampling = False
+ self.drivers = drivers
+
self.context = sr.Context_create()
self.context.log_level = loglevel
self.sr_pkg_version = self.context.package_version
self.sr_lib_version = self.context.lib_version
- self.devices = []
- for name, options in drivers:
+ @QtCore.Slot()
+ def start_sampling(self):
+ devices = []
+ for name, options in self.drivers:
try:
dr = self.context.drivers[name]
- self.devices.append(dr.scan(**options)[0])
+ devices.append(dr.scan(**options)[0])
except:
- print('error getting device for driver "{}", skipping'.format(name))
-
- if not self.devices:
- print('no devices found')
+ self.error.emit(
+ 'Unable to get device for driver "{}".'.format(name))
+ return
- def start_sampling(self):
self.session = self.context.create_session()
- for dev in self.devices:
+ for dev in devices:
self.session.add_device(dev)
dev.open()
self.session.add_datafeed_callback(self.callback)
self.session.start()
+ self.sampling = True
self.session.run()
+ # If sampling is 'True' here, it means that 'stop_sampling()' was
+ # not called, therefore 'session.run()' ended too early, indicating
+ # an error.
+ if self.sampling:
+ self.error.emit('An error occured during the acquisition.')
+
def stop_sampling(self):
- self.session.stop()
+ if self.sampling:
+ self.sampling = False
+ self.session.stop()
def callback(self, device, packet):
if packet.type == sr.PacketType.ANALOG:
- data = packet.payload.data
- unit_str = format_unit(packet.payload.unit)
- mqflags, mqflags_str = packet.payload.mq_flags, ""
+ dev = '{} {}'.format(device.vendor, device.model)
- if sr.QuantityFlag.AC in mqflags:
- mqflags_str = "AC"
- elif sr.QuantityFlag.DC in mqflags:
- mqflags_str = "DC"
+ # only send the most recent value
+ mag = packet.payload.data[0][-1]
- for i in range(packet.payload.num_samples):
- dev = "%s %s" % (device.vendor, device.model)
- mag_str = "%f" % data[0][i]
- val = ' '.join([mag_str, unit_str, mqflags_str])
-
- self.measured.emit(dev, val)
+ self.measured.emit((dev, mag, packet.payload.unit,
+ packet.payload.mq_flags))
# wait a short time so that in any case we don't flood the GUI
# with new data (for example if the demo device is used)
self.thread().msleep(100)
# signal used to start the worker across threads
- _start_signal = QtCore.pyqtSignal()
+ _start_signal = QtCore.Signal()
def __init__(self, drivers, loglevel):
super(self.__class__, self).__init__()
self._start_signal.connect(self.worker.start_sampling)
+ # expose the signals of the worker
self.measured = self.worker.measured
+ self.error = self.worker.error
self.thread.start()
super(SigrokMeter, self).__init__()
self.setup_ui()
+ self.inf = float('inf')
+
self.thread = thread
self.thread.measured.connect(self.update, QtCore.Qt.QueuedConnection)
+ self.thread.error.connect(self.error)
self.thread.start()
def setup_ui(self):
self.statusBar().setSizeGripEnabled(False)
+ @QtCore.Slot()
def show_about(self):
text = textwrap.dedent('''\
<div align="center">
QtGui.QMessageBox.about(self, 'About sigrok-meter', text)
- def update(self, device, value):
+ def format_unit(self, u):
+ units = {
+ sr.Unit.VOLT: 'V',
+ sr.Unit.AMPERE: 'A',
+ sr.Unit.OHM: u'\u03A9',
+ sr.Unit.FARAD: 'F',
+ sr.Unit.KELVIN: 'K',
+ sr.Unit.CELSIUS: u'\u00B0C',
+ sr.Unit.FAHRENHEIT: u'\u00B0F',
+ sr.Unit.HERTZ: 'Hz',
+ sr.Unit.PERCENTAGE: '%',
+ # sr.Unit.BOOLEAN
+ sr.Unit.SECOND: 's',
+ sr.Unit.SIEMENS: 'S',
+ sr.Unit.DECIBEL_MW: 'dBu',
+ sr.Unit.DECIBEL_VOLT: 'dBV',
+ # sr.Unit.UNITLESS
+ sr.Unit.DECIBEL_SPL: 'dB',
+ # sr.Unit.CONCENTRATION
+ sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm',
+ sr.Unit.VOLT_AMPERE: 'VA',
+ sr.Unit.WATT: 'W',
+ sr.Unit.WATT_HOUR: 'Wh',
+ sr.Unit.METER_SECOND: 'm/s',
+ sr.Unit.HECTOPASCAL: 'hPa',
+ sr.Unit.HUMIDITY_293K: '%rF',
+ sr.Unit.DEGREE: u'\u00B0',
+ sr.Unit.HENRY: 'H'
+ }
+
+ return units.get(u, '')
+
+ def format_mqflags(self, mqflags):
+ if sr.QuantityFlag.AC in mqflags:
+ s = 'AC'
+ elif sr.QuantityFlag.DC in mqflags:
+ s = 'DC'
+ else:
+ s = ''
+
+ return s
+
+ def format_mag(self, mag):
+ if mag == self.inf:
+ return u'\u221E'
+ return '{:f}'.format(mag)
+
+ @QtCore.Slot(object)
+ def update(self, data):
'''Updates the labels with new measurement values.'''
+ device, mag, unit, mqflags = data
+
+ unit_str = self.format_unit(unit)
+ mqflags_str = self.format_mqflags(mqflags)
+ mag_str = self.format_mag(mag)
+ value = ' '.join([mag_str, unit_str, mqflags_str])
+
n = datetime.datetime.now().time()
now = '{:02}:{:02}:{:02}.{:03}'.format(
n.hour, n.minute, n.second, n.microsecond / 1000)
self.lblDevName.setText(device)
self.lblTime.setText(now)
-def parse_cli():
- parser = argparse.ArgumentParser(
- description='Simple sigrok GUI for multimeters and dataloggers.',
- epilog=textwrap.dedent('''\
- The DRIVER string is the same as for sigrok-cli(1).
-
- examples:
-
- %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0
-
- %(prog)s --driver uni-t-ut61e:conn=1a86.e008
- '''),
- formatter_class=argparse.RawDescriptionHelpFormatter)
-
- parser.add_argument('-d', '--driver',
- action='append',
- help='The driver to use')
- parser.add_argument('-l', '--loglevel',
- type=int,
- help='Set loglevel (5 is most verbose)')
- args = parser.parse_args()
-
- result = {
- 'drivers': default_drivers,
- 'loglevel': default_loglevel
- }
-
- if args.driver:
- result['drivers'] = []
- for d in args.driver:
- m = re.match('(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)', d)
- if not m:
- sys.exit('error parsing option "{}"'.format(d))
-
- opts = m.group('opts').split(':')[1:]
- opts = [tuple(kv.split('=')) for kv in opts]
- opts = dict(opts)
-
- result['drivers'].append((m.group('name'), opts))
-
- if args.loglevel != None:
- try:
- result['loglevel'] = sr.LogLevel.get(args.loglevel)
- except:
- sys.exit('error: invalid log level')
-
- return result
+ @QtCore.Slot(str)
+ def error(self, msg):
+ '''Error handler for the sampling thread.'''
+ QtGui.QMessageBox.critical(self, 'Error', msg)
+ self.close()
if __name__ == '__main__':
- args = parse_cli()
-
thread = SamplingThread(args['drivers'], args['loglevel'])
app = QtGui.QApplication([])