X-Git-Url: http://sigrok.org/gitweb/?a=blobdiff_plain;f=sigrok-meter;h=0c263a448ef3df4fce1dd5f0a8b34e8b5df66203;hb=e65cc368ddce9b843d191ad7ec5e07fa6b2c6f85;hp=289d25c6dd97d8b1b1b7b190c634b0ecfdd4427d;hpb=627e5230c43fdda5595a20c52e6d5da976054752;p=sigrok-meter.git diff --git a/sigrok-meter b/sigrok-meter index 289d25c..0c263a4 100755 --- a/sigrok-meter +++ b/sigrok-meter @@ -1,8 +1,10 @@ -#!/usr/bin/python3 +#!/usr/bin/env python + ## ## This file is part of the sigrok-meter project. ## ## Copyright (C) 2013 Uwe Hermann +## Copyright (C) 2014 Jens Steinhauser ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -19,85 +21,478 @@ ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA ## -from multiprocessing import Process, Queue -from gi.repository import Gtk, GObject -from sigrok.core import * -from sigrok.core import lowlevel as ll - -def init_and_run(queue): - def datafeed_in(device, packet): - if packet.type is PacketType.ANALOG: - data = packet.payload.data - unit, unit_str = packet.payload.unit, "" - if unit is Unit.VOLT: - unit_str = " V" - elif unit is Unit.OHM: - unit_str = " Ohm" - elif unit is Unit.AMPERE: - unit_str = " A" - mqflags, mqflags_str = packet.payload.mqflags, "" - if QuantityFlag.AC in mqflags: - mqflags_str = " AC" - elif QuantityFlag.DC in mqflags: - mqflags_str = " DC" - for i in range(packet.payload.num_samples): - dev = "%s" % device.vendor - val = "%f%s%s" % (data[i], unit_str, mqflags_str) - queue.put((dev, val)) - - # log = Log() - # log.level = LogLevel.SPEW - context = Context() - drivers_to_use = ['voltcraft-vc820', 'victor-dmm'] - drivers = [context.drivers[d] for d in drivers_to_use] - devices = [d.scan()[0] for d in drivers] - for dev in devices: - dev.limit_samples = 1000 - session = Session(context) - for dev in devices: - session.add_device(dev) - session.add_callback(datafeed_in) - session.start() - session.run() - session.stop() - -class SigrokMeter: - def __init__(self): - self.builder = Gtk.Builder() - self.builder.add_from_file("sigrok-meter.glade") - self.builder.connect_signals(self) - self.value_label = self.builder.get_object("value_label") - self.value_label2 = self.builder.get_object("value_label2") - self.win = self.builder.get_object("mainwindow") - self.win.show_all() - self.queue = Queue() - GObject.timeout_add(100, self.update_label_if_needed) - - def update_label_if_needed(self): +import argparse +import datetime +import os.path +import re +import sigrok.core as sr +import sys +import textwrap + +default_drivers = [('demo', {'analog_channels': 4})] +default_loglevel = sr.LogLevel.WARN + +def parse_cli(): + parser = argparse.ArgumentParser( + description='Simple sigrok GUI for multimeters and dataloggers.', + epilog=textwrap.dedent('''\ + The DRIVER string is the same as for sigrok-cli(1). + + examples: + + %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0 + + %(prog)s --driver uni-t-ut61e:conn=1a86.e008 + '''), + formatter_class=argparse.RawDescriptionHelpFormatter) + + parser.add_argument('-d', '--driver', + action='append', + help='The driver to use') + parser.add_argument('-l', '--loglevel', + type=int, + help='Set loglevel (5 is most verbose)') + parser.add_argument('--pyside', + action='store_true', + default=False, + help='Force use of PySide (default is to use PyQt4)') + args = parser.parse_args() + + result = { + 'drivers': default_drivers, + 'loglevel': default_loglevel, + 'pyside': args.pyside + } + + if args.driver: + result['drivers'] = [] + for d in args.driver: + m = re.match('(?P[^:]+)(?P(:[^:=]+=[^:=]+)*)', d) + if not m: + sys.exit('error parsing option "{}"'.format(d)) + + opts = m.group('opts').split(':')[1:] + opts = [tuple(kv.split('=')) for kv in opts] + opts = dict(opts) + + result['drivers'].append((m.group('name'), opts)) + + if args.loglevel != None: try: - t = self.queue.get_nowait() - l = self.value_label if t[0] != "Victor" else self.value_label2 - l.set_text("%s: %s" % (t[0], t[1])) + result['loglevel'] = sr.LogLevel.get(args.loglevel) except: - pass - GObject.timeout_add(100, self.update_label_if_needed) + sys.exit('error: invalid log level') + + return result + +if __name__ == '__main__': + # The command line parsing and import of the Qt modules is done here, + # so that the modules are imported before the classes below derive + # from classes therein. The rest of the main function is at the bottom. + + args = parse_cli() + + if args['pyside']: + from PySide import QtCore, QtGui + else: + try: + # Use version 2 API in all cases, because that's what PySide uses. + import sip + sip.setapi('QVariant', 2) + sip.setapi('QDate', 2) + sip.setapi('QDateTime', 2) + sip.setapi('QString', 2) + sip.setapi('QTextStream', 2) + sip.setapi('QTime', 2) + sip.setapi('QUrl', 2) + sip.setapi('QVariant', 2) + + from PyQt4 import QtCore, QtGui + + # Add PySide compatible names. + QtCore.Signal = QtCore.pyqtSignal + QtCore.Slot = QtCore.pyqtSlot + except: + sys.stderr.write('import of PyQt4 failed, using PySide\n') + from PySide import QtCore, QtGui + +class SamplingThread(QtCore.QObject): + '''A class that handles the reception of sigrok packets in the background.''' + + class Worker(QtCore.QObject): + '''Helper class that does the actual work in another thread.''' + + '''Signal emitted when new data arrived.''' + measured = QtCore.Signal(object, object) + + '''Signal emmited in case of an error.''' + error = QtCore.Signal(str) + + def __init__(self, context, drivers): + super(self.__class__, self).__init__() + + self.context = context + self.drivers = drivers + + self.sampling = False + + @QtCore.Slot() + def start_sampling(self): + devices = [] + for name, options in self.drivers: + try: + dr = self.context.drivers[name] + devices.append(dr.scan(**options)[0]) + except: + self.error.emit( + 'Unable to get device for driver "{}".'.format(name)) + return + + self.session = self.context.create_session() + for dev in devices: + self.session.add_device(dev) + dev.open() + self.session.add_datafeed_callback(self.callback) + self.session.start() + self.sampling = True + self.session.run() + + # If sampling is 'True' here, it means that 'stop_sampling()' was + # not called, therefore 'session.run()' ended too early, indicating + # an error. + if self.sampling: + self.error.emit('An error occured during the acquisition.') + + def stop_sampling(self): + if self.sampling: + self.sampling = False + self.session.stop() + + def callback(self, device, packet): + if not sr: + # In rare cases it can happen that the callback fires while + # the interpreter is shutting down. Then the sigrok module + # is already set to 'None'. + return + + if packet.type == sr.PacketType.ANALOG: + self.measured.emit(device, packet.payload) + + # signal used to start the worker across threads + _start_signal = QtCore.Signal() + + def __init__(self, context, drivers): + super(self.__class__, self).__init__() + + self.worker = self.Worker(context, drivers) + self.thread = QtCore.QThread() + self.worker.moveToThread(self.thread) + + self._start_signal.connect(self.worker.start_sampling) + + # expose the signals of the worker + self.measured = self.worker.measured + self.error = self.worker.error + + self.thread.start() + + def start(self): + '''Starts sampling''' + self._start_signal.emit() + + def stop(self): + '''Stops sampling and the background thread.''' + self.worker.stop_sampling() + self.thread.quit() + self.thread.wait() + +class MeasurementDataModel(QtGui.QStandardItemModel): + '''Model to hold the measured values.''' + + '''Role used to identify and find the item.''' + _idRole = QtCore.Qt.UserRole + 1 + + '''Role used to store the device vendor and model.''' + descRole = QtCore.Qt.UserRole + 2 + + def __init__(self, parent): + super(self.__class__, self).__init__(parent) + + # Use the description text to sort the items for now, because the + # _idRole holds tuples, and using them to sort doesn't work. + self.setSortRole(MeasurementDataModel.descRole) + + # Used in 'format_mag()' to check against. + self.inf = float('inf') + + def format_unit(self, u): + units = { + sr.Unit.VOLT: 'V', + sr.Unit.AMPERE: 'A', + sr.Unit.OHM: u'\u03A9', + sr.Unit.FARAD: 'F', + sr.Unit.KELVIN: 'K', + sr.Unit.CELSIUS: u'\u00B0C', + sr.Unit.FAHRENHEIT: u'\u00B0F', + sr.Unit.HERTZ: 'Hz', + sr.Unit.PERCENTAGE: '%', + # sr.Unit.BOOLEAN + sr.Unit.SECOND: 's', + sr.Unit.SIEMENS: 'S', + sr.Unit.DECIBEL_MW: 'dBu', + sr.Unit.DECIBEL_VOLT: 'dBV', + # sr.Unit.UNITLESS + sr.Unit.DECIBEL_SPL: 'dB', + # sr.Unit.CONCENTRATION + sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm', + sr.Unit.VOLT_AMPERE: 'VA', + sr.Unit.WATT: 'W', + sr.Unit.WATT_HOUR: 'Wh', + sr.Unit.METER_SECOND: 'm/s', + sr.Unit.HECTOPASCAL: 'hPa', + sr.Unit.HUMIDITY_293K: '%rF', + sr.Unit.DEGREE: u'\u00B0', + sr.Unit.HENRY: 'H' + } - def on_quit(self, *args): - Gtk.main_quit(*args) + return units.get(u, '') - def on_about(self, action): - about = self.builder.get_object("aboutdialog") - sr_pkg = ll.sr_package_version_string_get() - sr_lib = ll.sr_lib_version_string_get() - s = "Using libsigrok %s (lib version %s)." % (sr_pkg, sr_lib) - about.set_comments(s) - about.run() - about.hide() + def format_mqflags(self, mqflags): + if sr.QuantityFlag.AC in mqflags: + return 'AC' + elif sr.QuantityFlag.DC in mqflags: + return 'DC' + else: + return '' + + def format_mag(self, mag): + if mag == self.inf: + return u'\u221E' + return '{:f}'.format(mag) + + def getItem(self, device, channel): + '''Returns the item for the device + channel combination from the model, + or creates a new item if no existing one matches.''' + + # unique identifier for the device + channel + # TODO: isn't there something better? + uid = ( + device.vendor, + device.model, + device.serial_number(), + device.connection_id(), + channel.index + ) + + # find the correct item in the model + for row in range(self.rowCount()): + item = self.item(row) + rid = item.data(MeasurementDataModel._idRole) + rid = tuple(rid) # PySide returns a list + if uid == rid: + return item + + # nothing found, create a new item + desc = '{} {}, channel "{}"'.format( + device.vendor, device.model, channel.name) + + item = QtGui.QStandardItem() + item.setData(uid, MeasurementDataModel._idRole) + item.setData(desc, MeasurementDataModel.descRole) + self.appendRow(item) + self.sort(0) + return item + + @QtCore.Slot(object, object) + def update(self, device, payload): + '''Updates the data for the device (+channel) with the most recent + measurement from the given payload.''' + + if not len(payload.channels): + return + + # TODO: find a device with multiple channels in one packet + channel = payload.channels[0] + + item = self.getItem(device, channel) + + # the most recent value + mag = payload.data[0][-1] + + unit_str = self.format_unit(payload.unit) + mqflags_str = self.format_mqflags(payload.mq_flags) + mag_str = self.format_mag(mag) + disp = ' '.join([mag_str, unit_str, mqflags_str]) + item.setData(disp, QtCore.Qt.DisplayRole) + +class MultimeterDelegate(QtGui.QStyledItemDelegate): + '''Delegate to show the data items from a MeasurementDataModel.''' + + def __init__(self, parent, font): + '''Initializes the delegate. + + :param font: Font used for the description text, the value is drawn + with a slightly bigger and bold variant of the font. + ''' + + super(self.__class__, self).__init__(parent) + + self._nfont = font + self._bfont = QtGui.QFont(self._nfont) + + self._bfont.setBold(True) + if self._bfont.pixelSize() != -1: + self._bfont.setPixelSize(self._bfont.pixelSize() * 1.8) + else: + self._bfont.setPointSizeF(self._bfont.pointSizeF() * 1.8) + + fi = QtGui.QFontInfo(self._nfont) + self._nfontheight = fi.pixelSize() + + fm = QtGui.QFontMetrics(self._bfont) + r = fm.boundingRect('-XX.XXXXXX X XX') + self._size = QtCore.QSize(r.width() * 1.2, r.height() * 3.5) + + def sizeHint(self, option=None, index=None): + return self._size + + def paint(self, painter, options, index): + value = index.data(QtCore.Qt.DisplayRole) + desc = index.data(MeasurementDataModel.descRole) + + # description in the top left corner + painter.setFont(self._nfont) + p = options.rect.topLeft() + p += QtCore.QPoint(self._nfontheight, 2 * self._nfontheight) + painter.drawText(p, desc) + + # value in the center + painter.setFont(self._bfont) + r = options.rect.adjusted(self._nfontheight, 2.5 * self._nfontheight, + 0, 0) + painter.drawText(r, QtCore.Qt.AlignCenter, value) + +class EmptyMessageListView(QtGui.QListView): + '''List view that shows a message if the model im empty.''' + + def __init__(self, message, parent=None): + super(self.__class__, self).__init__(parent) + + self._message = message + + def paintEvent(self, event): + m = self.model() + if m and m.rowCount(): + super(self.__class__, self).paintEvent(event) + return + + painter = QtGui.QPainter(self.viewport()) + painter.drawText(self.rect(), QtCore.Qt.AlignCenter, self._message) + +class SigrokMeter(QtGui.QMainWindow): + '''The main window of the application.''' + + def __init__(self, context, drivers): + super(SigrokMeter, self).__init__() + + self.context = context + + self.delegate = MultimeterDelegate(self, self.font()) + self.model = MeasurementDataModel(self) + self.model.rowsInserted.connect(self.modelRowsInserted) + + self.setup_ui() + + self.thread = SamplingThread(self.context, drivers) + self.thread.measured.connect(self.model.update) + self.thread.error.connect(self.error) + self.thread.start() + + def setup_ui(self): + self.setWindowTitle('sigrok-meter') + # resizing the listView below will increase this again + self.resize(10, 10) + + p = os.path.abspath(os.path.dirname(__file__)) + p = os.path.join(p, 'sigrok-logo-notext.png') + self.setWindowIcon(QtGui.QIcon(p)) + + actionQuit = QtGui.QAction(self) + actionQuit.setText('&Quit') + actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit')) + actionQuit.setShortcut('Ctrl+Q') + actionQuit.triggered.connect(self.close) + + actionAbout = QtGui.QAction(self) + actionAbout.setText('&About') + actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about')) + actionAbout.triggered.connect(self.show_about) + + menubar = self.menuBar() + menuFile = menubar.addMenu('&File') + menuFile.addAction(actionQuit) + menuHelp = menubar.addMenu('&Help') + menuHelp.addAction(actionAbout) + + self.listView = EmptyMessageListView('waiting for data...') + self.listView.setFrameShape(QtGui.QFrame.NoFrame) + self.listView.viewport().setBackgroundRole(QtGui.QPalette.Window) + self.listView.viewport().setAutoFillBackground(True) + self.listView.setMinimumWidth(260) + self.listView.setSelectionMode(QtGui.QAbstractItemView.NoSelection) + self.listView.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) + self.listView.setVerticalScrollMode(QtGui.QAbstractItemView.ScrollPerPixel) + self.listView.setItemDelegate(self.delegate) + self.listView.setModel(self.model) + self.listView.setUniformItemSizes(True) + self.listView.setMinimumSize(self.delegate.sizeHint()) + + self.setCentralWidget(self.listView) + self.centralWidget().setContentsMargins(0, 0, 0, 0) + + def closeEvent(self, event): + self.thread.stop() + event.accept() + + @QtCore.Slot() + def show_about(self): + text = textwrap.dedent('''\ +
+ sigrok-meter
+ 0.1.0
+ Using libsigrok {} (lib version {}).
+ + http://www.sigrok.org
+
+ This program comes with ABSOLUTELY NO WARRANTY;
+ for details visit + + http://www.gnu.org/licenses/gpl.html +
+ '''.format(self.context.package_version, self.context.lib_version)) + + QtGui.QMessageBox.about(self, 'About sigrok-meter', text) + + @QtCore.Slot(str) + def error(self, msg): + '''Error handler for the sampling thread.''' + QtGui.QMessageBox.critical(self, 'Error', msg) + self.close() + + @QtCore.Slot(object, int, int) + def modelRowsInserted(self, parent, start, end): + '''Resizes the list view to the size of the content.''' + + rows = self.model.rowCount() + dh = self.delegate.sizeHint().height() + self.listView.setMinimumHeight(dh * rows) if __name__ == '__main__': - s = SigrokMeter() - process = Process(target=init_and_run, args=(s.queue,)) - process.start() - Gtk.main() - process.terminate() + context = sr.Context_create() + context.log_level = args['loglevel'] + + app = QtGui.QApplication([]) + s = SigrokMeter(context, args['drivers']) + s.show() + sys.exit(app.exec_())