X-Git-Url: http://sigrok.org/gitweb/?a=blobdiff_plain;f=sigrok-meter;h=8501cfa65ba0c68e727b2d981e168b6e4d309e5a;hb=b9a9a7a1d0ae08f1a937b9c4f356da1dd87a019b;hp=9fc2c2b6fdec7ad80f9bdf38af7a15c524bbddd3;hpb=782f592684ead79ffbc50880a066affdf9995b7a;p=sigrok-meter.git diff --git a/sigrok-meter b/sigrok-meter index 9fc2c2b..8501cfa 100755 --- a/sigrok-meter +++ b/sigrok-meter @@ -4,6 +4,7 @@ ## This file is part of the sigrok-meter project. ## ## Copyright (C) 2013 Uwe Hermann +## Copyright (C) 2014 Jens Steinhauser ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -21,103 +22,484 @@ ## import argparse -from multiprocessing import Process, Queue -from gi.repository import Gtk, GObject +import datetime +import os.path +import re import sigrok.core as sr import sys +import textwrap +default_drivers = [('demo', {'analog_channels': 4})] default_loglevel = sr.LogLevel.WARN -def init_and_run(queue, loglevel): - def datafeed_in(device, packet): - if packet.type == sr.PacketType.ANALOG: - data = packet.payload.data - unit, unit_str = packet.payload.unit, "" - if unit == sr.Unit.VOLT: - unit_str = " V" - elif unit == sr.Unit.OHM: - unit_str = " Ohm" - elif unit == sr.Unit.AMPERE: - unit_str = " A" - mqflags, mqflags_str = packet.payload.mq_flags, "" - if sr.QuantityFlag.AC in mqflags: - mqflags_str = " AC" - elif sr.QuantityFlag.DC in mqflags: - mqflags_str = " DC" - for i in range(packet.payload.num_samples): - dev = "%s %s" % (device.vendor, device.model) - val = "%f%s%s" % (data[0][i], unit_str, mqflags_str) - queue.put((dev, val)) - - context = sr.Context_create() - context.log_level = loglevel - drivers_to_use = ['tecpel-dmm-8061-ser'] - drivers = [context.drivers[d] for d in drivers_to_use] - devices = [d.scan(conn="/dev/ttyUSB0")[0] for d in drivers] - # devices = [d.scan()[0] for d in drivers] - # for dev in devices: - # dev.limit_samples = 1000 - session = context.create_session() - for dev in devices: - session.add_device(dev) - dev.open() - session.add_datafeed_callback(datafeed_in) - session.start() - session.run() - session.stop() - -class SigrokMeter: - def __init__(self): - self.builder = Gtk.Builder() - self.builder.add_from_file("sigrok-meter.glade") - self.builder.connect_signals(self) - self.value_label = self.builder.get_object("value_label") - self.value_label2 = self.builder.get_object("value_label2") - self.win = self.builder.get_object("mainwindow") - self.win.show_all() - self.queue = Queue() - GObject.timeout_add(100, self.update_label_if_needed) - - def update_label_if_needed(self): - try: - t = self.queue.get_nowait() - l = self.value_label if t[0] != "Victor" else self.value_label2 - l.set_text("%s: %s" % (t[0], t[1])) - except: - pass - GObject.timeout_add(100, self.update_label_if_needed) - - def on_quit(self, *args): - Gtk.main_quit(*args) - - def on_about(self, action): - about = self.builder.get_object("aboutdialog") - context = sr.Context_create() - sr_pkg = context.package_version - sr_lib = context.lib_version - s = "Using libsigrok %s (lib version %s)." % (sr_pkg, sr_lib) - about.set_comments(s) - about.run() - about.hide() - -if __name__ == '__main__': +def parse_cli(): parser = argparse.ArgumentParser( - description='Simple sigrok GUI for multimeters and dataloggers.') + description='Simple sigrok GUI for multimeters and dataloggers.', + epilog=textwrap.dedent('''\ + The DRIVER string is the same as for sigrok-cli(1). + + examples: + + %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0 + + %(prog)s --driver uni-t-ut61e:conn=1a86.e008 + '''), + formatter_class=argparse.RawDescriptionHelpFormatter) + + parser.add_argument('-d', '--driver', + action='append', + help='The driver to use') parser.add_argument('-l', '--loglevel', type=int, help='Set loglevel (5 is most verbose)') + parser.add_argument('--pyside', + action='store_true', + default=False, + help='Force use of PySide (default is to use PyQt4)') args = parser.parse_args() - loglevel = default_loglevel + result = { + 'drivers': default_drivers, + 'loglevel': default_loglevel, + 'pyside': args.pyside + } + + if args.driver: + result['drivers'] = [] + for d in args.driver: + m = re.match('(?P[^:]+)(?P(:[^:=]+=[^:=]+)*)', d) + if not m: + sys.exit('error parsing option "{}"'.format(d)) + + opts = m.group('opts').split(':')[1:] + opts = [tuple(kv.split('=')) for kv in opts] + opts = dict(opts) + + result['drivers'].append((m.group('name'), opts)) + if args.loglevel != None: try: - loglevel = sr.LogLevel.get(args.loglevel) + result['loglevel'] = sr.LogLevel.get(args.loglevel) except: sys.exit('error: invalid log level') - s = SigrokMeter() - process = Process(target=init_and_run, args=(s.queue, loglevel)) - process.start() - Gtk.main() - process.terminate() + return result + +if __name__ == '__main__': + # The command line parsing and import of the Qt modules is done here, + # so that the modules are imported before the classes below derive + # from classes therein. The rest of the main function is at the bottom. + + args = parse_cli() + + if args['pyside']: + from PySide import QtCore, QtGui + else: + try: + # Use version 2 API in all cases, because that's what PySide uses. + import sip + sip.setapi('QVariant', 2) + sip.setapi('QDate', 2) + sip.setapi('QDateTime', 2) + sip.setapi('QString', 2) + sip.setapi('QTextStream', 2) + sip.setapi('QTime', 2) + sip.setapi('QUrl', 2) + sip.setapi('QVariant', 2) + + from PyQt4 import QtCore, QtGui + + # Add PySide compatible names. + QtCore.Signal = QtCore.pyqtSignal + QtCore.Slot = QtCore.pyqtSlot + except: + sys.stderr.write('import of PyQt4 failed, using PySide\n') + from PySide import QtCore, QtGui + +class SamplingThread(QtCore.QObject): + '''A class that handles the reception of sigrok packets in the background.''' + + class Worker(QtCore.QObject): + '''Helper class that does the actual work in another thread.''' + + '''Signal emitted when new data arrived.''' + measured = QtCore.Signal(object, object) + + '''Signal emmited in case of an error.''' + error = QtCore.Signal(str) + + def __init__(self, drivers, loglevel): + super(self.__class__, self).__init__() + + self.sampling = False + self.drivers = drivers + + self.context = sr.Context_create() + self.context.log_level = loglevel + + self.sr_pkg_version = self.context.package_version + self.sr_lib_version = self.context.lib_version + + @QtCore.Slot() + def start_sampling(self): + devices = [] + for name, options in self.drivers: + try: + dr = self.context.drivers[name] + devices.append(dr.scan(**options)[0]) + except: + self.error.emit( + 'Unable to get device for driver "{}".'.format(name)) + return + + self.session = self.context.create_session() + for dev in devices: + self.session.add_device(dev) + dev.open() + self.session.add_datafeed_callback(self.callback) + self.session.start() + self.sampling = True + self.session.run() + + # If sampling is 'True' here, it means that 'stop_sampling()' was + # not called, therefore 'session.run()' ended too early, indicating + # an error. + if self.sampling: + self.error.emit('An error occured during the acquisition.') + + def stop_sampling(self): + if self.sampling: + self.sampling = False + self.session.stop() + + def callback(self, device, packet): + if packet.type == sr.PacketType.ANALOG: + self.measured.emit(device, packet.payload) + + # wait a short time so that in any case we don't flood the GUI + # with new data (for example if the demo device is used) + self.thread().msleep(100) + + # signal used to start the worker across threads + _start_signal = QtCore.Signal() + + def __init__(self, drivers, loglevel): + super(self.__class__, self).__init__() + + self.worker = self.Worker(drivers, loglevel) + self.thread = QtCore.QThread() + self.worker.moveToThread(self.thread) + + self._start_signal.connect(self.worker.start_sampling) + + # expose the signals of the worker + self.measured = self.worker.measured + self.error = self.worker.error + + self.thread.start() + + def start(self): + '''Starts sampling''' + self._start_signal.emit() + + def stop(self): + '''Stops sampling and the background thread.''' + self.worker.stop_sampling() + self.thread.quit() + # the timeout is needed when the demo device is used, because it + # produces so much outstanding data that quitting takes a long time + self.thread.wait(500) + + def sr_pkg_version(self): + '''Returns the version number of the libsigrok package.''' + return self.worker.sr_pkg_version + + def sr_lib_version(self): + '''Returns the version number fo the libsigrok library.''' + return self.worker.sr_lib_version + +class MeasurementDataModel(QtGui.QStandardItemModel): + '''Model to hold the measured values.''' + + '''Role used to identify and find the item.''' + _idRole = QtCore.Qt.UserRole + 1 + + '''Role used to store the device vendor and model.''' + descRole = QtCore.Qt.UserRole + 2 + + def __init__(self, parent): + super(self.__class__, self).__init__(parent) + + # Use the description text to sort the items for now, because the + # _idRole holds tuples, and using them to sort doesn't work. + self.setSortRole(MeasurementDataModel.descRole) + + # Used in 'format_mag()' to check against. + self.inf = float('inf') + + def format_unit(self, u): + units = { + sr.Unit.VOLT: 'V', + sr.Unit.AMPERE: 'A', + sr.Unit.OHM: u'\u03A9', + sr.Unit.FARAD: 'F', + sr.Unit.KELVIN: 'K', + sr.Unit.CELSIUS: u'\u00B0C', + sr.Unit.FAHRENHEIT: u'\u00B0F', + sr.Unit.HERTZ: 'Hz', + sr.Unit.PERCENTAGE: '%', + # sr.Unit.BOOLEAN + sr.Unit.SECOND: 's', + sr.Unit.SIEMENS: 'S', + sr.Unit.DECIBEL_MW: 'dBu', + sr.Unit.DECIBEL_VOLT: 'dBV', + # sr.Unit.UNITLESS + sr.Unit.DECIBEL_SPL: 'dB', + # sr.Unit.CONCENTRATION + sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm', + sr.Unit.VOLT_AMPERE: 'VA', + sr.Unit.WATT: 'W', + sr.Unit.WATT_HOUR: 'Wh', + sr.Unit.METER_SECOND: 'm/s', + sr.Unit.HECTOPASCAL: 'hPa', + sr.Unit.HUMIDITY_293K: '%rF', + sr.Unit.DEGREE: u'\u00B0', + sr.Unit.HENRY: 'H' + } + + return units.get(u, '') + + def format_mqflags(self, mqflags): + if sr.QuantityFlag.AC in mqflags: + return 'AC' + elif sr.QuantityFlag.DC in mqflags: + return 'DC' + else: + return '' + + def format_mag(self, mag): + if mag == self.inf: + return u'\u221E' + return '{:f}'.format(mag) + + def getItem(self, device, channel): + '''Returns the item for the device + channel combination from the model, + or creates a new item if no existing one matches.''' + + # unique identifier for the device + channel + # TODO: isn't there something better? + uid = ( + device.vendor, + device.model, + device.serial_number(), + device.connection_id(), + channel.index + ) + + # find the correct item in the model + for row in range(self.rowCount()): + item = self.item(row) + rid = item.data(MeasurementDataModel._idRole) + rid = tuple(rid) # PySide returns a list + if uid == rid: + return item + + # nothing found, create a new item + desc = '{} {}, channel "{}"'.format( + device.vendor, device.model, channel.name) + + item = QtGui.QStandardItem() + item.setData(uid, MeasurementDataModel._idRole) + item.setData(desc, MeasurementDataModel.descRole) + self.appendRow(item) + self.sort(0) + return item + + @QtCore.Slot(object, object) + def update(self, device, payload): + '''Updates the data for the device (+channel) with the most recent + measurement from the given payload.''' + + if not len(payload.channels): + return + + # TODO: find a device with multiple channels in one packet + channel = payload.channels[0] + + item = self.getItem(device, channel) + + # the most recent value + mag = payload.data[0][-1] + + unit_str = self.format_unit(payload.unit) + mqflags_str = self.format_mqflags(payload.mq_flags) + mag_str = self.format_mag(mag) + disp = ' '.join([mag_str, unit_str, mqflags_str]) + item.setData(disp, QtCore.Qt.DisplayRole) + +class MultimeterDelegate(QtGui.QStyledItemDelegate): + '''Delegate to show the data items from a MeasurementDataModel.''' + + def __init__(self, parent, font): + '''Initializes the delegate. + + :param font: Font used for the description text, the value is drawn + with a slightly bigger and bold variant of the font. + ''' + + super(self.__class__, self).__init__(parent) + + self._nfont = font + self._bfont = QtGui.QFont(self._nfont) + + self._bfont.setBold(True) + if self._bfont.pixelSize() != -1: + self._bfont.setPixelSize(self._bfont.pixelSize() * 1.8) + else: + self._bfont.setPointSizeF(self._bfont.pointSizeF() * 1.8) + + fi = QtGui.QFontInfo(self._nfont) + self._nfontheight = fi.pixelSize() + + fm = QtGui.QFontMetrics(self._bfont) + r = fm.boundingRect('-XX.XXXXXX X XX') + self._size = QtCore.QSize(r.width() * 1.2, r.height() * 3.5) + + def sizeHint(self, option=None, index=None): + return self._size + + def paint(self, painter, options, index): + value = index.data(QtCore.Qt.DisplayRole) + desc = index.data(MeasurementDataModel.descRole) + + # description in the top left corner + painter.setFont(self._nfont) + p = options.rect.topLeft() + p += QtCore.QPoint(self._nfontheight, 2 * self._nfontheight) + painter.drawText(p, desc) + + # value in the center + painter.setFont(self._bfont) + r = options.rect.adjusted(self._nfontheight, 2.5 * self._nfontheight, + 0, 0) + painter.drawText(r, QtCore.Qt.AlignCenter, value) + +class EmptyMessageListView(QtGui.QListView): + '''List view that shows a message if the model im empty.''' + + def __init__(self, message, parent=None): + super(self.__class__, self).__init__(parent) + + self._message = message + + def paintEvent(self, event): + m = self.model() + if m and m.rowCount(): + super(self.__class__, self).paintEvent(event) + return + + painter = QtGui.QPainter(self.viewport()) + painter.drawText(self.rect(), QtCore.Qt.AlignCenter, self._message) + +class SigrokMeter(QtGui.QMainWindow): + '''The main window of the application.''' + + def __init__(self, thread): + super(SigrokMeter, self).__init__() + + self.delegate = MultimeterDelegate(self, self.font()) + self.model = MeasurementDataModel(self) + self.model.rowsInserted.connect(self.modelRowsInserted) + + self.setup_ui() + + self.thread = thread + self.thread.measured.connect(self.model.update) + self.thread.error.connect(self.error) + self.thread.start() + + def setup_ui(self): + self.setWindowTitle('sigrok-meter') + # resizing the listView below will increase this again + self.resize(10, 10) + + p = os.path.abspath(os.path.dirname(__file__)) + p = os.path.join(p, 'sigrok-logo-notext.png') + self.setWindowIcon(QtGui.QIcon(p)) + + actionQuit = QtGui.QAction(self) + actionQuit.setText('&Quit') + actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit')) + actionQuit.setShortcut('Ctrl+Q') + actionQuit.triggered.connect(self.close) + + actionAbout = QtGui.QAction(self) + actionAbout.setText('&About') + actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about')) + actionAbout.triggered.connect(self.show_about) + + menubar = self.menuBar() + menuFile = menubar.addMenu('&File') + menuFile.addAction(actionQuit) + menuHelp = menubar.addMenu('&Help') + menuHelp.addAction(actionAbout) + + self.listView = EmptyMessageListView('waiting for data...') + self.listView.setFrameShape(QtGui.QFrame.NoFrame) + self.listView.viewport().setBackgroundRole(QtGui.QPalette.Window) + self.listView.viewport().setAutoFillBackground(True) + self.listView.setMinimumWidth(260) + self.listView.setSelectionMode(QtGui.QAbstractItemView.NoSelection) + self.listView.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) + self.listView.setVerticalScrollMode(QtGui.QAbstractItemView.ScrollPerPixel) + self.listView.setItemDelegate(self.delegate) + self.listView.setModel(self.model) + self.listView.setUniformItemSizes(True) + self.listView.setMinimumSize(self.delegate.sizeHint()) + + self.setCentralWidget(self.listView) + self.centralWidget().setContentsMargins(0, 0, 0, 0) + + @QtCore.Slot() + def show_about(self): + text = textwrap.dedent('''\ +
+ sigrok-meter
+ 0.1.0
+ Using libsigrok {} (lib version {}).
+ + http://www.sigrok.org
+
+ This program comes with ABSOLUTELY NO WARRANTY;
+ for details visit + + http://www.gnu.org/licenses/gpl.html +
+ '''.format(self.thread.sr_pkg_version(), self.thread.sr_lib_version())) + + QtGui.QMessageBox.about(self, 'About sigrok-meter', text) + + @QtCore.Slot(str) + def error(self, msg): + '''Error handler for the sampling thread.''' + QtGui.QMessageBox.critical(self, 'Error', msg) + self.close() + + @QtCore.Slot(object, int, int) + def modelRowsInserted(self, parent, start, end): + '''Resizes the list view to the size of the content.''' + + rows = self.model.rowCount() + dh = self.delegate.sizeHint().height() + self.listView.setMinimumHeight(dh * rows) + +if __name__ == '__main__': + thread = SamplingThread(args['drivers'], args['loglevel']) + + app = QtGui.QApplication([]) + s = SigrokMeter(thread) + s.show() + r = app.exec_() + thread.stop() + sys.exit(r)