import argparse
import datetime
import os.path
-import PyQt4.QtCore as QtCore
-import PyQt4.QtGui as QtGui
import re
import sigrok.core as sr
import sys
import textwrap
-default_drivers = [('demo', {'analog_channels': 1})]
+default_drivers = [('demo', {'analog_channels': 4})]
default_loglevel = sr.LogLevel.WARN
+def parse_cli():
+ parser = argparse.ArgumentParser(
+ description='Simple sigrok GUI for multimeters and dataloggers.',
+ epilog=textwrap.dedent('''\
+ The DRIVER string is the same as for sigrok-cli(1).
+
+ examples:
+
+ %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0
+
+ %(prog)s --driver uni-t-ut61e:conn=1a86.e008
+ '''),
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+
+ parser.add_argument('-d', '--driver',
+ action='append',
+ help='The driver to use')
+ parser.add_argument('-l', '--loglevel',
+ type=int,
+ help='Set loglevel (5 is most verbose)')
+ parser.add_argument('--pyside',
+ action='store_true',
+ default=False,
+ help='Force use of PySide (default is to use PyQt4)')
+ args = parser.parse_args()
+
+ result = {
+ 'drivers': default_drivers,
+ 'loglevel': default_loglevel,
+ 'pyside': args.pyside
+ }
+
+ if args.driver:
+ result['drivers'] = []
+ for d in args.driver:
+ m = re.match('(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)', d)
+ if not m:
+ sys.exit('error parsing option "{}"'.format(d))
+
+ opts = m.group('opts').split(':')[1:]
+ opts = [tuple(kv.split('=')) for kv in opts]
+ opts = dict(opts)
+
+ result['drivers'].append((m.group('name'), opts))
+
+ if args.loglevel != None:
+ try:
+ result['loglevel'] = sr.LogLevel.get(args.loglevel)
+ except:
+ sys.exit('error: invalid log level')
+
+ return result
+
+if __name__ == '__main__':
+ # The command line parsing and import of the Qt modules is done here,
+ # so that the modules are imported before the classes below derive
+ # from classes therein. The rest of the main function is at the bottom.
+
+ args = parse_cli()
+
+ if args['pyside']:
+ from PySide import QtCore, QtGui
+ else:
+ try:
+ # Use version 2 API in all cases, because that's what PySide uses.
+ import sip
+ sip.setapi('QVariant', 2)
+ sip.setapi('QDate', 2)
+ sip.setapi('QDateTime', 2)
+ sip.setapi('QString', 2)
+ sip.setapi('QTextStream', 2)
+ sip.setapi('QTime', 2)
+ sip.setapi('QUrl', 2)
+ sip.setapi('QVariant', 2)
+
+ from PyQt4 import QtCore, QtGui
+
+ # Add PySide compatible names.
+ QtCore.Signal = QtCore.pyqtSignal
+ QtCore.Slot = QtCore.pyqtSlot
+ except:
+ sys.stderr.write('import of PyQt4 failed, using PySide\n')
+ from PySide import QtCore, QtGui
+
class SamplingThread(QtCore.QObject):
'''A class that handles the reception of sigrok packets in the background.'''
'''Helper class that does the actual work in another thread.'''
'''Signal emitted when new data arrived.'''
- measured = QtCore.pyqtSignal(object)
+ measured = QtCore.Signal(object, object, object)
+
+ '''Signal emmited in case of an error.'''
+ error = QtCore.Signal(str)
- def __init__(self, drivers, loglevel):
+ def __init__(self, context, drivers):
super(self.__class__, self).__init__()
- self.context = sr.Context_create()
- self.context.log_level = loglevel
+ self.context = context
+ self.drivers = drivers
- self.sr_pkg_version = self.context.package_version
- self.sr_lib_version = self.context.lib_version
+ self.sampling = False
- self.devices = []
- for name, options in drivers:
+ @QtCore.Slot()
+ def start_sampling(self):
+ devices = []
+ for name, options in self.drivers:
try:
dr = self.context.drivers[name]
- self.devices.append(dr.scan(**options)[0])
+ devices.append(dr.scan(**options)[0])
except:
- print('error getting device for driver "{}", skipping'.format(name))
-
- if not self.devices:
- print('no devices found')
+ self.error.emit(
+ 'Unable to get device for driver "{}".'.format(name))
+ return
- def start_sampling(self):
self.session = self.context.create_session()
- for dev in self.devices:
+ for dev in devices:
self.session.add_device(dev)
dev.open()
self.session.add_datafeed_callback(self.callback)
self.session.start()
+ self.sampling = True
self.session.run()
+ # If sampling is 'True' here, it means that 'stop_sampling()' was
+ # not called, therefore 'session.run()' ended too early, indicating
+ # an error.
+ if self.sampling:
+ self.error.emit('An error occured during the acquisition.')
+
def stop_sampling(self):
- self.session.stop()
+ if self.sampling:
+ self.sampling = False
+ self.session.stop()
def callback(self, device, packet):
- if packet.type == sr.PacketType.ANALOG:
- dev = '{} {}'.format(device.vendor, device.model)
+ if not sr:
+ # In rare cases it can happen that the callback fires while
+ # the interpreter is shutting down. Then the sigrok module
+ # is already set to 'None'.
+ return
- # only send the most recent value
- mag = packet.payload.data[0][-1]
+ if packet.type != sr.PacketType.ANALOG:
+ return
- self.measured.emit((dev, mag, packet.payload.unit,
- packet.payload.mq_flags))
+ if not len(packet.payload.channels):
+ return
- # wait a short time so that in any case we don't flood the GUI
- # with new data (for example if the demo device is used)
- self.thread().msleep(100)
+ # TODO: find a device with multiple channels in one packet
+ channel = packet.payload.channels[0]
+
+ # the most recent value
+ value = packet.payload.data[0][-1]
+
+ self.measured.emit(device, channel,
+ (value, packet.payload.unit, packet.payload.mq_flags))
# signal used to start the worker across threads
- _start_signal = QtCore.pyqtSignal()
+ _start_signal = QtCore.Signal()
- def __init__(self, drivers, loglevel):
+ def __init__(self, context, drivers):
super(self.__class__, self).__init__()
- self.worker = self.Worker(drivers, loglevel)
+ self.worker = self.Worker(context, drivers)
self.thread = QtCore.QThread()
self.worker.moveToThread(self.thread)
self._start_signal.connect(self.worker.start_sampling)
+ # expose the signals of the worker
self.measured = self.worker.measured
+ self.error = self.worker.error
self.thread.start()
'''Stops sampling and the background thread.'''
self.worker.stop_sampling()
self.thread.quit()
- # the timeout is needed when the demo device is used, because it
- # produces so much outstanding data that quitting takes a long time
- self.thread.wait(500)
-
- def sr_pkg_version(self):
- '''Returns the version number of the libsigrok package.'''
- return self.worker.sr_pkg_version
-
- def sr_lib_version(self):
- '''Returns the version number fo the libsigrok library.'''
- return self.worker.sr_lib_version
-
-class SigrokMeter(QtGui.QMainWindow):
- '''The main window of the application.'''
-
- def __init__(self, thread):
- super(SigrokMeter, self).__init__()
- self.setup_ui()
-
- self.inf = float('inf')
-
- self.thread = thread
- self.thread.measured.connect(self.update, QtCore.Qt.QueuedConnection)
- self.thread.start()
-
- def setup_ui(self):
- self.setWindowTitle('sigrok-meter')
- self.setMinimumHeight(130)
- self.setMinimumWidth(260)
-
- p = os.path.abspath(os.path.dirname(__file__))
- p = os.path.join(p, 'sigrok-logo-notext.png')
- self.setWindowIcon(QtGui.QIcon(p))
-
- actionQuit = QtGui.QAction(self)
- actionQuit.setText('&Quit')
- actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit'))
- actionQuit.setShortcut('Ctrl+Q')
- actionQuit.triggered.connect(self.close)
+ self.thread.wait()
- actionAbout = QtGui.QAction(self)
- actionAbout.setText('&About')
- actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about'))
- actionAbout.triggered.connect(self.show_about)
+class MeasurementDataModel(QtGui.QStandardItemModel):
+ '''Model to hold the measured values.'''
- menubar = self.menuBar()
- menuFile = menubar.addMenu('&File')
- menuFile.addAction(actionQuit)
- menuHelp = menubar.addMenu('&Help')
- menuHelp.addAction(actionAbout)
+ '''Role used to identify and find the item.'''
+ _idRole = QtCore.Qt.UserRole + 1
- self.lblValue = QtGui.QLabel('waiting for data...')
- self.lblValue.setAlignment(QtCore.Qt.AlignCenter)
- font = self.lblValue.font()
- font.setPointSize(font.pointSize() * 1.7)
- font.setBold(True)
- self.lblValue.setFont(font)
- self.setCentralWidget(self.lblValue)
- self.centralWidget().setContentsMargins(0, 0, 0, 0)
+ '''Role used to store the device vendor and model.'''
+ descRole = QtCore.Qt.UserRole + 2
- self.lblDevName = QtGui.QLabel()
- self.lblDevName.setToolTip('Name of used measurement device.')
- self.statusBar().insertWidget(0, self.lblDevName, 10)
- self.lblTime = QtGui.QLabel()
- self.lblTime.setToolTip('Time of the last measurement.')
- self.statusBar().insertWidget(1, self.lblTime)
+ def __init__(self, parent):
+ super(self.__class__, self).__init__(parent)
- self.statusBar().setSizeGripEnabled(False)
-
- def show_about(self):
- text = textwrap.dedent('''\
- <div align="center">
- <b>sigrok-meter</b><br/>
- 0.1.0<br/>
- Using libsigrok {} (lib version {}).<br/>
- <a href='http://www.sigrok.org'>
- http://www.sigrok.org</a><br/>
- <br/>
- This program comes with ABSOLUTELY NO WARRANTY;<br/>
- for details visit
- <a href='http://www.gnu.org/licenses/gpl.html'>
- http://www.gnu.org/licenses/gpl.html</a>
- </div>
- '''.format(self.thread.sr_pkg_version(), self.thread.sr_lib_version()))
+ # Use the description text to sort the items for now, because the
+ # _idRole holds tuples, and using them to sort doesn't work.
+ self.setSortRole(MeasurementDataModel.descRole)
- QtGui.QMessageBox.about(self, 'About sigrok-meter', text)
+ # Used in 'format_value()' to check against.
+ self.inf = float('inf')
def format_unit(self, u):
units = {
def format_mqflags(self, mqflags):
if sr.QuantityFlag.AC in mqflags:
- s = 'AC'
+ return 'AC'
elif sr.QuantityFlag.DC in mqflags:
- s = 'DC'
+ return 'DC'
else:
- s = ''
+ return ''
- return s
-
- def format_mag(self, mag):
+ def format_value(self, mag):
if mag == self.inf:
return u'\u221E'
return '{:f}'.format(mag)
- def update(self, data):
- '''Updates the labels with new measurement values.'''
-
- device, mag, unit, mqflags = data
-
+ def getItem(self, device, channel):
+ '''Returns the item for the device + channel combination from the model,
+ or creates a new item if no existing one matches.'''
+
+ # unique identifier for the device + channel
+ # TODO: isn't there something better?
+ uid = (
+ device.vendor,
+ device.model,
+ device.serial_number(),
+ device.connection_id(),
+ channel.index
+ )
+
+ # find the correct item in the model
+ for row in range(self.rowCount()):
+ item = self.item(row)
+ rid = item.data(MeasurementDataModel._idRole)
+ rid = tuple(rid) # PySide returns a list
+ if uid == rid:
+ return item
+
+ # nothing found, create a new item
+ desc = '{} {}, channel "{}"'.format(
+ device.vendor, device.model, channel.name)
+
+ item = QtGui.QStandardItem()
+ item.setData(uid, MeasurementDataModel._idRole)
+ item.setData(desc, MeasurementDataModel.descRole)
+ self.appendRow(item)
+ self.sort(0)
+ return item
+
+ @QtCore.Slot(object, object, object)
+ def update(self, device, channel, data):
+ '''Updates the data for the device (+channel) with the most recent
+ measurement from the given payload.'''
+
+ item = self.getItem(device, channel)
+
+ value, unit, mqflags = data
+ value_str = self.format_value(value)
unit_str = self.format_unit(unit)
mqflags_str = self.format_mqflags(mqflags)
- mag_str = self.format_mag(mag)
- value = ' '.join([mag_str, unit_str, mqflags_str])
- n = datetime.datetime.now().time()
- now = '{:02}:{:02}:{:02}.{:03}'.format(
- n.hour, n.minute, n.second, n.microsecond / 1000)
+ disp = ' '.join([value_str, unit_str, mqflags_str])
+ item.setData(disp, QtCore.Qt.DisplayRole)
- self.lblValue.setText(value)
- self.lblDevName.setText(device)
- self.lblTime.setText(now)
+class MultimeterDelegate(QtGui.QStyledItemDelegate):
+ '''Delegate to show the data items from a MeasurementDataModel.'''
-def parse_cli():
- parser = argparse.ArgumentParser(
- description='Simple sigrok GUI for multimeters and dataloggers.',
- epilog=textwrap.dedent('''\
- The DRIVER string is the same as for sigrok-cli(1).
+ def __init__(self, parent, font):
+ '''Initializes the delegate.
- examples:
+ :param font: Font used for the description text, the value is drawn
+ with a slightly bigger and bold variant of the font.
+ '''
- %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0
+ super(self.__class__, self).__init__(parent)
- %(prog)s --driver uni-t-ut61e:conn=1a86.e008
- '''),
- formatter_class=argparse.RawDescriptionHelpFormatter)
+ self._nfont = font
+ self._bfont = QtGui.QFont(self._nfont)
- parser.add_argument('-d', '--driver',
- action='append',
- help='The driver to use')
- parser.add_argument('-l', '--loglevel',
- type=int,
- help='Set loglevel (5 is most verbose)')
- args = parser.parse_args()
+ self._bfont.setBold(True)
+ if self._bfont.pixelSize() != -1:
+ self._bfont.setPixelSize(self._bfont.pixelSize() * 1.8)
+ else:
+ self._bfont.setPointSizeF(self._bfont.pointSizeF() * 1.8)
- result = {
- 'drivers': default_drivers,
- 'loglevel': default_loglevel
- }
+ fi = QtGui.QFontInfo(self._nfont)
+ self._nfontheight = fi.pixelSize()
- if args.driver:
- result['drivers'] = []
- for d in args.driver:
- m = re.match('(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)', d)
- if not m:
- sys.exit('error parsing option "{}"'.format(d))
+ fm = QtGui.QFontMetrics(self._bfont)
+ r = fm.boundingRect('-XX.XXXXXX X XX')
+ self._size = QtCore.QSize(r.width() * 1.2, r.height() * 3.5)
- opts = m.group('opts').split(':')[1:]
- opts = [tuple(kv.split('=')) for kv in opts]
- opts = dict(opts)
+ def sizeHint(self, option=None, index=None):
+ return self._size
- result['drivers'].append((m.group('name'), opts))
+ def paint(self, painter, options, index):
+ value = index.data(QtCore.Qt.DisplayRole)
+ desc = index.data(MeasurementDataModel.descRole)
- if args.loglevel != None:
- try:
- result['loglevel'] = sr.LogLevel.get(args.loglevel)
- except:
- sys.exit('error: invalid log level')
+ # description in the top left corner
+ painter.setFont(self._nfont)
+ p = options.rect.topLeft()
+ p += QtCore.QPoint(self._nfontheight, 2 * self._nfontheight)
+ painter.drawText(p, desc)
- return result
+ # value in the center
+ painter.setFont(self._bfont)
+ r = options.rect.adjusted(self._nfontheight, 2.5 * self._nfontheight,
+ 0, 0)
+ painter.drawText(r, QtCore.Qt.AlignCenter, value)
-if __name__ == '__main__':
- args = parse_cli()
+class EmptyMessageListView(QtGui.QListView):
+ '''List view that shows a message if the model im empty.'''
+
+ def __init__(self, message, parent=None):
+ super(self.__class__, self).__init__(parent)
+
+ self._message = message
+
+ def paintEvent(self, event):
+ m = self.model()
+ if m and m.rowCount():
+ super(self.__class__, self).paintEvent(event)
+ return
+
+ painter = QtGui.QPainter(self.viewport())
+ painter.drawText(self.rect(), QtCore.Qt.AlignCenter, self._message)
+
+class SigrokMeter(QtGui.QMainWindow):
+ '''The main window of the application.'''
+
+ def __init__(self, context, drivers):
+ super(SigrokMeter, self).__init__()
- thread = SamplingThread(args['drivers'], args['loglevel'])
+ self.context = context
+
+ self.delegate = MultimeterDelegate(self, self.font())
+ self.model = MeasurementDataModel(self)
+ self.model.rowsInserted.connect(self.modelRowsInserted)
+
+ self.setup_ui()
+
+ self.thread = SamplingThread(self.context, drivers)
+ self.thread.measured.connect(self.model.update)
+ self.thread.error.connect(self.error)
+ self.thread.start()
+
+ def setup_ui(self):
+ self.setWindowTitle('sigrok-meter')
+ # resizing the listView below will increase this again
+ self.resize(10, 10)
+
+ p = os.path.abspath(os.path.dirname(__file__))
+ p = os.path.join(p, 'sigrok-logo-notext.png')
+ self.setWindowIcon(QtGui.QIcon(p))
+
+ actionQuit = QtGui.QAction(self)
+ actionQuit.setText('&Quit')
+ actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit'))
+ actionQuit.setShortcut('Ctrl+Q')
+ actionQuit.triggered.connect(self.close)
+
+ actionAbout = QtGui.QAction(self)
+ actionAbout.setText('&About')
+ actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about'))
+ actionAbout.triggered.connect(self.show_about)
+
+ menubar = self.menuBar()
+ menuFile = menubar.addMenu('&File')
+ menuFile.addAction(actionQuit)
+ menuHelp = menubar.addMenu('&Help')
+ menuHelp.addAction(actionAbout)
+
+ self.listView = EmptyMessageListView('waiting for data...')
+ self.listView.setFrameShape(QtGui.QFrame.NoFrame)
+ self.listView.viewport().setBackgroundRole(QtGui.QPalette.Window)
+ self.listView.viewport().setAutoFillBackground(True)
+ self.listView.setMinimumWidth(260)
+ self.listView.setSelectionMode(QtGui.QAbstractItemView.NoSelection)
+ self.listView.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers)
+ self.listView.setVerticalScrollMode(QtGui.QAbstractItemView.ScrollPerPixel)
+ self.listView.setItemDelegate(self.delegate)
+ self.listView.setModel(self.model)
+ self.listView.setUniformItemSizes(True)
+ self.listView.setMinimumSize(self.delegate.sizeHint())
+
+ self.setCentralWidget(self.listView)
+ self.centralWidget().setContentsMargins(0, 0, 0, 0)
+
+ def closeEvent(self, event):
+ self.thread.stop()
+ event.accept()
+
+ @QtCore.Slot()
+ def show_about(self):
+ text = textwrap.dedent('''\
+ <div align="center">
+ <b>sigrok-meter</b><br/>
+ 0.1.0<br/>
+ Using libsigrok {} (lib version {}).<br/>
+ <a href='http://www.sigrok.org'>
+ http://www.sigrok.org</a><br/>
+ <br/>
+ This program comes with ABSOLUTELY NO WARRANTY;<br/>
+ for details visit
+ <a href='http://www.gnu.org/licenses/gpl.html'>
+ http://www.gnu.org/licenses/gpl.html</a>
+ </div>
+ '''.format(self.context.package_version, self.context.lib_version))
+
+ QtGui.QMessageBox.about(self, 'About sigrok-meter', text)
+
+ @QtCore.Slot(str)
+ def error(self, msg):
+ '''Error handler for the sampling thread.'''
+ QtGui.QMessageBox.critical(self, 'Error', msg)
+ self.close()
+
+ @QtCore.Slot(object, int, int)
+ def modelRowsInserted(self, parent, start, end):
+ '''Resizes the list view to the size of the content.'''
+
+ rows = self.model.rowCount()
+ dh = self.delegate.sizeHint().height()
+ self.listView.setMinimumHeight(dh * rows)
+
+if __name__ == '__main__':
+ context = sr.Context_create()
+ context.log_level = args['loglevel']
app = QtGui.QApplication([])
- s = SigrokMeter(thread)
+ s = SigrokMeter(context, args['drivers'])
s.show()
- r = app.exec_()
- thread.stop()
- sys.exit(r)
+ sys.exit(app.exec_())