The rest of the main function is at the bottom. args = parse_cli() if args['pyside']: from PySide import QtCore, QtGui else: try: # Use version 2 API in all cases, because that's what PySide uses. import sip sip.setapi('QVariant', 2) sip.setapi('QDate', 2) sip.setapi('QDateTime', 2) sip.setapi('QString', 2) sip.setapi('QTextStream', 2) sip.setapi('QTime', 2) sip.setapi('QUrl', 2) sip.setapi('QVariant', 2) from PyQt4 import QtCore, QtGui # Add PySide compatible names. QtCore.Signal = QtCore.pyqtSignal QtCore.Slot = QtCore.pyqtSlot except: sys.stderr.write('import of PyQt4 failed, using PySide\n') from PySide import QtCore, QtGui class SamplingThread(QtCore.QObject): '''A class that handles the reception of sigrok packets in the background.''' class Worker(QtCore.QObject): '''Helper class that does the actual work in another thread.''' '''Signal emitted when new data arrived.''' measured = QtCore.Signal(object, object) '''Signal emmited in case of an error.''' error = QtCore.Signal(str) def __init__(self, drivers, loglevel): super(self.__class__, self).__init__() self.sampling = False self.drivers = drivers self.context = sr.Context_create() self.context.log_level = loglevel self.sr_pkg_version = self.context.package_version self.sr_lib_version = self.context.lib_version @QtCore.Slot() def start_sampling(self): devices = [] for name, options in self.drivers: try: dr = self.context.drivers[name] devices.append(dr.scan(**options)[0]) except: self.error.emit( 'Unable to get device for driver "{}".'.format(name)) return self.session = self.context.create_session() for dev in devices: self.session.add_device(dev) dev.open() self.session.add_datafeed_callback(self.callback) self.session.start() self.sampling = True self.session.run() # If sampling is 'True' here, it means that 'stop_sampling()' was # not called, therefore 'session.run()' ended too early, indicating # an error. if self.sampling: self.error.emit('An error occured during the acquisition.') def stop_sampling(self): if self.sampling: self.sampling = False self.session.stop() def callback(self, device, packet): if packet.type == sr.PacketType.ANALOG: self.measured.emit(device, packet.payload) # wait a short time so that in any case we don't flood the GUI # with new data (for example if the demo device is used) self.thread().msleep(100) # signal used to start the worker across threads _start_signal = QtCore.Signal() def __init__(self, drivers, loglevel): super(self.__class__, self).__init__() self.worker = self.Worker(drivers, loglevel) self.thread = QtCore.QThread() self.worker.moveToThread(self.thread) self._start_signal.connect(self.worker.start_sampling) # expose the signals of the worker self.measured = self.worker.measured self.error = self.worker.error self.thread.start() def start(self): '''Starts sampling''' self._start_signal.emit() def stop(self): '''Stops sampling and the background thread.''' self.worker.stop_sampling() self.thread.quit() # the timeout is needed when the demo device is used, because it # produces so much outstanding data that quitting takes a long time self.thread.wait(500) def sr_pkg_version(self): '''Returns the version number of the libsigrok package.''' return self.worker.sr_pkg_version def sr_lib_version(self): '''Returns the version number fo the libsigrok library.''' return self.worker.sr_lib_version class MeasurementDataModel(QtGui.QStandardItemModel): '''Model to hold the measured values.''' '''Role used to identify and find the item.''' _idRole = QtCore.Qt.UserRole + 1 '''Role used to store the device vendor and model.''' descRole = QtCore.Qt.UserRole + 2 def __init__(self, parent): super(self.__class__, self).__init__(parent) # Use the description text to sort the items for now, because the # _idRole holds tuples, and using them to sort doesn't work. self.setSortRole(MeasurementDataModel.descRole) # Used in 'format_mag()' to check against. self.inf = float('inf') def format_unit(self, u): units = { sr.Unit.VOLT: 'V', sr.Unit.AMPERE: 'A', sr.Unit.OHM: u'\u03A9', sr.Unit.FARAD: 'F', sr.Unit.KELVIN: 'K', sr.Unit.CELSIUS: u'\u00B0C', sr.Unit.FAHRENHEIT: u'\u00B0F', sr.Unit.HERTZ: 'Hz', sr.Unit.PERCENTAGE: '%', # sr.Unit.BOOLEAN sr.Unit.SECOND: 's', sr.Unit.SIEMENS: 'S', sr.Unit.DECIBEL_MW: 'dBu', sr.Unit.DECIBEL_VOLT: 'dBV', # sr.Unit.UNITLESS sr.Unit.DECIBEL_SPL: 'dB', # sr.Unit.CONCENTRATION sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm', sr.Unit.VOLT_AMPERE: 'VA', sr.Unit.WATT: 'W', sr.Unit.WATT_HOUR: 'Wh', sr.Unit.METER_SECOND: 'm/s', sr.Unit.HECTOPASCAL: 'hPa', sr.Unit.HUMIDITY_293K: '%rF', sr.Unit.DEGREE: u'\u00B0', sr.Unit.HENRY: 'H' } return units.get(u, '') def format_mqflags(self, mqflags): if sr.QuantityFlag.AC in mqflags: return 'AC' elif sr.QuantityFlag.DC in mqflags: return 'DC' else: return '' def format_mag(self, mag): if mag == self.inf: return u'\u221E' return '{:f}'.format(mag) def getItem(self, device, channel): '''Returns the item for the device + channel combination from the model, or creates a new item if no existing one matches.''' # unique identifier for the device + channel # TODO: isn't there something better? uid = ( device.vendor, device.model, device.serial_number(), device.connection_id(), channel.index ) # find the correct item in the model for row in range(self.rowCount()): item = self.item(row) rid = item.data(MeasurementDataModel._idRole) rid = tuple(rid) # PySide returns a list if uid == rid: return item # nothing found, create a new item desc = '{} {}, channel "{}"'.format( device.vendor, device.model, channel.name) item = QtGui.QStandardItem() item.setData(uid, MeasurementDataModel._idRole) item.setData(desc, MeasurementDataModel.descRole) self.appendRow(item) self.sort(0) return item @QtCore.Slot(object, object) def update(self, device, payload): '''Updates the data for the device (+channel) with the most recent measurement from the given payload.''' if not len(payload.channels): return # TODO: find a device with multiple channels in one packet channel = payload.channels[0] item = self.getItem(device, channel) # the most recent value mag = payload.data[0][-1] unit_str = self.format_unit(payload.unit) mqflags_str = self.format_mqflags(payload.mq_flags) mag_str = self.format_mag(mag) disp = ' '.join([mag_str, unit_str, mqflags_str]) item.setData(disp, QtCore.Qt.DisplayRole) class MultimeterDelegate(QtGui.QStyledItemDelegate): '''Delegate to show the data items from a MeasurementDataModel.''' def __init__(self, parent, font): '''Initializes the delegate. :param font: Font used for the description text, the value is drawn with a slightly bigger and bold variant of the font. ''' super(self.__class__, self).__init__(parent) self._nfont = font self._bfont = QtGui.QFont(self._nfont) self._bfont.setBold(True) if self._bfont.pixelSize() != -1: self._bfont.setPixelSize(self._bfont.pixelSize() * 1.8) else: self._bfont.setPointSizeF(self._bfont.pointSizeF() * 1.8) fi = QtGui.QFontInfo(self._nfont) self._nfontheight = fi.pixelSize() fm = QtGui.QFontMetrics(self._bfont) r = fm.boundingRect('-XX.XXXXXX X XX') self._size = QtCore.QSize(r.width() * 1.2, r.height() * 3.5) def sizeHint(self, option=None, index=None): return self._size def paint(self, painter, options, index): value = index.data(QtCore.Qt.DisplayRole) desc = index.data(MeasurementDataModel.descRole) # description in the top left corner painter.setFont(self._nfont) p = options.rect.topLeft() p += QtCore.QPoint(self._nfontheight, 2 * self._nfontheight) painter.drawText(p, desc) # value in the center painter.setFont(self._bfont) r = options.rect.adjusted(self._nfontheight, 2.5 * self._nfontheight, 0, 0) painter.drawText(r, QtCore.Qt.AlignCenter, value) class EmptyMessageListView(QtGui.QListView): '''List view that shows a message if the model im empty.''' def __init__(self, message, parent=None): super(self.__class__, self).__init__(parent) self._message = message def paintEvent(self, event): m = self.model() if m and m.rowCount(): super(self.__class__, self).paintEvent(event) return painter = QtGui.QPainter(self.viewport()) painter.drawText(self.rect(), QtCore.Qt.AlignCenter, self._message) class SigrokMeter(QtGui.QMainWindow): '''The main window of the application.''' def __init__(self, thread): super(SigrokMeter, self).__init__() self.delegate = MultimeterDelegate(self, self.font()) self.model = MeasurementDataModel(self) self.model.rowsInserted.connect(self.modelRowsInserted) self.setup_ui() self.thread = thread self.thread.measured.connect(self.model.update) self.thread.error.connect(self.error) self.thread.start() def setup_ui(self): self.setWindowTitle('sigrok-meter') # resizing the listView below will increase this again self.resize(10, 10) p = os.path.abspath(os.path.dirname(__file__)) p = os.path.join(p, 'sigrok-logo-notext.png') self.setWindowIcon(QtGui.QIcon(p)) actionQuit = QtGui.QAction(self) actionQuit.setText('&Quit') actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit')) actionQuit.setShortcut('Ctrl+Q') actionQuit.triggered.connect(self.close) actionAbout = QtGui.QAction(self) actionAbout.setText('&About') actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about')) actionAbout.triggered.connect(self.show_about) menubar = self.menuBar() menuFile = menubar.addMenu('&File') menuFile.addAction(actionQuit) menuHelp = menubar.addMenu('&Help') menuHelp.addAction(actionAbout) self.listView = EmptyMessageListView('waiting for data...') self.listView.setFrameShape(QtGui.QFrame.NoFrame) self.listView.viewport().setBackgroundRole(QtGui.QPalette.Window) self.listView.viewport().setAutoFillBackground(True) self.listView.setMinimumWidth(260) self.listView.setSelectionMode(QtGui.QAbstractItemView.NoSelection) self.listView.setEditTriggers(QtGui.QAbstractItemView.NoEditTriggers) self.listView.setVerticalScrollMode(QtGui.QAbstractItemView.ScrollPerPixel) self.listView.setItemDelegate(self.delegate) self.listView.setModel(self.model) self.listView.setUniformItemSizes(True) self.listView.setMinimumSize(self.delegate.sizeHint()) self.setCentralWidget(self.listView) self.centralWidget().setContentsMargins(0, 0, 0, 0) @QtCore.Slot() def show_about(self): text = textwrap.dedent('''\
Using libsigrok {} (lib version {}).

This program comes with ABSOLUTELY NO WARRANTY;
for details visit http://www.gnu.org/licenses/gpl.html
'''.format(self.thread.sr_pkg_version(), self.thread.sr_lib_version())) QtGui.QMessageBox.about(self, 'About sigrok-meter', text) @QtCore.Slot(str) def error(self, msg): '''Error handler for the sampling thread.''' QtGui.QMessageBox.critical(self, 'Error', msg) self.close() @QtCore.Slot(object, int, int) def modelRowsInserted(self, parent, start, end): '''Resizes the list view to the size of the content.''' rows = self.model.rowCount() dh = self.delegate.sizeHint().height() self.listView.setMinimumHeight(dh * rows) if __name__ == '__main__': thread = SamplingThread(args['drivers'], args['loglevel']) app = QtGui.QApplication([]) s = SigrokMeter(thread) s.show() r = app.exec_() thread.stop() sys.exit(r)