X-Git-Url: http://sigrok.org/gitweb/?a=blobdiff_plain;f=sigrok-meter;h=f9ea82b868a7d19e2d95ba0eb2d69dd10b0aef46;hb=2f050135e86497a757fe1e0e3ee9b40193405ac3;hp=604890c5ccae60a5e02bf00bc57bfa4ca22393cf;hpb=73f2129ac6a334a627a39a20aeaa7a731b5b6a0f;p=sigrok-meter.git diff --git a/sigrok-meter b/sigrok-meter index 604890c..f9ea82b 100755 --- a/sigrok-meter +++ b/sigrok-meter @@ -24,47 +24,97 @@ import argparse import datetime import os.path -import PyQt4.QtCore as QtCore -import PyQt4.QtGui as QtGui import re import sigrok.core as sr import sys import textwrap -default_drivers = [('demo', {'analog_channels': 1})] +default_drivers = [('demo', {'analog_channels': 4})] default_loglevel = sr.LogLevel.WARN -def format_unit(u): - units = { - sr.Unit.VOLT: 'V', - sr.Unit.AMPERE: 'A', - sr.Unit.OHM: u'\u03A9', - sr.Unit.FARAD: 'F', - sr.Unit.KELVIN: 'K', - sr.Unit.CELSIUS: u'\u00B0C', - sr.Unit.FAHRENHEIT: u'\u00B0F', - sr.Unit.HERTZ: 'Hz', - sr.Unit.PERCENTAGE: '%', - # sr.Unit.BOOLEAN - sr.Unit.SECOND: 's', - sr.Unit.SIEMENS: 'S', - sr.Unit.DECIBEL_MW: 'dBu', - sr.Unit.DECIBEL_VOLT: 'dBV', - # sr.Unit.UNITLESS - sr.Unit.DECIBEL_SPL: 'dB', - # sr.Unit.CONCENTRATION - sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm', - sr.Unit.VOLT_AMPERE: 'VA', - sr.Unit.WATT: 'W', - sr.Unit.WATT_HOUR: 'Wh', - sr.Unit.METER_SECOND: 'm/s', - sr.Unit.HECTOPASCAL: 'hPa', - sr.Unit.HUMIDITY_293K: '%rF', - sr.Unit.DEGREE: u'\u00B0', - sr.Unit.HENRY: 'H' +def parse_cli(): + parser = argparse.ArgumentParser( + description='Simple sigrok GUI for multimeters and dataloggers.', + epilog=textwrap.dedent('''\ + The DRIVER string is the same as for sigrok-cli(1). + + examples: + + %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0 + + %(prog)s --driver uni-t-ut61e:conn=1a86.e008 + '''), + formatter_class=argparse.RawDescriptionHelpFormatter) + + parser.add_argument('-d', '--driver', + action='append', + help='The driver to use') + parser.add_argument('-l', '--loglevel', + type=int, + help='Set loglevel (5 is most verbose)') + parser.add_argument('--pyside', + action='store_true', + default=False, + help='Force use of PySide (default is to use PyQt4)') + args = parser.parse_args() + + result = { + 'drivers': default_drivers, + 'loglevel': default_loglevel, + 'pyside': args.pyside } - return units.get(u, '') + if args.driver: + result['drivers'] = [] + for d in args.driver: + m = re.match('(?P[^:]+)(?P(:[^:=]+=[^:=]+)*)', d) + if not m: + sys.exit('error parsing option "{}"'.format(d)) + + opts = m.group('opts').split(':')[1:] + opts = [tuple(kv.split('=')) for kv in opts] + opts = dict(opts) + + result['drivers'].append((m.group('name'), opts)) + + if args.loglevel != None: + try: + result['loglevel'] = sr.LogLevel.get(args.loglevel) + except: + sys.exit('error: invalid log level') + + return result + +if __name__ == '__main__': + # The command line parsing and import of the Qt modules is done here, + # so that the modules are imported before the classes below derive + # from classes therein. The rest of the main function is at the bottom. + + args = parse_cli() + + if args['pyside']: + from PySide import QtCore, QtGui + else: + try: + # Use version 2 API in all cases, because that's what PySide uses. + import sip + sip.setapi('QVariant', 2) + sip.setapi('QDate', 2) + sip.setapi('QDateTime', 2) + sip.setapi('QString', 2) + sip.setapi('QTextStream', 2) + sip.setapi('QTime', 2) + sip.setapi('QUrl', 2) + sip.setapi('QVariant', 2) + + from PyQt4 import QtCore, QtGui + + # Add PySide compatible names. + QtCore.Signal = QtCore.pyqtSignal + QtCore.Slot = QtCore.pyqtSlot + except: + sys.stderr.write('import of PyQt4 failed, using PySide\n') + from PySide import QtCore, QtGui class SamplingThread(QtCore.QObject): '''A class that handles the reception of sigrok packets in the background.''' @@ -73,64 +123,71 @@ class SamplingThread(QtCore.QObject): '''Helper class that does the actual work in another thread.''' '''Signal emitted when new data arrived.''' - measured = QtCore.pyqtSignal(object, object) + measured = QtCore.Signal(object, object) + + '''Signal emmited in case of an error.''' + error = QtCore.Signal(str) def __init__(self, drivers, loglevel): super(self.__class__, self).__init__() + self.sampling = False + self.drivers = drivers + self.context = sr.Context_create() self.context.log_level = loglevel self.sr_pkg_version = self.context.package_version self.sr_lib_version = self.context.lib_version - self.devices = [] - for name, options in drivers: + @QtCore.Slot() + def start_sampling(self): + devices = [] + for name, options in self.drivers: try: dr = self.context.drivers[name] - self.devices.append(dr.scan(**options)[0]) + devices.append(dr.scan(**options)[0]) except: - print('error getting device for driver "{}", skipping'.format(name)) + self.error.emit( + 'Unable to get device for driver "{}".'.format(name)) + return - if not self.devices: - print('no devices found') - - def start_sampling(self): self.session = self.context.create_session() - for dev in self.devices: + for dev in devices: self.session.add_device(dev) dev.open() self.session.add_datafeed_callback(self.callback) self.session.start() + self.sampling = True self.session.run() + # If sampling is 'True' here, it means that 'stop_sampling()' was + # not called, therefore 'session.run()' ended too early, indicating + # an error. + if self.sampling: + self.error.emit('An error occured during the acquisition.') + def stop_sampling(self): - self.session.stop() + if self.sampling: + self.sampling = False + self.session.stop() def callback(self, device, packet): - if packet.type == sr.PacketType.ANALOG: - data = packet.payload.data - unit_str = format_unit(packet.payload.unit) - mqflags, mqflags_str = packet.payload.mq_flags, "" - - if sr.QuantityFlag.AC in mqflags: - mqflags_str = "AC" - elif sr.QuantityFlag.DC in mqflags: - mqflags_str = "DC" + if not sr: + # In rare cases it can happen that the callback fires while + # the interpreter is shutting down. Then the sigrok module + # is already set to 'None'. + return - for i in range(packet.payload.num_samples): - dev = "%s %s" % (device.vendor, device.model) - mag_str = "%f" % data[0][i] - val = ' '.join([mag_str, unit_str, mqflags_str]) - - self.measured.emit(dev, val) + if packet.type == sr.PacketType.ANALOG: + self.measured.emit(device, packet.payload) # wait a short time so that in any case we don't flood the GUI # with new data (for example if the demo device is used) self.thread().msleep(100) # signal used to start the worker across threads - _start_signal = QtCore.pyqtSignal() + _start_signal = QtCore.Signal() def __init__(self, drivers, loglevel): super(self.__class__, self).__init__() @@ -141,7 +198,9 @@ class SamplingThread(QtCore.QObject): self._start_signal.connect(self.worker.start_sampling) + # expose the signals of the worker self.measured = self.worker.measured + self.error = self.worker.error self.thread.start() @@ -165,21 +224,210 @@ class SamplingThread(QtCore.QObject): '''Returns the version number fo the libsigrok library.''' return self.worker.sr_lib_version +class MeasurementDataModel(QtGui.QStandardItemModel): + '''Model to hold the measured values.''' + + '''Role used to identify and find the item.''' + _idRole = QtCore.Qt.UserRole + 1 + + '''Role used to store the device vendor and model.''' + descRole = QtCore.Qt.UserRole + 2 + + def __init__(self, parent): + super(self.__class__, self).__init__(parent) + + # Use the description text to sort the items for now, because the + # _idRole holds tuples, and using them to sort doesn't work. + self.setSortRole(MeasurementDataModel.descRole) + + # Used in 'format_mag()' to check against. + self.inf = float('inf') + + def format_unit(self, u): + units = { + sr.Unit.VOLT: 'V', + sr.Unit.AMPERE: 'A', + sr.Unit.OHM: u'\u03A9', + sr.Unit.FARAD: 'F', + sr.Unit.KELVIN: 'K', + sr.Unit.CELSIUS: u'\u00B0C', + sr.Unit.FAHRENHEIT: u'\u00B0F', + sr.Unit.HERTZ: 'Hz', + sr.Unit.PERCENTAGE: '%', + # sr.Unit.BOOLEAN + sr.Unit.SECOND: 's', + sr.Unit.SIEMENS: 'S', + sr.Unit.DECIBEL_MW: 'dBu', + sr.Unit.DECIBEL_VOLT: 'dBV', + # sr.Unit.UNITLESS + sr.Unit.DECIBEL_SPL: 'dB', + # sr.Unit.CONCENTRATION + sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm', + sr.Unit.VOLT_AMPERE: 'VA', + sr.Unit.WATT: 'W', + sr.Unit.WATT_HOUR: 'Wh', + sr.Unit.METER_SECOND: 'm/s', + sr.Unit.HECTOPASCAL: 'hPa', + sr.Unit.HUMIDITY_293K: '%rF', + sr.Unit.DEGREE: u'\u00B0', + sr.Unit.HENRY: 'H' + } + + return units.get(u, '') + + def format_mqflags(self, mqflags): + if sr.QuantityFlag.AC in mqflags: + return 'AC' + elif sr.QuantityFlag.DC in mqflags: + return 'DC' + else: + return '' + + def format_mag(self, mag): + if mag == self.inf: + return u'\u221E' + return '{:f}'.format(mag) + + def getItem(self, device, channel): + '''Returns the item for the device + channel combination from the model, + or creates a new item if no existing one matches.''' + + # unique identifier for the device + channel + # TODO: isn't there something better? + uid = ( + device.vendor, + device.model, + device.serial_number(), + device.connection_id(), + channel.index + ) + + # find the correct item in the model + for row in range(self.rowCount()): + item = self.item(row) + rid = item.data(MeasurementDataModel._idRole) + rid = tuple(rid) # PySide returns a list + if uid == rid: + return item + + # nothing found, create a new item + desc = '{} {}, channel "{}"'.format( + device.vendor, device.model, channel.name) + + item = QtGui.QStandardItem() + item.setData(uid, MeasurementDataModel._idRole) + item.setData(desc, MeasurementDataModel.descRole) + self.appendRow(item) + self.sort(0) + return item + + @QtCore.Slot(object, object) + def update(self, device, payload): + '''Updates the data for the device (+channel) with the most recent + measurement from the given payload.''' + + if not len(payload.channels): + return + + # TODO: find a device with multiple channels in one packet + channel = payload.channels[0] + + item = self.getItem(device, channel) + + # the most recent value + mag = payload.data[0][-1] + + unit_str = self.format_unit(payload.unit) + mqflags_str = self.format_mqflags(payload.mq_flags) + mag_str = self.format_mag(mag) + disp = ' '.join([mag_str, unit_str, mqflags_str]) + item.setData(disp, QtCore.Qt.DisplayRole) + +class MultimeterDelegate(QtGui.QStyledItemDelegate): + '''Delegate to show the data items from a MeasurementDataModel.''' + + def __init__(self, parent, font): + '''Initializes the delegate. + + :param font: Font used for the description text, the value is drawn + with a slightly bigger and bold variant of the font. + ''' + + super(self.__class__, self).__init__(parent) + + self._nfont = font + self._bfont = QtGui.QFont(self._nfont) + + self._bfont.setBold(True) + if self._bfont.pixelSize() != -1: + self._bfont.setPixelSize(self._bfont.pixelSize() * 1.8) + else: + self._bfont.setPointSizeF(self._bfont.pointSizeF() * 1.8) + + fi = QtGui.QFontInfo(self._nfont) + self._nfontheight = fi.pixelSize() + + fm = QtGui.QFontMetrics(self._bfont) + r = fm.boundingRect('-XX.XXXXXX X XX') + self._size = QtCore.QSize(r.width() * 1.2, r.height() * 3.5) + + def sizeHint(self, option=None, index=None): + return self._size + + def paint(self, painter, options, index): + value = index.data(QtCore.Qt.DisplayRole) + desc = index.data(MeasurementDataModel.descRole) + + # description in the top left corner + painter.setFont(self._nfont) + p = options.rect.topLeft() + p += QtCore.QPoint(self._nfontheight, 2 * self._nfontheight) + painter.drawText(p, desc) + + # value in the center + painter.setFont(self._bfont) + r = options.rect.adjusted(self._nfontheight, 2.5 * self._nfontheight, + 0, 0) + painter.drawText(r, QtCore.Qt.AlignCenter, value) + +class EmptyMessageListView(QtGui.QListView): + '''List view that shows a message if the model im empty.''' + + def __init__(self, message, parent=None): + super(self.__class__, self).__init__(parent) + + self._message = message + + def paintEvent(self, event): + m = self.model() + if m and m.rowCount(): + super(self.__class__, self).paintEvent(event) + return + + painter = QtGui.QPainter(self.viewport()) + painter.drawText(self.rect(), QtCore.Qt.AlignCenter, self._message) + class SigrokMeter(QtGui.QMainWindow): '''The main window of the application.''' def __init__(self, thread): super(SigrokMeter, self).__init__() + + self.delegate = MultimeterDelegate(self, self.font()) + self.model = MeasurementDataModel(self) + self.model.rowsInserted.connect(self.modelRowsInserted) + self.setup_ui() self.thread = thread - self.thread.measured.connect(self.update, QtCore.Qt.QueuedConnection) + self.thread.measured.connect(self.model.update) + self.thread.error.connect(self.error) self.thread.start() def setup_ui(self): self.setWindowTitle('sigrok-meter') - self.setMinimumHeight(130) - self.setMinimumWidth(260) + # resizing the listView below will increase this again + self.resize(10, 10) p = os.path.abspath(os.path.dirname(__file__)) p = os.path.join(p, 'sigrok-logo-notext.png') @@ -202,24 +450,23 @@ class SigrokMeter(QtGui.QMainWindow): menuHelp = menubar.addMenu('&Help') menuHelp.addAction(actionAbout) - self.lblValue = QtGui.QLabel('waiting for data...') - self.lblValue.setAlignment(QtCore.Qt.AlignCenter) - font = self.lblValue.font() - font.setPointSize(font.pointSize() * 1.7) - font.setBold(True) - self.lblValue.setFont(font) - self.setCentralWidget(self.lblValue) + self.listView = EmptyMessageListView('waiting for data...') + self.listView.setFrameShape(QtGui.QFrame.NoFrame) + self.listView.viewport().setBackgroundRole(QtGui.QPalette.Window) + self.listView.viewport().setAutoFillBackground(True) + self.listView.setMinimumWidth(260) + self.listView.setSelectionMode(QtGui.QAbstractItemView.NoSelection) + self.listView.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) + self.listView.setVerticalScrollMode(QtGui.QAbstractItemView.ScrollPerPixel) + self.listView.setItemDelegate(self.delegate) + self.listView.setModel(self.model) + self.listView.setUniformItemSizes(True) + self.listView.setMinimumSize(self.delegate.sizeHint()) + + self.setCentralWidget(self.listView) self.centralWidget().setContentsMargins(0, 0, 0, 0) - self.lblDevName = QtGui.QLabel() - self.lblDevName.setToolTip('Name of used measurement device.') - self.statusBar().insertWidget(0, self.lblDevName, 10) - self.lblTime = QtGui.QLabel() - self.lblTime.setToolTip('Time of the last measurement.') - self.statusBar().insertWidget(1, self.lblTime) - - self.statusBar().setSizeGripEnabled(False) - + @QtCore.Slot() def show_about(self): text = textwrap.dedent('''\
@@ -238,68 +485,21 @@ class SigrokMeter(QtGui.QMainWindow): QtGui.QMessageBox.about(self, 'About sigrok-meter', text) - def update(self, device, value): - '''Updates the labels with new measurement values.''' - - n = datetime.datetime.now().time() - now = '{:02}:{:02}:{:02}.{:03}'.format( - n.hour, n.minute, n.second, n.microsecond / 1000) - - self.lblValue.setText(value) - self.lblDevName.setText(device) - self.lblTime.setText(now) - -def parse_cli(): - parser = argparse.ArgumentParser( - description='Simple sigrok GUI for multimeters and dataloggers.', - epilog=textwrap.dedent('''\ - The DRIVER string is the same as for sigrok-cli(1). - - examples: - - %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0 - - %(prog)s --driver uni-t-ut61e:conn=1a86.e008 - '''), - formatter_class=argparse.RawDescriptionHelpFormatter) - - parser.add_argument('-d', '--driver', - action='append', - help='The driver to use') - parser.add_argument('-l', '--loglevel', - type=int, - help='Set loglevel (5 is most verbose)') - args = parser.parse_args() - - result = { - 'drivers': default_drivers, - 'loglevel': default_loglevel - } + @QtCore.Slot(str) + def error(self, msg): + '''Error handler for the sampling thread.''' + QtGui.QMessageBox.critical(self, 'Error', msg) + self.close() - if args.driver: - result['drivers'] = [] - for d in args.driver: - m = re.match('(?P[^:]+)(?P(:[^:=]+=[^:=]+)*)', d) - if not m: - sys.exit('error parsing option "{}"'.format(d)) + @QtCore.Slot(object, int, int) + def modelRowsInserted(self, parent, start, end): + '''Resizes the list view to the size of the content.''' - opts = m.group('opts').split(':')[1:] - opts = [tuple(kv.split('=')) for kv in opts] - opts = dict(opts) - - result['drivers'].append((m.group('name'), opts)) - - if args.loglevel != None: - try: - result['loglevel'] = sr.LogLevel.get(args.loglevel) - except: - sys.exit('error: invalid log level') - - return result + rows = self.model.rowCount() + dh = self.delegate.sizeHint().height() + self.listView.setMinimumHeight(dh * rows) if __name__ == '__main__': - args = parse_cli() - thread = SamplingThread(args['drivers'], args['loglevel']) app = QtGui.QApplication([])