X-Git-Url: http://sigrok.org/gitweb/?a=blobdiff_plain;f=sigrok-meter;h=d69101c84c0a8e1d55d3101899e238705f8c4ce2;hb=f517686f9b9dd004876be97ad6376021163d4120;hp=604890c5ccae60a5e02bf00bc57bfa4ca22393cf;hpb=73f2129ac6a334a627a39a20aeaa7a731b5b6a0f;p=sigrok-meter.git diff --git a/sigrok-meter b/sigrok-meter index 604890c..d69101c 100755 --- a/sigrok-meter +++ b/sigrok-meter @@ -24,8 +24,6 @@ import argparse import datetime import os.path -import PyQt4.QtCore as QtCore -import PyQt4.QtGui as QtGui import re import sigrok.core as sr import sys @@ -34,37 +32,89 @@ import textwrap default_drivers = [('demo', {'analog_channels': 1})] default_loglevel = sr.LogLevel.WARN -def format_unit(u): - units = { - sr.Unit.VOLT: 'V', - sr.Unit.AMPERE: 'A', - sr.Unit.OHM: u'\u03A9', - sr.Unit.FARAD: 'F', - sr.Unit.KELVIN: 'K', - sr.Unit.CELSIUS: u'\u00B0C', - sr.Unit.FAHRENHEIT: u'\u00B0F', - sr.Unit.HERTZ: 'Hz', - sr.Unit.PERCENTAGE: '%', - # sr.Unit.BOOLEAN - sr.Unit.SECOND: 's', - sr.Unit.SIEMENS: 'S', - sr.Unit.DECIBEL_MW: 'dBu', - sr.Unit.DECIBEL_VOLT: 'dBV', - # sr.Unit.UNITLESS - sr.Unit.DECIBEL_SPL: 'dB', - # sr.Unit.CONCENTRATION - sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm', - sr.Unit.VOLT_AMPERE: 'VA', - sr.Unit.WATT: 'W', - sr.Unit.WATT_HOUR: 'Wh', - sr.Unit.METER_SECOND: 'm/s', - sr.Unit.HECTOPASCAL: 'hPa', - sr.Unit.HUMIDITY_293K: '%rF', - sr.Unit.DEGREE: u'\u00B0', - sr.Unit.HENRY: 'H' +def parse_cli(): + parser = argparse.ArgumentParser( + description='Simple sigrok GUI for multimeters and dataloggers.', + epilog=textwrap.dedent('''\ + The DRIVER string is the same as for sigrok-cli(1). + + examples: + + %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0 + + %(prog)s --driver uni-t-ut61e:conn=1a86.e008 + '''), + formatter_class=argparse.RawDescriptionHelpFormatter) + + parser.add_argument('-d', '--driver', + action='append', + help='The driver to use') + parser.add_argument('-l', '--loglevel', + type=int, + help='Set loglevel (5 is most verbose)') + parser.add_argument('--pyside', + action='store_true', + default=False, + help='Force use of PySide (default is to use PyQt4)') + args = parser.parse_args() + + result = { + 'drivers': default_drivers, + 'loglevel': default_loglevel, + 'pyside': args.pyside } - return units.get(u, '') + if args.driver: + result['drivers'] = [] + for d in args.driver: + m = re.match('(?P[^:]+)(?P(:[^:=]+=[^:=]+)*)', d) + if not m: + sys.exit('error parsing option "{}"'.format(d)) + + opts = m.group('opts').split(':')[1:] + opts = [tuple(kv.split('=')) for kv in opts] + opts = dict(opts) + + result['drivers'].append((m.group('name'), opts)) + + if args.loglevel != None: + try: + result['loglevel'] = sr.LogLevel.get(args.loglevel) + except: + sys.exit('error: invalid log level') + + return result + +if __name__ == '__main__': + # The command line parsing and import of the Qt modules is done here, + # so that the modules are imported before the classes below derive + # from classes therein. The rest of the main function is at the bottom. + + args = parse_cli() + + if args['pyside']: + from PySide import QtCore, QtGui + else: + try: + # Use version 2 API in all cases, because that's what PySide uses. + import sip + sip.setapi('QVariant', 2) + sip.setapi('QDate', 2) + sip.setapi('QDateTime', 2) + sip.setapi('QString', 2) + sip.setapi('QTextStream', 2) + sip.setapi('QTime', 2) + sip.setapi('QUrl', 2) + sip.setapi('QVariant', 2) + + from PyQt4 import QtCore, QtGui + + # Add PySide compatible names. + QtCore.Signal = QtCore.pyqtSignal + QtCore.Slot = QtCore.pyqtSlot + except: + sys.stderr.write('import of PyQt4 failed, using PySide\n') + from PySide import QtCore, QtGui class SamplingThread(QtCore.QObject): '''A class that handles the reception of sigrok packets in the background.''' @@ -73,64 +123,71 @@ class SamplingThread(QtCore.QObject): '''Helper class that does the actual work in another thread.''' '''Signal emitted when new data arrived.''' - measured = QtCore.pyqtSignal(object, object) + measured = QtCore.Signal(object, object) + + '''Signal emmited in case of an error.''' + error = QtCore.Signal(str) def __init__(self, drivers, loglevel): super(self.__class__, self).__init__() + self.sampling = False + self.drivers = drivers + self.context = sr.Context_create() self.context.log_level = loglevel self.sr_pkg_version = self.context.package_version self.sr_lib_version = self.context.lib_version - self.devices = [] - for name, options in drivers: + @QtCore.Slot() + def start_sampling(self): + devices = [] + for name, options in self.drivers: try: dr = self.context.drivers[name] - self.devices.append(dr.scan(**options)[0]) + devices.append(dr.scan(**options)[0]) except: - print('error getting device for driver "{}", skipping'.format(name)) - - if not self.devices: - print('no devices found') + self.error.emit( + 'Unable to get device for driver "{}".'.format(name)) + return - def start_sampling(self): self.session = self.context.create_session() - for dev in self.devices: + for dev in devices: self.session.add_device(dev) dev.open() self.session.add_datafeed_callback(self.callback) self.session.start() + self.sampling = True self.session.run() + # If sampling is 'True' here, it means that 'stop_sampling()' was + # not called, therefore 'session.run()' ended too early, indicating + # an error. + if self.sampling: + self.error.emit('An error occured during the acquisition.') + def stop_sampling(self): - self.session.stop() + if self.sampling: + self.sampling = False + self.session.stop() def callback(self, device, packet): if packet.type == sr.PacketType.ANALOG: - data = packet.payload.data - unit_str = format_unit(packet.payload.unit) - mqflags, mqflags_str = packet.payload.mq_flags, "" + dev = '{} {}'.format(device.vendor, device.model) - if sr.QuantityFlag.AC in mqflags: - mqflags_str = "AC" - elif sr.QuantityFlag.DC in mqflags: - mqflags_str = "DC" + # only send the most recent value + mag = packet.payload.data[0][-1] - for i in range(packet.payload.num_samples): - dev = "%s %s" % (device.vendor, device.model) - mag_str = "%f" % data[0][i] - val = ' '.join([mag_str, unit_str, mqflags_str]) - - self.measured.emit(dev, val) + self.measured.emit((dev, mag, packet.payload.unit, + packet.payload.mq_flags)) # wait a short time so that in any case we don't flood the GUI # with new data (for example if the demo device is used) self.thread().msleep(100) # signal used to start the worker across threads - _start_signal = QtCore.pyqtSignal() + _start_signal = QtCore.Signal() def __init__(self, drivers, loglevel): super(self.__class__, self).__init__() @@ -141,7 +198,9 @@ class SamplingThread(QtCore.QObject): self._start_signal.connect(self.worker.start_sampling) + # expose the signals of the worker self.measured = self.worker.measured + self.error = self.worker.error self.thread.start() @@ -172,8 +231,11 @@ class SigrokMeter(QtGui.QMainWindow): super(SigrokMeter, self).__init__() self.setup_ui() + self.inf = float('inf') + self.thread = thread self.thread.measured.connect(self.update, QtCore.Qt.QueuedConnection) + self.thread.error.connect(self.error) self.thread.start() def setup_ui(self): @@ -220,6 +282,7 @@ class SigrokMeter(QtGui.QMainWindow): self.statusBar().setSizeGripEnabled(False) + @QtCore.Slot() def show_about(self): text = textwrap.dedent('''\
@@ -238,9 +301,64 @@ class SigrokMeter(QtGui.QMainWindow): QtGui.QMessageBox.about(self, 'About sigrok-meter', text) - def update(self, device, value): + def format_unit(self, u): + units = { + sr.Unit.VOLT: 'V', + sr.Unit.AMPERE: 'A', + sr.Unit.OHM: u'\u03A9', + sr.Unit.FARAD: 'F', + sr.Unit.KELVIN: 'K', + sr.Unit.CELSIUS: u'\u00B0C', + sr.Unit.FAHRENHEIT: u'\u00B0F', + sr.Unit.HERTZ: 'Hz', + sr.Unit.PERCENTAGE: '%', + # sr.Unit.BOOLEAN + sr.Unit.SECOND: 's', + sr.Unit.SIEMENS: 'S', + sr.Unit.DECIBEL_MW: 'dBu', + sr.Unit.DECIBEL_VOLT: 'dBV', + # sr.Unit.UNITLESS + sr.Unit.DECIBEL_SPL: 'dB', + # sr.Unit.CONCENTRATION + sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm', + sr.Unit.VOLT_AMPERE: 'VA', + sr.Unit.WATT: 'W', + sr.Unit.WATT_HOUR: 'Wh', + sr.Unit.METER_SECOND: 'm/s', + sr.Unit.HECTOPASCAL: 'hPa', + sr.Unit.HUMIDITY_293K: '%rF', + sr.Unit.DEGREE: u'\u00B0', + sr.Unit.HENRY: 'H' + } + + return units.get(u, '') + + def format_mqflags(self, mqflags): + if sr.QuantityFlag.AC in mqflags: + s = 'AC' + elif sr.QuantityFlag.DC in mqflags: + s = 'DC' + else: + s = '' + + return s + + def format_mag(self, mag): + if mag == self.inf: + return u'\u221E' + return '{:f}'.format(mag) + + @QtCore.Slot(object) + def update(self, data): '''Updates the labels with new measurement values.''' + device, mag, unit, mqflags = data + + unit_str = self.format_unit(unit) + mqflags_str = self.format_mqflags(mqflags) + mag_str = self.format_mag(mag) + value = ' '.join([mag_str, unit_str, mqflags_str]) + n = datetime.datetime.now().time() now = '{:02}:{:02}:{:02}.{:03}'.format( n.hour, n.minute, n.second, n.microsecond / 1000) @@ -249,57 +367,13 @@ class SigrokMeter(QtGui.QMainWindow): self.lblDevName.setText(device) self.lblTime.setText(now) -def parse_cli(): - parser = argparse.ArgumentParser( - description='Simple sigrok GUI for multimeters and dataloggers.', - epilog=textwrap.dedent('''\ - The DRIVER string is the same as for sigrok-cli(1). - - examples: - - %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0 - - %(prog)s --driver uni-t-ut61e:conn=1a86.e008 - '''), - formatter_class=argparse.RawDescriptionHelpFormatter) - - parser.add_argument('-d', '--driver', - action='append', - help='The driver to use') - parser.add_argument('-l', '--loglevel', - type=int, - help='Set loglevel (5 is most verbose)') - args = parser.parse_args() - - result = { - 'drivers': default_drivers, - 'loglevel': default_loglevel - } - - if args.driver: - result['drivers'] = [] - for d in args.driver: - m = re.match('(?P[^:]+)(?P(:[^:=]+=[^:=]+)*)', d) - if not m: - sys.exit('error parsing option "{}"'.format(d)) - - opts = m.group('opts').split(':')[1:] - opts = [tuple(kv.split('=')) for kv in opts] - opts = dict(opts) - - result['drivers'].append((m.group('name'), opts)) - - if args.loglevel != None: - try: - result['loglevel'] = sr.LogLevel.get(args.loglevel) - except: - sys.exit('error: invalid log level') - - return result + @QtCore.Slot(str) + def error(self, msg): + '''Error handler for the sampling thread.''' + QtGui.QMessageBox.critical(self, 'Error', msg) + self.close() if __name__ == '__main__': - args = parse_cli() - thread = SamplingThread(args['drivers'], args['loglevel']) app = QtGui.QApplication([])