#!/usr/bin/env python
##
## This file is part of the sigrok-meter project.
##
## Copyright (C) 2013 Uwe Hermann
## Copyright (C) 2014 Jens Steinhauser
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
##

import argparse
import datetime
import os.path
import re
import sigrok.core as sr
import sys
import textwrap

# Used when no '--driver' option is given on the command line.
default_drivers = [('demo', {'analog_channels': 1})]
# Used when no '--loglevel' option is given on the command line.
default_loglevel = sr.LogLevel.WARN

# A driver spec is 'name' followed by any number of ':key=value' options.
# The pattern is anchored at the end so that trailing garbage is reported
# as an error instead of being silently dropped.
_driver_spec_re = re.compile(
    r'(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)$')


def parse_cli():
    '''Parses the command line arguments.

    Returns a dict with the keys:
      'drivers':  list of (driver name, options dict) tuples
      'loglevel': the libsigrok log level to use
      'pyside':   True if the use of PySide was requested

    Terminates the program via sys.exit() if a driver string cannot be
    parsed or if the given log level is invalid.
    '''
    parser = argparse.ArgumentParser(
        description='Simple sigrok GUI for multimeters and dataloggers.',
        epilog=textwrap.dedent('''\
            The DRIVER string is the same as for sigrok-cli(1).

            examples:

              %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0

              %(prog)s --driver uni-t-ut61e:conn=1a86.e008
            '''),
        formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument('-d', '--driver',
        action='append',
        help='The driver to use')
    parser.add_argument('-l', '--loglevel',
        type=int,
        help='Set loglevel (5 is most verbose)')
    parser.add_argument('--pyside',
        action='store_true',
        default=False,
        help='Force use of PySide (default is to use PyQt4)')
    args = parser.parse_args()

    result = {
        'drivers': default_drivers,
        'loglevel': default_loglevel,
        'pyside': args.pyside
    }

    if args.driver:
        result['drivers'] = []
        for d in args.driver:
            m = _driver_spec_re.match(d)
            if not m:
                sys.exit('error parsing option "{}"'.format(d))

            # 'opts' is either empty or ':k1=v1:k2=v2...'; the element in
            # front of the first ':' is always empty and gets skipped.
            # The pattern guarantees exactly one '=' per option.
            opts = dict(
                kv.split('=') for kv in m.group('opts').split(':')[1:])
            result['drivers'].append((m.group('name'), opts))

    if args.loglevel is not None:
        try:
            result['loglevel'] = sr.LogLevel.get(args.loglevel)
        except Exception:
            sys.exit('error: invalid log level')

    return result


if __name__ == '__main__':
    # The command line parsing and import of the Qt modules is done here,
    # so that the modules are imported before the classes below derive
    # from classes therein. The rest of the main function is at the bottom.

    args = parse_cli()

    if args['pyside']:
        from PySide import QtCore, QtGui
        qt_signal = QtCore.Signal
    else:
        try:
            from PyQt4 import QtCore, QtGui
            qt_signal = QtCore.pyqtSignal
        except Exception:
            # Best effort: any failure to load PyQt4 falls back to PySide.
            sys.stderr.write('import of PyQt4 failed, using PySide\n')
            from PySide import QtCore, QtGui
            qt_signal = QtCore.Signal


class SamplingThread(QtCore.QObject):
    '''A class that handles the reception of sigrok packets in the
    background.'''

    class Worker(QtCore.QObject):
        '''Helper class that does the actual work in another thread.'''

        # Signal emitted when new data arrived.
        measured = qt_signal(object)

        # Signal emitted in case of an error.
        error = qt_signal(str)

        def __init__(self, drivers, loglevel):
            '''Creates the sigrok context for the given list of
            (driver name, options dict) tuples and log level.'''
            # The class is named explicitly, 'super(self.__class__, ...)'
            # would recurse endlessly as soon as this class is subclassed.
            super(SamplingThread.Worker, self).__init__()

            self.sampling = False
            self.drivers = drivers

            self.context = sr.Context_create()
            self.context.log_level = loglevel

            self.sr_pkg_version = self.context.package_version
            self.sr_lib_version = self.context.lib_version

        def start_sampling(self):
            '''Opens the first device found by every driver and runs the
            acquisition; returns only when the session has ended.

            Emits 'error' if a device can't be found or if the session
            ends without 'stop_sampling()' having been called.
            '''
            devices = []
            for name, options in self.drivers:
                try:
                    dr = self.context.drivers[name]
                    devices.append(dr.scan(**options)[0])
                except Exception:
                    self.error.emit(
                        'Unable to get device for driver "{}".'.format(name))
                    return

            self.session = self.context.create_session()
            for dev in devices:
                self.session.add_device(dev)
                dev.open()
            self.session.add_datafeed_callback(self.callback)
            self.session.start()
            self.sampling = True
            self.session.run()

            # If sampling is 'True' here, it means that 'stop_sampling()' was
            # not called, therefore 'session.run()' ended too early, indicating
            # an error.
            if self.sampling:
                self.error.emit('An error occured during the acquisition.')

        def stop_sampling(self):
            '''Stops a running session, does nothing otherwise.'''
            if self.sampling:
                self.sampling = False
                self.session.stop()

        def callback(self, device, packet):
            '''Datafeed callback; forwards analog packets via 'measured'
            as a (device name, value, unit, mq flags) tuple.'''
            if packet.type == sr.PacketType.ANALOG:
                dev = '{} {}'.format(device.vendor, device.model)

                # only send the most recent value
                mag = packet.payload.data[0][-1]

                self.measured.emit((dev, mag, packet.payload.unit,
                        packet.payload.mq_flags))

            # wait a short time so that in any case we don't flood the GUI
            # with new data (for example if the demo device is used)
            self.thread().msleep(100)

    # signal used to start the worker across threads
    _start_signal = qt_signal()

    def __init__(self, drivers, loglevel):
        '''Creates the worker, moves it to a new thread and starts that
        thread; sampling itself only begins with 'start()'.'''
        super(SamplingThread, self).__init__()

        self.worker = self.Worker(drivers, loglevel)
        self.thread = QtCore.QThread()
        self.worker.moveToThread(self.thread)

        self._start_signal.connect(self.worker.start_sampling)

        # expose the signals of the worker
        self.measured = self.worker.measured
        self.error = self.worker.error

        self.thread.start()

    def start(self):
        '''Starts sampling'''
        self._start_signal.emit()

    def stop(self):
        '''Stops sampling and the background thread.'''
        self.worker.stop_sampling()
        self.thread.quit()
        # the timeout is needed when the demo device is used, because it
        # produces so much outstanding data that quitting takes a long time
        self.thread.wait(500)

    def sr_pkg_version(self):
        '''Returns the version number of the libsigrok package.'''
        return self.worker.sr_pkg_version

    def sr_lib_version(self):
        '''Returns the version number of the libsigrok library.'''
        return self.worker.sr_lib_version


class SigrokMeter(QtGui.QMainWindow):
    '''The main window of the application.'''

    def __init__(self, thread):
        '''Builds the window, connects it to the signals of the given
        sampling thread and starts the sampling.'''
        super(SigrokMeter, self).__init__()
        self.setup_ui()

        self.inf = float('inf')

        self.thread = thread
        self.thread.measured.connect(self.update, QtCore.Qt.QueuedConnection)
        self.thread.error.connect(self.error)
        self.thread.start()

    def setup_ui(self):
        '''Creates the menus, the value label and the status bar.'''
        self.setWindowTitle('sigrok-meter')
        self.setMinimumHeight(130)
        self.setMinimumWidth(260)

        # the icon is looked up in the directory of this file
        p = os.path.abspath(os.path.dirname(__file__))
        p = os.path.join(p, 'sigrok-logo-notext.png')
        self.setWindowIcon(QtGui.QIcon(p))

        actionQuit = QtGui.QAction(self)
        actionQuit.setText('&Quit')
        actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit'))
        actionQuit.setShortcut('Ctrl+Q')
        actionQuit.triggered.connect(self.close)

        actionAbout = QtGui.QAction(self)
        actionAbout.setText('&About')
        actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about'))
        actionAbout.triggered.connect(self.show_about)

        menubar = self.menuBar()
        menuFile = menubar.addMenu('&File')
        menuFile.addAction(actionQuit)
        menuHelp = menubar.addMenu('&Help')
        menuHelp.addAction(actionAbout)

        self.lblValue = QtGui.QLabel('waiting for data...')
        self.lblValue.setAlignment(QtCore.Qt.AlignCenter)
        font = self.lblValue.font()
        # setPointSize() takes an integer, so convert explicitly
        font.setPointSize(int(font.pointSize() * 1.7))
        font.setBold(True)
        self.lblValue.setFont(font)
        self.setCentralWidget(self.lblValue)
        self.centralWidget().setContentsMargins(0, 0, 0, 0)

        self.lblDevName = QtGui.QLabel()
        self.lblDevName.setToolTip('Name of used measurement device.')
        self.statusBar().insertWidget(0, self.lblDevName, 10)
        self.lblTime = QtGui.QLabel()
        self.lblTime.setToolTip('Time of the last measurement.')
        self.statusBar().insertWidget(1, self.lblTime)

        self.statusBar().setSizeGripEnabled(False)

    def show_about(self):
        '''Shows the about dialog with the libsigrok version numbers.'''
        text = textwrap.dedent('''\
            sigrok-meter
            0.1.0
            Using libsigrok {} (lib version {}).
            http://www.sigrok.org

            This program comes with ABSOLUTELY NO WARRANTY;
            for details visit http://www.gnu.org/licenses/gpl.html
            '''.format(self.thread.sr_pkg_version(),
                       self.thread.sr_lib_version()))

        QtGui.QMessageBox.about(self, 'About sigrok-meter', text)

    def format_unit(self, u):
        '''Returns the display string for the sigrok unit 'u', or an
        empty string for units without a known abbreviation.'''
        units = {
            sr.Unit.VOLT:                   'V',
            sr.Unit.AMPERE:                 'A',
            sr.Unit.OHM:                   u'\u03A9',
            sr.Unit.FARAD:                  'F',
            sr.Unit.KELVIN:                 'K',
            sr.Unit.CELSIUS:               u'\u00B0C',
            sr.Unit.FAHRENHEIT:            u'\u00B0F',
            sr.Unit.HERTZ:                  'Hz',
            sr.Unit.PERCENTAGE:             '%',
          # sr.Unit.BOOLEAN
            sr.Unit.SECOND:                 's',
            sr.Unit.SIEMENS:                'S',
            sr.Unit.DECIBEL_MW:             'dBu',
            sr.Unit.DECIBEL_VOLT:           'dBV',
          # sr.Unit.UNITLESS
            sr.Unit.DECIBEL_SPL:            'dB',
          # sr.Unit.CONCENTRATION
            sr.Unit.REVOLUTIONS_PER_MINUTE: 'rpm',
            sr.Unit.VOLT_AMPERE:            'VA',
            sr.Unit.WATT:                   'W',
            sr.Unit.WATT_HOUR:              'Wh',
            sr.Unit.METER_SECOND:           'm/s',
            sr.Unit.HECTOPASCAL:            'hPa',
            sr.Unit.HUMIDITY_293K:          '%rF',
            sr.Unit.DEGREE:                u'\u00B0',
            sr.Unit.HENRY:                  'H'
        }

        return units.get(u, '')

    def format_mqflags(self, mqflags):
        '''Returns 'AC' or 'DC' if the respective flag is set (AC takes
        precedence), an empty string otherwise.'''
        if sr.QuantityFlag.AC in mqflags:
            return 'AC'
        if sr.QuantityFlag.DC in mqflags:
            return 'DC'
        return ''

    def format_mag(self, mag):
        '''Returns the display string for the measured value 'mag';
        infinite values are shown with the infinity symbol.'''
        if mag == self.inf:
            return u'\u221E'
        if mag == -self.inf:
            return u'-\u221E'
        return '{:f}'.format(mag)

    def update(self, data):
        '''Updates the labels with new measurement values.

        'data' is the (device name, value, unit, mq flags) tuple that is
        emitted by the sampling thread.
        '''
        device, mag, unit, mqflags = data

        unit_str = self.format_unit(unit)
        mqflags_str = self.format_mqflags(mqflags)
        mag_str = self.format_mag(mag)
        value = ' '.join([mag_str, unit_str, mqflags_str])

        n = datetime.datetime.now().time()
        # Floor division keeps the milliseconds an integer; with true
        # division '{:03}' would print a float like '123.456'.
        now = '{:02}:{:02}:{:02}.{:03}'.format(
            n.hour, n.minute, n.second, n.microsecond // 1000)

        self.lblValue.setText(value)
        self.lblDevName.setText(device)
        self.lblTime.setText(now)

    def error(self, msg):
        '''Error handler for the sampling thread.'''
        QtGui.QMessageBox.critical(self, 'Error', msg)
        self.close()


if __name__ == '__main__':
    thread = SamplingThread(args['drivers'], args['loglevel'])

    app = QtGui.QApplication([])
    s = SigrokMeter(thread)
    s.show()

    r = app.exec_()
    thread.stop()
    sys.exit(r)