#!/usr/bin/env python ## ## This file is part of the sigrok-meter project. ## ## Copyright (C) 2013 Uwe Hermann ## Copyright (C) 2014 Jens Steinhauser ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program; if not, write to the Free Software ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA ## import argparse import datetime import os.path import re import sigrok.core as sr import sys import textwrap default_drivers = [('demo', {'analog_channels': 1})] default_loglevel = sr.LogLevel.WARN def parse_cli(): parser = argparse.ArgumentParser( description='Simple sigrok GUI for multimeters and dataloggers.', epilog=textwrap.dedent('''\ The DRIVER string is the same as for sigrok-cli(1). examples: %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0 %(prog)s --driver uni-t-ut61e:conn=1a86.e008 '''), formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('-d', '--driver', action='append', help='The driver to use') parser.add_argument('-l', '--loglevel', type=int, help='Set loglevel (5 is most verbose)') parser.add_argument('--pyside', action='store_true', default=False, help='Force use of PySide (default is to use PyQt4)') args = parser.parse_args() result = { 'drivers': default_drivers, 'loglevel': default_loglevel, 'pyside': args.pyside } if args.driver: result['drivers'] = [] for d in args.driver: m = re.match('(?P[^:]+)(?P(:[^:=]+=[^:=]+)*)', d) if not m: sys.exit('error parsing option "{}"'.format(d)) opts = m.group('opts').split(':')[1:] opts = [tuple(kv.split('=')) for kv in opts] opts = dict(opts) result['drivers'].append((m.group('name'), opts)) if args.loglevel != None: try: result['loglevel'] = sr.LogLevel.get(args.loglevel) except: sys.exit('error: invalid log level') return result if __name__ == '__main__': # The command line parsing and import of the Qt modules is done here, # so that the modules are imported before the classes below derive # from classes therein. The rest of the main function is at the bottom. args = parse_cli() if args['pyside']: from PySide import QtCore, QtGui else: try: # Use version 2 API in all cases, because that's what PySide uses. import sip sip.setapi('QVariant', 2) sip.setapi('QDate', 2) sip.setapi('QDateTime', 2) sip.setapi('QString', 2) sip.setapi('QTextStream', 2) sip.setapi('QTime', 2) sip.setapi('QUrl', 2) sip.setapi('QVariant', 2) from PyQt4 import QtCore, QtGui # Add PySide compatible names. QtCore.Signal = QtCore.pyqtSignal QtCore.Slot = QtCore.pyqtSlot except: sys.stderr.write('import of PyQt4 failed, using PySide\n') from PySide import QtCore, QtGui class SamplingThread(QtCore.QObject): '''A class that handles the reception of sigrok packets in the background.''' class Worker(QtCore.QObject): '''Helper class that does the actual work in another thread.''' '''Signal emitted when new data arrived.''' measured = QtCore.Signal(object, object) '''Signal emmited in case of an error.''' error = QtCore.Signal(str) def __init__(self, drivers, loglevel): super(self.__class__, self).__init__() self.sampling = False self.drivers = drivers self.context = sr.Context_create() self.context.log_level = loglevel self.sr_pkg_version = self.context.package_version self.sr_lib_version = self.context.lib_version def start_sampling(self): devices = [] for name, options in self.drivers: try: dr = self.context.drivers[name] devices.append(dr.scan(**options)[0]) except: self.error.emit( 'Unable to get device for driver "{}".'.format(name)) return self.session = self.context.create_session() for dev in devices: self.session.add_device(dev) dev.open() self.session.add_datafeed_callback(self.callback) self.session.start() self.sampling = True self.session.run() # If sampling is 'True' here, it means that 'stop_sampling()' was # not called, therefore 'session.run()' ended too early, indicating # an error. if self.sampling: self.error.emit('An error occured during the acquisition.') def stop_sampling(self): if self.sampling: self.sampling = False self.session.stop() def callback(self, device, packet): if packet.type == sr.PacketType.ANALOG: dev = '{} {}'.format(device.vendor, device.model) # only send the most recent value mag = packet.payload.data[0][-1] self.measured.emit((dev, mag, packet.payload.unit, packet.payload.mq_flags)) # wait a short time so that in any case we don't flood the GUI # with new data (for example if the demo device is used) self.thread().msleep(100) # signal used to start the worker across threads _start_signal = QtCore.Signal() def __init__(self, drivers, loglevel): super(self.__class__, self).__init__() self.worker = self.Worker(drivers, loglevel) self.thread = QtCore.QThread() self.worker.moveToThread(self.thread) self._start_signal.connect(self.worker.start_sampling) # expose the signals of the worker self.measured = self.worker.measured self.error = self.worker.error self.thread.start() def start(self): '''Starts sampling''' self._start_signal.emit() def stop(self): '''Stops sampling and the background thread.''' self.worker.stop_sampling() self.thread.quit() # the timeout is needed when the demo device is used, because it # produces so much outstanding data that quitting takes a long time self.thread.wait(500) def sr_pkg_version(self): '''Returns the version number of the libsigrok package.''' return self.worker.sr_pkg_version def sr_lib_version(self): '''Returns the version number fo the libsigrok library.''' return self.worker.sr_lib_version class SigrokMeter(QtGui.QMainWindow): '''The main window of the application.''' def __init__(self, thread): super(SigrokMeter, self).__init__() self.setup_ui() self.inf = float('inf') self.thread = thread self.thread.measured.connect(self.update, QtCore.Qt.QueuedConnection) self.thread.error.connect(self.error) self.thread.start() def setup_ui(self): self.setWindowTitle('sigrok-meter') self.setMinimumHeight(130) self.setMinimumWidth(260) p = os.path.abspath(os.path.dirname(__file__)) p = os.path.join(p, 'sigrok-logo-notext.png') self.setWindowIcon(QtGui.QIcon(p)) actionQuit = QtGui.QAction(self) actionQuit.setText('&Quit') actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit')) actionQuit.setShortcut('Ctrl+Q') actionQuit.triggered.connect(self.close) actionAbout = QtGui.QAction(self) actionAbout.setText('&About') actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about')) actionAbout.triggered.connect(self.show_about) menubar = self.menuBar() menuFile = menubar.addMenu('&File') menuFile.addAction(actionQuit) menuHelp = menubar.addMenu('&Help') menuHelp.addAction(actionAbout) self.lblValue = QtGui.QLabel('waiting for data...') self.lblValue.setAlignment(QtCore.Qt.AlignCenter) font = self.lblValue.font() font.setPointSize(font.pointSize() * 1.7) font.setBold(True) self.lblValue.setFont(font) self.setCentralWidget(self.lblValue) self.centralWidget().setContentsMargins(0, 0, 0, 0) self.lblDevName = QtGui.QLabel() self.lblDevName.setToolTip('Name of used measurement device.') self.statusBar().insertWidget(0, self.lblDevName, 10) self.lblTime = QtGui.QLabel() self.lblTime.setToolTip('Time of the last measurement.') self.statusBar().insertWidget(1, self.lblTime) self.statusBar().setSizeGripEnabled(False) def show_about(self): text = textwrap.dedent('''\
sigrok-meter
0.1.0
Using libsigrok {} (lib version {}).
http://www.sigrok.org

This program comes with ABSOLUTELY NO WARRANTY;
for details visit http://www.gnu.org/licenses/gpl.html
'''.format(self.thread.sr_pkg_version(), self.thread.sr_lib_version())) QtGui.QMessageBox.about(self, 'About sigrok-meter', text) def format_unit(self, u): units = { sr.Unit.VOLT: 'V', sr.Unit.AMPERE: 'A', sr.Unit.OHM: u'\u03A9', sr.Unit.FARAD: 'F', sr.Unit.KELVIN: 'K', sr.Unit.CELSIUS: u'\u00B0C', sr.Unit.FAHRENHEIT: u'\u00B0F', sr.Unit.HERTZ: 'Hz', sr.Unit.PERCENTAGE: '%', # sr.Unit.BOOLEAN sr.Unit.SECOND: 's', sr.Unit.SIEMENS: 'S', sr.Unit.DECIBEL_MW: 'dBu', sr.Unit.DECIBEL_VOLT: 'dBV', # sr.Unit.UNITLESS sr.Unit.DECIBEL_SPL: 'dB', # sr.Unit.CONCENTRATION sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm', sr.Unit.VOLT_AMPERE: 'VA', sr.Unit.WATT: 'W', sr.Unit.WATT_HOUR: 'Wh', sr.Unit.METER_SECOND: 'm/s', sr.Unit.HECTOPASCAL: 'hPa', sr.Unit.HUMIDITY_293K: '%rF', sr.Unit.DEGREE: u'\u00B0', sr.Unit.HENRY: 'H' } return units.get(u, '') def format_mqflags(self, mqflags): if sr.QuantityFlag.AC in mqflags: s = 'AC' elif sr.QuantityFlag.DC in mqflags: s = 'DC' else: s = '' return s def format_mag(self, mag): if mag == self.inf: return u'\u221E' return '{:f}'.format(mag) def update(self, data): '''Updates the labels with new measurement values.''' device, mag, unit, mqflags = data unit_str = self.format_unit(unit) mqflags_str = self.format_mqflags(mqflags) mag_str = self.format_mag(mag) value = ' '.join([mag_str, unit_str, mqflags_str]) n = datetime.datetime.now().time() now = '{:02}:{:02}:{:02}.{:03}'.format( n.hour, n.minute, n.second, n.microsecond / 1000) self.lblValue.setText(value) self.lblDevName.setText(device) self.lblTime.setText(now) def error(self, msg): '''Error handler for the sampling thread.''' QtGui.QMessageBox.critical(self, 'Error', msg) self.close() if __name__ == '__main__': thread = SamplingThread(args['drivers'], args['loglevel']) app = QtGui.QApplication([]) s = SigrokMeter(thread) s.show() r = app.exec_() thread.stop() sys.exit(r)