#!/usr/bin/env python
-
##
## This file is part of the sigrok-meter project.
##
##
import argparse
-import datetime
-import os.path
-import re
import sigrok.core as sr
import sys
import textwrap
+import signal
-default_drivers = [('demo', {'analog_channels': 4})]
-default_loglevel = sr.LogLevel.WARN
+default_drivers = [('demo:analog_channels=4', 'samplerate=4')]
def parse_cli():
parser = argparse.ArgumentParser(
description='Simple sigrok GUI for multimeters and dataloggers.',
epilog=textwrap.dedent('''\
- The DRIVER string is the same as for sigrok-cli(1).
+ The DRIVER string is the same as for sigrok-cli(1). Multiple
+ DRIVER and CONFIG items can be supplied. The nth CONFIG is applied
+ to the nth DRIVER. If there are more drivers than configs, the
+ remaining drivers use the default configuration.
- examples:
+ Examples:
%(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0
%(prog)s --driver uni-t-ut61e:conn=1a86.e008
+
+ %(prog)s --driver demo:analog_channels=1 \\
+ --config samplerate=10
+
+ %(prog)s --driver voltcraft-k204:conn=/dev/ttyUSB0 \\
+ --driver uni-t-ut61d:conn=1a86.e008 \\
+ --driver uni-t-ut61e-ser:conn=/dev/ttyUSB1
'''),
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('-d', '--driver',
action='append',
+ default=[],
help='The driver to use')
+ parser.add_argument('-c', '--config',
+ action='append',
+ default=[],
+ help='Specify device configuration options')
parser.add_argument('-l', '--loglevel',
type=int,
+ default=None,
help='Set loglevel (5 is most verbose)')
parser.add_argument('--pyside',
action='store_true',
help='Force use of PySide (default is to use PyQt4)')
args = parser.parse_args()
- result = {
- 'drivers': default_drivers,
- 'loglevel': default_loglevel,
- 'pyside': args.pyside
- }
+ if len(args.config) > len(args.driver):
+ sys.exit('Error: More configurations than drivers given.')
+ # Merge drivers and configurations into a list of tuples.
+ setattr(args, 'drivers', [])
+ if not args.driver:
+ args.drivers = default_drivers
+ sys.stderr.write('No driver given, using demo driver.\n')
if args.driver:
- result['drivers'] = []
- for d in args.driver:
- m = re.match('(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)', d)
- if not m:
- sys.exit('error parsing option "{}"'.format(d))
-
- opts = m.group('opts').split(':')[1:]
- opts = [tuple(kv.split('=')) for kv in opts]
- opts = dict(opts)
-
- result['drivers'].append((m.group('name'), opts))
+ args.config.extend([''] * (len(args.driver) - len(args.config)))
+ args.drivers = zip(args.driver, args.config)
+ del args.driver
+ del args.config
- if args.loglevel != None:
- try:
- result['loglevel'] = sr.LogLevel.get(args.loglevel)
- except:
- sys.exit('error: invalid log level')
-
- return result
+ return args
if __name__ == '__main__':
- # The command line parsing and import of the Qt modules is done here,
- # so that the modules are imported before the classes below derive
- # from classes therein. The rest of the main function is at the bottom.
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
args = parse_cli()
- if args['pyside']:
- from PySide import QtCore, QtGui
- else:
- try:
- # Use version 2 API in all cases, because that's what PySide uses.
- import sip
- sip.setapi('QVariant', 2)
- sip.setapi('QDate', 2)
- sip.setapi('QDateTime', 2)
- sip.setapi('QString', 2)
- sip.setapi('QTextStream', 2)
- sip.setapi('QTime', 2)
- sip.setapi('QUrl', 2)
- sip.setapi('QVariant', 2)
-
- from PyQt4 import QtCore, QtGui
-
- # Add PySide compatible names.
- QtCore.Signal = QtCore.pyqtSignal
- QtCore.Slot = QtCore.pyqtSlot
- except:
- sys.stderr.write('import of PyQt4 failed, using PySide\n')
- from PySide import QtCore, QtGui
-
-class SamplingThread(QtCore.QObject):
- '''A class that handles the reception of sigrok packets in the background.'''
-
- class Worker(QtCore.QObject):
- '''Helper class that does the actual work in another thread.'''
-
- '''Signal emitted when new data arrived.'''
- measured = QtCore.Signal(object, object)
-
- '''Signal emmited in case of an error.'''
- error = QtCore.Signal(str)
-
- def __init__(self, drivers, loglevel):
- super(self.__class__, self).__init__()
-
- self.sampling = False
- self.drivers = drivers
-
- self.context = sr.Context_create()
- self.context.log_level = loglevel
-
- self.sr_pkg_version = self.context.package_version
- self.sr_lib_version = self.context.lib_version
-
- @QtCore.Slot()
- def start_sampling(self):
- devices = []
- for name, options in self.drivers:
- try:
- dr = self.context.drivers[name]
- devices.append(dr.scan(**options)[0])
- except:
- self.error.emit(
- 'Unable to get device for driver "{}".'.format(name))
- return
-
- self.session = self.context.create_session()
- for dev in devices:
- self.session.add_device(dev)
- dev.open()
- self.session.add_datafeed_callback(self.callback)
- self.session.start()
- self.sampling = True
- self.session.run()
-
- # If sampling is 'True' here, it means that 'stop_sampling()' was
- # not called, therefore 'session.run()' ended too early, indicating
- # an error.
- if self.sampling:
- self.error.emit('An error occured during the acquisition.')
-
- def stop_sampling(self):
- if self.sampling:
- self.sampling = False
- self.session.stop()
-
- def callback(self, device, packet):
- if packet.type == sr.PacketType.ANALOG:
- self.measured.emit(device, packet.payload)
-
- # wait a short time so that in any case we don't flood the GUI
- # with new data (for example if the demo device is used)
- self.thread().msleep(100)
-
- # signal used to start the worker across threads
- _start_signal = QtCore.Signal()
-
- def __init__(self, drivers, loglevel):
- super(self.__class__, self).__init__()
-
- self.worker = self.Worker(drivers, loglevel)
- self.thread = QtCore.QThread()
- self.worker.moveToThread(self.thread)
-
- self._start_signal.connect(self.worker.start_sampling)
-
- # expose the signals of the worker
- self.measured = self.worker.measured
- self.error = self.worker.error
-
- self.thread.start()
-
- def start(self):
- '''Starts sampling'''
- self._start_signal.emit()
-
- def stop(self):
- '''Stops sampling and the background thread.'''
- self.worker.stop_sampling()
- self.thread.quit()
- # the timeout is needed when the demo device is used, because it
- # produces so much outstanding data that quitting takes a long time
- self.thread.wait(500)
-
- def sr_pkg_version(self):
- '''Returns the version number of the libsigrok package.'''
- return self.worker.sr_pkg_version
-
- def sr_lib_version(self):
- '''Returns the version number fo the libsigrok library.'''
- return self.worker.sr_lib_version
-
-class MeasurementDataModel(QtGui.QStandardItemModel):
- '''Model to hold the measured values.'''
-
- '''Role used to identify and find the item.'''
- _idRole = QtCore.Qt.UserRole + 1
-
- '''Role used to store the device vendor and model.'''
- descRole = QtCore.Qt.UserRole + 2
-
- def __init__(self, parent):
- super(self.__class__, self).__init__(parent)
-
- # Use the description text to sort the items for now, because the
- # _idRole holds tuples, and using them to sort doesn't work.
- self.setSortRole(MeasurementDataModel.descRole)
-
- # Used in 'format_mag()' to check against.
- self.inf = float('inf')
-
- def format_unit(self, u):
- units = {
- sr.Unit.VOLT: 'V',
- sr.Unit.AMPERE: 'A',
- sr.Unit.OHM: u'\u03A9',
- sr.Unit.FARAD: 'F',
- sr.Unit.KELVIN: 'K',
- sr.Unit.CELSIUS: u'\u00B0C',
- sr.Unit.FAHRENHEIT: u'\u00B0F',
- sr.Unit.HERTZ: 'Hz',
- sr.Unit.PERCENTAGE: '%',
- # sr.Unit.BOOLEAN
- sr.Unit.SECOND: 's',
- sr.Unit.SIEMENS: 'S',
- sr.Unit.DECIBEL_MW: 'dBu',
- sr.Unit.DECIBEL_VOLT: 'dBV',
- # sr.Unit.UNITLESS
- sr.Unit.DECIBEL_SPL: 'dB',
- # sr.Unit.CONCENTRATION
- sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm',
- sr.Unit.VOLT_AMPERE: 'VA',
- sr.Unit.WATT: 'W',
- sr.Unit.WATT_HOUR: 'Wh',
- sr.Unit.METER_SECOND: 'm/s',
- sr.Unit.HECTOPASCAL: 'hPa',
- sr.Unit.HUMIDITY_293K: '%rF',
- sr.Unit.DEGREE: u'\u00B0',
- sr.Unit.HENRY: 'H'
- }
-
- return units.get(u, '')
-
- def format_mqflags(self, mqflags):
- if sr.QuantityFlag.AC in mqflags:
- return 'AC'
- elif sr.QuantityFlag.DC in mqflags:
- return 'DC'
- else:
- return ''
-
- def format_mag(self, mag):
- if mag == self.inf:
- return u'\u221E'
- return '{:f}'.format(mag)
+ import qtcompat
+ qtcompat.load_modules(args.pyside)
+ QtCore = qtcompat.QtCore
+ QtGui = qtcompat.QtGui
+ import mainwindow
- def getItem(self, device, channel):
- '''Returns the item for the device + channel combination from the model,
- or creates a new item if no existing one matches.'''
-
- # unique identifier for the device + channel
- # TODO: isn't there something better?
- uid = (
- device.vendor,
- device.model,
- device.serial_number(),
- device.connection_id(),
- channel.index
- )
-
- # find the correct item in the model
- for row in range(self.rowCount()):
- item = self.item(row)
- rid = item.data(MeasurementDataModel._idRole)
- rid = tuple(rid) # PySide returns a list
- if uid == rid:
- return item
-
- # nothing found, create a new item
- desc = '{} {}, channel "{}"'.format(
- device.vendor, device.model, channel.name)
-
- item = QtGui.QStandardItem()
- item.setData(uid, MeasurementDataModel._idRole)
- item.setData(desc, MeasurementDataModel.descRole)
- self.appendRow(item)
- self.sort(0)
- return item
-
- @QtCore.Slot(object, object)
- def update(self, device, payload):
- '''Updates the data for the device (+channel) with the most recent
- measurement from the given payload.'''
-
- if not len(payload.channels):
- return
-
- # TODO: find a device with multiple channels in one packet
- channel = payload.channels[0]
-
- item = self.getItem(device, channel)
-
- # the most recent value
- mag = payload.data[0][-1]
-
- unit_str = self.format_unit(payload.unit)
- mqflags_str = self.format_mqflags(payload.mq_flags)
- mag_str = self.format_mag(mag)
- disp = ' '.join([mag_str, unit_str, mqflags_str])
- item.setData(disp, QtCore.Qt.DisplayRole)
-
-class MultimeterDelegate(QtGui.QStyledItemDelegate):
- '''Delegate to show the data items from a MeasurementDataModel.'''
-
- def __init__(self, parent, font):
- '''Initializes the delegate.
-
- :param font: Font used for the description text, the value is drawn
- with a slightly bigger and bold variant of the font.
- '''
-
- super(self.__class__, self).__init__(parent)
-
- self._nfont = font
- self._bfont = QtGui.QFont(self._nfont)
-
- self._bfont.setBold(True)
- if self._bfont.pixelSize() != -1:
- self._bfont.setPixelSize(self._bfont.pixelSize() * 1.8)
- else:
- self._bfont.setPointSizeF(self._bfont.pointSizeF() * 1.8)
-
- fi = QtGui.QFontInfo(self._nfont)
- self._nfontheight = fi.pixelSize()
-
- fm = QtGui.QFontMetrics(self._bfont)
- r = fm.boundingRect('-XX.XXXXXX X XX')
- self._size = QtCore.QSize(r.width() * 1.2, r.height() * 3.5)
-
- def sizeHint(self, option=None, index=None):
- return self._size
-
- def paint(self, painter, options, index):
- value = index.data(QtCore.Qt.DisplayRole)
- desc = index.data(MeasurementDataModel.descRole)
-
- # description in the top left corner
- painter.setFont(self._nfont)
- p = options.rect.topLeft()
- p += QtCore.QPoint(self._nfontheight, 2 * self._nfontheight)
- painter.drawText(p, desc)
-
- # value in the center
- painter.setFont(self._bfont)
- r = options.rect.adjusted(self._nfontheight, 2.5 * self._nfontheight,
- 0, 0)
- painter.drawText(r, QtCore.Qt.AlignCenter, value)
-
-class EmptyMessageListView(QtGui.QListView):
- '''List view that shows a message if the model im empty.'''
-
- def __init__(self, message, parent=None):
- super(self.__class__, self).__init__(parent)
-
- self._message = message
-
- def paintEvent(self, event):
- m = self.model()
- if m and m.rowCount():
- super(self.__class__, self).paintEvent(event)
- return
-
- painter = QtGui.QPainter(self.viewport())
- painter.drawText(self.rect(), QtCore.Qt.AlignCenter, self._message)
-
-class SigrokMeter(QtGui.QMainWindow):
- '''The main window of the application.'''
-
- def __init__(self, thread):
- super(SigrokMeter, self).__init__()
-
- self.delegate = MultimeterDelegate(self, self.font())
- self.model = MeasurementDataModel(self)
- self.model.rowsInserted.connect(self.modelRowsInserted)
-
- self.setup_ui()
-
- self.thread = thread
- self.thread.measured.connect(self.model.update)
- self.thread.error.connect(self.error)
- self.thread.start()
-
- def setup_ui(self):
- self.setWindowTitle('sigrok-meter')
- # resizing the listView below will increase this again
- self.resize(10, 10)
-
- p = os.path.abspath(os.path.dirname(__file__))
- p = os.path.join(p, 'sigrok-logo-notext.png')
- self.setWindowIcon(QtGui.QIcon(p))
-
- actionQuit = QtGui.QAction(self)
- actionQuit.setText('&Quit')
- actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit'))
- actionQuit.setShortcut('Ctrl+Q')
- actionQuit.triggered.connect(self.close)
-
- actionAbout = QtGui.QAction(self)
- actionAbout.setText('&About')
- actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about'))
- actionAbout.triggered.connect(self.show_about)
-
- menubar = self.menuBar()
- menuFile = menubar.addMenu('&File')
- menuFile.addAction(actionQuit)
- menuHelp = menubar.addMenu('&Help')
- menuHelp.addAction(actionAbout)
-
- self.listView = EmptyMessageListView('waiting for data...')
- self.listView.setFrameShape(QtGui.QFrame.NoFrame)
- self.listView.viewport().setBackgroundRole(QtGui.QPalette.Window)
- self.listView.viewport().setAutoFillBackground(True)
- self.listView.setMinimumWidth(260)
- self.listView.setSelectionMode(QtGui.QAbstractItemView.NoSelection)
- self.listView.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers)
- self.listView.setVerticalScrollMode(QtGui.QAbstractItemView.ScrollPerPixel)
- self.listView.setItemDelegate(self.delegate)
- self.listView.setModel(self.model)
- self.listView.setUniformItemSizes(True)
- self.listView.setMinimumSize(self.delegate.sizeHint())
-
- self.setCentralWidget(self.listView)
- self.centralWidget().setContentsMargins(0, 0, 0, 0)
-
- @QtCore.Slot()
- def show_about(self):
- text = textwrap.dedent('''\
- <div align="center">
- <b>sigrok-meter</b><br/>
- 0.1.0<br/>
- Using libsigrok {} (lib version {}).<br/>
- <a href='http://www.sigrok.org'>
- http://www.sigrok.org</a><br/>
- <br/>
- This program comes with ABSOLUTELY NO WARRANTY;<br/>
- for details visit
- <a href='http://www.gnu.org/licenses/gpl.html'>
- http://www.gnu.org/licenses/gpl.html</a>
- </div>
- '''.format(self.thread.sr_pkg_version(), self.thread.sr_lib_version()))
-
- QtGui.QMessageBox.about(self, 'About sigrok-meter', text)
-
- @QtCore.Slot(str)
- def error(self, msg):
- '''Error handler for the sampling thread.'''
- QtGui.QMessageBox.critical(self, 'Error', msg)
- self.close()
+ app = QtGui.QApplication([])
- @QtCore.Slot(object, int, int)
- def modelRowsInserted(self, parent, start, end):
- '''Resizes the list view to the size of the content.'''
+ # Initialize modules that need a QApplication to exist.
+ import settings
+ settings.init()
+ import icons
+ icons.load_icons()
- rows = self.model.rowCount()
- dh = self.delegate.sizeHint().height()
- self.listView.setMinimumHeight(dh * rows)
+ context = sr.Context_create()
-if __name__ == '__main__':
- thread = SamplingThread(args['drivers'], args['loglevel'])
+ if args.loglevel != None:
+ try:
+ loglevel = sr.LogLevel.get(args.loglevel)
+ settings.logging.level.setValue(loglevel)
+ except:
+ sys.exit('Error: invalid log level.')
- app = QtGui.QApplication([])
- s = SigrokMeter(thread)
+ s = mainwindow.MainWindow(context, args.drivers)
s.show()
- r = app.exec_()
- thread.stop()
- sys.exit(r)
+ sys.exit(app.exec_())