X-Git-Url: http://sigrok.org/gitweb/?a=blobdiff_plain;f=sigrok-meter;h=884ad2d08781a187db2f83f67615484166061ca8;hb=480cdb7bc04548aec46b7342d6fa1acafa60331c;hp=8501cfa65ba0c68e727b2d981e168b6e4d309e5a;hpb=b9a9a7a1d0ae08f1a937b9c4f356da1dd87a019b;p=sigrok-meter.git diff --git a/sigrok-meter b/sigrok-meter index 8501cfa..884ad2d 100755 --- a/sigrok-meter +++ b/sigrok-meter @@ -1,5 +1,4 @@ #!/usr/bin/env python - ## ## This file is part of the sigrok-meter project. ## @@ -22,35 +21,43 @@ ## import argparse -import datetime -import os.path -import re import sigrok.core as sr import sys import textwrap -default_drivers = [('demo', {'analog_channels': 4})] -default_loglevel = sr.LogLevel.WARN +default_drivers = [('demo:analog_channels=4', 'samplerate=4')] +default_loglevel = 2 def parse_cli(): parser = argparse.ArgumentParser( description='Simple sigrok GUI for multimeters and dataloggers.', epilog=textwrap.dedent('''\ - The DRIVER string is the same as for sigrok-cli(1). + The DRIVER string is the same as for sigrok-cli(1). The nth + CONFIG is applied to the nth DRIVER. If there are more drivers + than configs, the remaining drivers use the default configuration. - examples: + Examples: %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0 %(prog)s --driver uni-t-ut61e:conn=1a86.e008 + + %(prog)s --driver demo:analog_channels=1 \\ + --config samplerate=10 '''), formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('-d', '--driver', action='append', + default=[], help='The driver to use') + parser.add_argument('-c', '--config', + action='append', + default=[], + help='Specify device configuration options') parser.add_argument('-l', '--loglevel', type=int, + default=default_loglevel, help='Set loglevel (5 is most verbose)') parser.add_argument('--pyside', action='store_true', @@ -58,448 +65,40 @@ def parse_cli(): help='Force use of PySide (default is to use PyQt4)') args = parser.parse_args() - result = { - 'drivers': default_drivers, - 'loglevel': default_loglevel, - 'pyside': args.pyside - } + if len(args.config) > len(args.driver): + sys.exit('Error: More configurations than drivers given.') + # Merge drivers and configurations into a list of tuples. + setattr(args, 'drivers', []) + if not args.driver: + args.drivers = default_drivers + sys.stderr.write('No driver given, using demo driver.\n') if args.driver: - result['drivers'] = [] - for d in args.driver: - m = re.match('(?P[^:]+)(?P(:[^:=]+=[^:=]+)*)', d) - if not m: - sys.exit('error parsing option "{}"'.format(d)) - - opts = m.group('opts').split(':')[1:] - opts = [tuple(kv.split('=')) for kv in opts] - opts = dict(opts) - - result['drivers'].append((m.group('name'), opts)) - - if args.loglevel != None: - try: - result['loglevel'] = sr.LogLevel.get(args.loglevel) - except: - sys.exit('error: invalid log level') + args.config.extend([''] * (len(args.driver) - len(args.config))) + args.drivers = zip(args.driver, args.config) + del args.driver + del args.config - return result + return args if __name__ == '__main__': - # The command line parsing and import of the Qt modules is done here, - # so that the modules are imported before the classes below derive - # from classes therein. The rest of the main function is at the bottom. - args = parse_cli() - if args['pyside']: - from PySide import QtCore, QtGui - else: - try: - # Use version 2 API in all cases, because that's what PySide uses. - import sip - sip.setapi('QVariant', 2) - sip.setapi('QDate', 2) - sip.setapi('QDateTime', 2) - sip.setapi('QString', 2) - sip.setapi('QTextStream', 2) - sip.setapi('QTime', 2) - sip.setapi('QUrl', 2) - sip.setapi('QVariant', 2) - - from PyQt4 import QtCore, QtGui - - # Add PySide compatible names. - QtCore.Signal = QtCore.pyqtSignal - QtCore.Slot = QtCore.pyqtSlot - except: - sys.stderr.write('import of PyQt4 failed, using PySide\n') - from PySide import QtCore, QtGui - -class SamplingThread(QtCore.QObject): - '''A class that handles the reception of sigrok packets in the background.''' - - class Worker(QtCore.QObject): - '''Helper class that does the actual work in another thread.''' - - '''Signal emitted when new data arrived.''' - measured = QtCore.Signal(object, object) - - '''Signal emmited in case of an error.''' - error = QtCore.Signal(str) - - def __init__(self, drivers, loglevel): - super(self.__class__, self).__init__() - - self.sampling = False - self.drivers = drivers - - self.context = sr.Context_create() - self.context.log_level = loglevel - - self.sr_pkg_version = self.context.package_version - self.sr_lib_version = self.context.lib_version - - @QtCore.Slot() - def start_sampling(self): - devices = [] - for name, options in self.drivers: - try: - dr = self.context.drivers[name] - devices.append(dr.scan(**options)[0]) - except: - self.error.emit( - 'Unable to get device for driver "{}".'.format(name)) - return - - self.session = self.context.create_session() - for dev in devices: - self.session.add_device(dev) - dev.open() - self.session.add_datafeed_callback(self.callback) - self.session.start() - self.sampling = True - self.session.run() - - # If sampling is 'True' here, it means that 'stop_sampling()' was - # not called, therefore 'session.run()' ended too early, indicating - # an error. - if self.sampling: - self.error.emit('An error occured during the acquisition.') - - def stop_sampling(self): - if self.sampling: - self.sampling = False - self.session.stop() - - def callback(self, device, packet): - if packet.type == sr.PacketType.ANALOG: - self.measured.emit(device, packet.payload) - - # wait a short time so that in any case we don't flood the GUI - # with new data (for example if the demo device is used) - self.thread().msleep(100) - - # signal used to start the worker across threads - _start_signal = QtCore.Signal() - - def __init__(self, drivers, loglevel): - super(self.__class__, self).__init__() - - self.worker = self.Worker(drivers, loglevel) - self.thread = QtCore.QThread() - self.worker.moveToThread(self.thread) - - self._start_signal.connect(self.worker.start_sampling) - - # expose the signals of the worker - self.measured = self.worker.measured - self.error = self.worker.error - - self.thread.start() - - def start(self): - '''Starts sampling''' - self._start_signal.emit() - - def stop(self): - '''Stops sampling and the background thread.''' - self.worker.stop_sampling() - self.thread.quit() - # the timeout is needed when the demo device is used, because it - # produces so much outstanding data that quitting takes a long time - self.thread.wait(500) - - def sr_pkg_version(self): - '''Returns the version number of the libsigrok package.''' - return self.worker.sr_pkg_version - - def sr_lib_version(self): - '''Returns the version number fo the libsigrok library.''' - return self.worker.sr_lib_version - -class MeasurementDataModel(QtGui.QStandardItemModel): - '''Model to hold the measured values.''' - - '''Role used to identify and find the item.''' - _idRole = QtCore.Qt.UserRole + 1 - - '''Role used to store the device vendor and model.''' - descRole = QtCore.Qt.UserRole + 2 - - def __init__(self, parent): - super(self.__class__, self).__init__(parent) - - # Use the description text to sort the items for now, because the - # _idRole holds tuples, and using them to sort doesn't work. - self.setSortRole(MeasurementDataModel.descRole) - - # Used in 'format_mag()' to check against. - self.inf = float('inf') - - def format_unit(self, u): - units = { - sr.Unit.VOLT: 'V', - sr.Unit.AMPERE: 'A', - sr.Unit.OHM: u'\u03A9', - sr.Unit.FARAD: 'F', - sr.Unit.KELVIN: 'K', - sr.Unit.CELSIUS: u'\u00B0C', - sr.Unit.FAHRENHEIT: u'\u00B0F', - sr.Unit.HERTZ: 'Hz', - sr.Unit.PERCENTAGE: '%', - # sr.Unit.BOOLEAN - sr.Unit.SECOND: 's', - sr.Unit.SIEMENS: 'S', - sr.Unit.DECIBEL_MW: 'dBu', - sr.Unit.DECIBEL_VOLT: 'dBV', - # sr.Unit.UNITLESS - sr.Unit.DECIBEL_SPL: 'dB', - # sr.Unit.CONCENTRATION - sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm', - sr.Unit.VOLT_AMPERE: 'VA', - sr.Unit.WATT: 'W', - sr.Unit.WATT_HOUR: 'Wh', - sr.Unit.METER_SECOND: 'm/s', - sr.Unit.HECTOPASCAL: 'hPa', - sr.Unit.HUMIDITY_293K: '%rF', - sr.Unit.DEGREE: u'\u00B0', - sr.Unit.HENRY: 'H' - } - - return units.get(u, '') - - def format_mqflags(self, mqflags): - if sr.QuantityFlag.AC in mqflags: - return 'AC' - elif sr.QuantityFlag.DC in mqflags: - return 'DC' - else: - return '' - - def format_mag(self, mag): - if mag == self.inf: - return u'\u221E' - return '{:f}'.format(mag) - - def getItem(self, device, channel): - '''Returns the item for the device + channel combination from the model, - or creates a new item if no existing one matches.''' - - # unique identifier for the device + channel - # TODO: isn't there something better? - uid = ( - device.vendor, - device.model, - device.serial_number(), - device.connection_id(), - channel.index - ) + import qtcompat + qtcompat.load_modules(args.pyside) + QtCore = qtcompat.QtCore + QtGui = qtcompat.QtGui + import mainwindow - # find the correct item in the model - for row in range(self.rowCount()): - item = self.item(row) - rid = item.data(MeasurementDataModel._idRole) - rid = tuple(rid) # PySide returns a list - if uid == rid: - return item - - # nothing found, create a new item - desc = '{} {}, channel "{}"'.format( - device.vendor, device.model, channel.name) - - item = QtGui.QStandardItem() - item.setData(uid, MeasurementDataModel._idRole) - item.setData(desc, MeasurementDataModel.descRole) - self.appendRow(item) - self.sort(0) - return item - - @QtCore.Slot(object, object) - def update(self, device, payload): - '''Updates the data for the device (+channel) with the most recent - measurement from the given payload.''' - - if not len(payload.channels): - return - - # TODO: find a device with multiple channels in one packet - channel = payload.channels[0] - - item = self.getItem(device, channel) - - # the most recent value - mag = payload.data[0][-1] - - unit_str = self.format_unit(payload.unit) - mqflags_str = self.format_mqflags(payload.mq_flags) - mag_str = self.format_mag(mag) - disp = ' '.join([mag_str, unit_str, mqflags_str]) - item.setData(disp, QtCore.Qt.DisplayRole) - -class MultimeterDelegate(QtGui.QStyledItemDelegate): - '''Delegate to show the data items from a MeasurementDataModel.''' - - def __init__(self, parent, font): - '''Initializes the delegate. - - :param font: Font used for the description text, the value is drawn - with a slightly bigger and bold variant of the font. - ''' - - super(self.__class__, self).__init__(parent) - - self._nfont = font - self._bfont = QtGui.QFont(self._nfont) - - self._bfont.setBold(True) - if self._bfont.pixelSize() != -1: - self._bfont.setPixelSize(self._bfont.pixelSize() * 1.8) - else: - self._bfont.setPointSizeF(self._bfont.pointSizeF() * 1.8) - - fi = QtGui.QFontInfo(self._nfont) - self._nfontheight = fi.pixelSize() - - fm = QtGui.QFontMetrics(self._bfont) - r = fm.boundingRect('-XX.XXXXXX X XX') - self._size = QtCore.QSize(r.width() * 1.2, r.height() * 3.5) - - def sizeHint(self, option=None, index=None): - return self._size - - def paint(self, painter, options, index): - value = index.data(QtCore.Qt.DisplayRole) - desc = index.data(MeasurementDataModel.descRole) - - # description in the top left corner - painter.setFont(self._nfont) - p = options.rect.topLeft() - p += QtCore.QPoint(self._nfontheight, 2 * self._nfontheight) - painter.drawText(p, desc) - - # value in the center - painter.setFont(self._bfont) - r = options.rect.adjusted(self._nfontheight, 2.5 * self._nfontheight, - 0, 0) - painter.drawText(r, QtCore.Qt.AlignCenter, value) - -class EmptyMessageListView(QtGui.QListView): - '''List view that shows a message if the model im empty.''' - - def __init__(self, message, parent=None): - super(self.__class__, self).__init__(parent) - - self._message = message - - def paintEvent(self, event): - m = self.model() - if m and m.rowCount(): - super(self.__class__, self).paintEvent(event) - return - - painter = QtGui.QPainter(self.viewport()) - painter.drawText(self.rect(), QtCore.Qt.AlignCenter, self._message) - -class SigrokMeter(QtGui.QMainWindow): - '''The main window of the application.''' - - def __init__(self, thread): - super(SigrokMeter, self).__init__() - - self.delegate = MultimeterDelegate(self, self.font()) - self.model = MeasurementDataModel(self) - self.model.rowsInserted.connect(self.modelRowsInserted) - - self.setup_ui() - - self.thread = thread - self.thread.measured.connect(self.model.update) - self.thread.error.connect(self.error) - self.thread.start() - - def setup_ui(self): - self.setWindowTitle('sigrok-meter') - # resizing the listView below will increase this again - self.resize(10, 10) - - p = os.path.abspath(os.path.dirname(__file__)) - p = os.path.join(p, 'sigrok-logo-notext.png') - self.setWindowIcon(QtGui.QIcon(p)) - - actionQuit = QtGui.QAction(self) - actionQuit.setText('&Quit') - actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit')) - actionQuit.setShortcut('Ctrl+Q') - actionQuit.triggered.connect(self.close) - - actionAbout = QtGui.QAction(self) - actionAbout.setText('&About') - actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about')) - actionAbout.triggered.connect(self.show_about) - - menubar = self.menuBar() - menuFile = menubar.addMenu('&File') - menuFile.addAction(actionQuit) - menuHelp = menubar.addMenu('&Help') - menuHelp.addAction(actionAbout) - - self.listView = EmptyMessageListView('waiting for data...') - self.listView.setFrameShape(QtGui.QFrame.NoFrame) - self.listView.viewport().setBackgroundRole(QtGui.QPalette.Window) - self.listView.viewport().setAutoFillBackground(True) - self.listView.setMinimumWidth(260) - self.listView.setSelectionMode(QtGui.QAbstractItemView.NoSelection) - self.listView.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) - self.listView.setVerticalScrollMode(QtGui.QAbstractItemView.ScrollPerPixel) - self.listView.setItemDelegate(self.delegate) - self.listView.setModel(self.model) - self.listView.setUniformItemSizes(True) - self.listView.setMinimumSize(self.delegate.sizeHint()) - - self.setCentralWidget(self.listView) - self.centralWidget().setContentsMargins(0, 0, 0, 0) - - @QtCore.Slot() - def show_about(self): - text = textwrap.dedent('''\ -
- sigrok-meter
- 0.1.0
- Using libsigrok {} (lib version {}).
- - http://www.sigrok.org
-
- This program comes with ABSOLUTELY NO WARRANTY;
- for details visit - - http://www.gnu.org/licenses/gpl.html -
- '''.format(self.thread.sr_pkg_version(), self.thread.sr_lib_version())) - - QtGui.QMessageBox.about(self, 'About sigrok-meter', text) - - @QtCore.Slot(str) - def error(self, msg): - '''Error handler for the sampling thread.''' - QtGui.QMessageBox.critical(self, 'Error', msg) - self.close() - - @QtCore.Slot(object, int, int) - def modelRowsInserted(self, parent, start, end): - '''Resizes the list view to the size of the content.''' - - rows = self.model.rowCount() - dh = self.delegate.sizeHint().height() - self.listView.setMinimumHeight(dh * rows) - -if __name__ == '__main__': - thread = SamplingThread(args['drivers'], args['loglevel']) + context = sr.Context_create() + try: + loglevel = sr.LogLevel.get(args.loglevel) + context.log_level = loglevel + except: + sys.exit('Error: invalid log level.') app = QtGui.QApplication([]) - s = SigrokMeter(thread) + s = mainwindow.MainWindow(context, args.drivers) s.show() - r = app.exec_() - thread.stop() - sys.exit(r) + sys.exit(app.exec_())