X-Git-Url: http://sigrok.org/gitweb/?a=blobdiff_plain;f=sigrok-meter;h=92de89973b17bb20f30ddb438be7ca4f7cd02201;hb=1f199679144d9f01279cb6f28f9deb90394b415a;hp=9fc2c2b6fdec7ad80f9bdf38af7a15c524bbddd3;hpb=782f592684ead79ffbc50880a066affdf9995b7a;p=sigrok-meter.git diff --git a/sigrok-meter b/sigrok-meter index 9fc2c2b..92de899 100755 --- a/sigrok-meter +++ b/sigrok-meter @@ -4,6 +4,7 @@ ## This file is part of the sigrok-meter project. ## ## Copyright (C) 2013 Uwe Hermann +## Copyright (C) 2014 Jens Steinhauser ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -21,103 +22,328 @@ ## import argparse -from multiprocessing import Process, Queue -from gi.repository import Gtk, GObject +import datetime +import os.path +import re import sigrok.core as sr import sys +import textwrap +default_drivers = [('demo', {'analog_channels': 1})] default_loglevel = sr.LogLevel.WARN -def init_and_run(queue, loglevel): - def datafeed_in(device, packet): - if packet.type == sr.PacketType.ANALOG: - data = packet.payload.data - unit, unit_str = packet.payload.unit, "" - if unit == sr.Unit.VOLT: - unit_str = " V" - elif unit == sr.Unit.OHM: - unit_str = " Ohm" - elif unit == sr.Unit.AMPERE: - unit_str = " A" - mqflags, mqflags_str = packet.payload.mq_flags, "" - if sr.QuantityFlag.AC in mqflags: - mqflags_str = " AC" - elif sr.QuantityFlag.DC in mqflags: - mqflags_str = " DC" - for i in range(packet.payload.num_samples): - dev = "%s %s" % (device.vendor, device.model) - val = "%f%s%s" % (data[0][i], unit_str, mqflags_str) - queue.put((dev, val)) - - context = sr.Context_create() - context.log_level = loglevel - drivers_to_use = ['tecpel-dmm-8061-ser'] - drivers = [context.drivers[d] for d in drivers_to_use] - devices = [d.scan(conn="/dev/ttyUSB0")[0] for d in drivers] - # devices = [d.scan()[0] for d in drivers] - # for dev in devices: - # dev.limit_samples = 1000 - session = context.create_session() - for dev in devices: - session.add_device(dev) - dev.open() - session.add_datafeed_callback(datafeed_in) - session.start() - session.run() - session.stop() - -class SigrokMeter: - def __init__(self): - self.builder = Gtk.Builder() - self.builder.add_from_file("sigrok-meter.glade") - self.builder.connect_signals(self) - self.value_label = self.builder.get_object("value_label") - self.value_label2 = self.builder.get_object("value_label2") - self.win = self.builder.get_object("mainwindow") - self.win.show_all() - self.queue = Queue() - GObject.timeout_add(100, self.update_label_if_needed) - - def update_label_if_needed(self): - try: - t = self.queue.get_nowait() - l = self.value_label if t[0] != "Victor" else self.value_label2 - l.set_text("%s: %s" % (t[0], t[1])) - except: - pass - GObject.timeout_add(100, self.update_label_if_needed) - - def on_quit(self, *args): - Gtk.main_quit(*args) - - def on_about(self, action): - about = self.builder.get_object("aboutdialog") - context = sr.Context_create() - sr_pkg = context.package_version - sr_lib = context.lib_version - s = "Using libsigrok %s (lib version %s)." % (sr_pkg, sr_lib) - about.set_comments(s) - about.run() - about.hide() - -if __name__ == '__main__': +def parse_cli(): parser = argparse.ArgumentParser( - description='Simple sigrok GUI for multimeters and dataloggers.') + description='Simple sigrok GUI for multimeters and dataloggers.', + epilog=textwrap.dedent('''\ + The DRIVER string is the same as for sigrok-cli(1). + + examples: + + %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0 + + %(prog)s --driver uni-t-ut61e:conn=1a86.e008 + '''), + formatter_class=argparse.RawDescriptionHelpFormatter) + + parser.add_argument('-d', '--driver', + action='append', + help='The driver to use') parser.add_argument('-l', '--loglevel', type=int, help='Set loglevel (5 is most verbose)') + parser.add_argument('--pyside', + action='store_true', + default=False, + help='Force use of PySide (default is to use PyQt4)') args = parser.parse_args() - loglevel = default_loglevel + result = { + 'drivers': default_drivers, + 'loglevel': default_loglevel, + 'pyside': args.pyside + } + + if args.driver: + result['drivers'] = [] + for d in args.driver: + m = re.match('(?P[^:]+)(?P(:[^:=]+=[^:=]+)*)', d) + if not m: + sys.exit('error parsing option "{}"'.format(d)) + + opts = m.group('opts').split(':')[1:] + opts = [tuple(kv.split('=')) for kv in opts] + opts = dict(opts) + + result['drivers'].append((m.group('name'), opts)) + if args.loglevel != None: try: - loglevel = sr.LogLevel.get(args.loglevel) + result['loglevel'] = sr.LogLevel.get(args.loglevel) except: sys.exit('error: invalid log level') - s = SigrokMeter() - process = Process(target=init_and_run, args=(s.queue, loglevel)) - process.start() - Gtk.main() - process.terminate() + return result + +if __name__ == '__main__': + # The command line parsing and import of the Qt modules is done here, + # so that the modules are imported before the classes below derive + # from classes therein. The rest of the main function is at the bottom. + + args = parse_cli() + + global qt_signal + + if args['pyside']: + from PySide import QtCore, QtGui + qt_signal = QtCore.Signal + else: + try: + from PyQt4 import QtCore, QtGui + qt_signal = QtCore.pyqtSignal + except: + sys.stderr.write('import of PyQt4 failed, using PySide\n') + from PySide import QtCore, QtGui + qt_signal = QtCore.Signal + +class SamplingThread(QtCore.QObject): + '''A class that handles the reception of sigrok packets in the background.''' + + class Worker(QtCore.QObject): + '''Helper class that does the actual work in another thread.''' + + '''Signal emitted when new data arrived.''' + measured = qt_signal(object) + + def __init__(self, drivers, loglevel): + super(self.__class__, self).__init__() + + self.context = sr.Context_create() + self.context.log_level = loglevel + + self.sr_pkg_version = self.context.package_version + self.sr_lib_version = self.context.lib_version + + self.devices = [] + for name, options in drivers: + try: + dr = self.context.drivers[name] + self.devices.append(dr.scan(**options)[0]) + except: + print('error getting device for driver "{}", skipping'.format(name)) + + if not self.devices: + print('no devices found') + + def start_sampling(self): + self.session = self.context.create_session() + for dev in self.devices: + self.session.add_device(dev) + dev.open() + self.session.add_datafeed_callback(self.callback) + self.session.start() + self.session.run() + + def stop_sampling(self): + self.session.stop() + + def callback(self, device, packet): + if packet.type == sr.PacketType.ANALOG: + dev = '{} {}'.format(device.vendor, device.model) + + # only send the most recent value + mag = packet.payload.data[0][-1] + + self.measured.emit((dev, mag, packet.payload.unit, + packet.payload.mq_flags)) + + # wait a short time so that in any case we don't flood the GUI + # with new data (for example if the demo device is used) + self.thread().msleep(100) + + # signal used to start the worker across threads + _start_signal = qt_signal() + + def __init__(self, drivers, loglevel): + super(self.__class__, self).__init__() + + self.worker = self.Worker(drivers, loglevel) + self.thread = QtCore.QThread() + self.worker.moveToThread(self.thread) + + self._start_signal.connect(self.worker.start_sampling) + + self.measured = self.worker.measured + + self.thread.start() + + def start(self): + '''Starts sampling''' + self._start_signal.emit() + + def stop(self): + '''Stops sampling and the background thread.''' + self.worker.stop_sampling() + self.thread.quit() + # the timeout is needed when the demo device is used, because it + # produces so much outstanding data that quitting takes a long time + self.thread.wait(500) + + def sr_pkg_version(self): + '''Returns the version number of the libsigrok package.''' + return self.worker.sr_pkg_version + + def sr_lib_version(self): + '''Returns the version number fo the libsigrok library.''' + return self.worker.sr_lib_version + +class SigrokMeter(QtGui.QMainWindow): + '''The main window of the application.''' + + def __init__(self, thread): + super(SigrokMeter, self).__init__() + self.setup_ui() + + self.inf = float('inf') + + self.thread = thread + self.thread.measured.connect(self.update, QtCore.Qt.QueuedConnection) + self.thread.start() + + def setup_ui(self): + self.setWindowTitle('sigrok-meter') + self.setMinimumHeight(130) + self.setMinimumWidth(260) + + p = os.path.abspath(os.path.dirname(__file__)) + p = os.path.join(p, 'sigrok-logo-notext.png') + self.setWindowIcon(QtGui.QIcon(p)) + + actionQuit = QtGui.QAction(self) + actionQuit.setText('&Quit') + actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit')) + actionQuit.setShortcut('Ctrl+Q') + actionQuit.triggered.connect(self.close) + + actionAbout = QtGui.QAction(self) + actionAbout.setText('&About') + actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about')) + actionAbout.triggered.connect(self.show_about) + + menubar = self.menuBar() + menuFile = menubar.addMenu('&File') + menuFile.addAction(actionQuit) + menuHelp = menubar.addMenu('&Help') + menuHelp.addAction(actionAbout) + + self.lblValue = QtGui.QLabel('waiting for data...') + self.lblValue.setAlignment(QtCore.Qt.AlignCenter) + font = self.lblValue.font() + font.setPointSize(font.pointSize() * 1.7) + font.setBold(True) + self.lblValue.setFont(font) + self.setCentralWidget(self.lblValue) + self.centralWidget().setContentsMargins(0, 0, 0, 0) + + self.lblDevName = QtGui.QLabel() + self.lblDevName.setToolTip('Name of used measurement device.') + self.statusBar().insertWidget(0, self.lblDevName, 10) + self.lblTime = QtGui.QLabel() + self.lblTime.setToolTip('Time of the last measurement.') + self.statusBar().insertWidget(1, self.lblTime) + + self.statusBar().setSizeGripEnabled(False) + + def show_about(self): + text = textwrap.dedent('''\ +
+ sigrok-meter
+ 0.1.0
+ Using libsigrok {} (lib version {}).
+ + http://www.sigrok.org
+
+ This program comes with ABSOLUTELY NO WARRANTY;
+ for details visit + + http://www.gnu.org/licenses/gpl.html +
+ '''.format(self.thread.sr_pkg_version(), self.thread.sr_lib_version())) + + QtGui.QMessageBox.about(self, 'About sigrok-meter', text) + + def format_unit(self, u): + units = { + sr.Unit.VOLT: 'V', + sr.Unit.AMPERE: 'A', + sr.Unit.OHM: u'\u03A9', + sr.Unit.FARAD: 'F', + sr.Unit.KELVIN: 'K', + sr.Unit.CELSIUS: u'\u00B0C', + sr.Unit.FAHRENHEIT: u'\u00B0F', + sr.Unit.HERTZ: 'Hz', + sr.Unit.PERCENTAGE: '%', + # sr.Unit.BOOLEAN + sr.Unit.SECOND: 's', + sr.Unit.SIEMENS: 'S', + sr.Unit.DECIBEL_MW: 'dBu', + sr.Unit.DECIBEL_VOLT: 'dBV', + # sr.Unit.UNITLESS + sr.Unit.DECIBEL_SPL: 'dB', + # sr.Unit.CONCENTRATION + sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm', + sr.Unit.VOLT_AMPERE: 'VA', + sr.Unit.WATT: 'W', + sr.Unit.WATT_HOUR: 'Wh', + sr.Unit.METER_SECOND: 'm/s', + sr.Unit.HECTOPASCAL: 'hPa', + sr.Unit.HUMIDITY_293K: '%rF', + sr.Unit.DEGREE: u'\u00B0', + sr.Unit.HENRY: 'H' + } + + return units.get(u, '') + + def format_mqflags(self, mqflags): + if sr.QuantityFlag.AC in mqflags: + s = 'AC' + elif sr.QuantityFlag.DC in mqflags: + s = 'DC' + else: + s = '' + + return s + + def format_mag(self, mag): + if mag == self.inf: + return u'\u221E' + return '{:f}'.format(mag) + + def update(self, data): + '''Updates the labels with new measurement values.''' + + device, mag, unit, mqflags = data + + unit_str = self.format_unit(unit) + mqflags_str = self.format_mqflags(mqflags) + mag_str = self.format_mag(mag) + value = ' '.join([mag_str, unit_str, mqflags_str]) + + n = datetime.datetime.now().time() + now = '{:02}:{:02}:{:02}.{:03}'.format( + n.hour, n.minute, n.second, n.microsecond / 1000) + + self.lblValue.setText(value) + self.lblDevName.setText(device) + self.lblTime.setText(now) + +if __name__ == '__main__': + thread = SamplingThread(args['drivers'], args['loglevel']) + + app = QtGui.QApplication([]) + s = SigrokMeter(thread) + s.show() + r = app.exec_() + thread.stop() + sys.exit(r)