## This file is part of the sigrok-meter project.
##
## Copyright (C) 2013 Uwe Hermann <uwe@hermann-uwe.de>
+## Copyright (C) 2014 Jens Steinhauser <jens.steinhauser@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
##
import argparse
-from multiprocessing import Process, Queue
-from gi.repository import Gtk, GObject
+import datetime
+import os.path
+import PyQt4.QtCore as QtCore
+import PyQt4.QtGui as QtGui
import re
import sigrok.core as sr
import sys
return units.get(u, '')
-def init_and_run(queue, drivers, loglevel):
- def datafeed_in(device, packet):
- if packet.type == sr.PacketType.ANALOG:
- data = packet.payload.data
- unit_str = format_unit(packet.payload.unit)
- mqflags, mqflags_str = packet.payload.mq_flags, ""
-
- if sr.QuantityFlag.AC in mqflags:
- mqflags_str = "AC"
- elif sr.QuantityFlag.DC in mqflags:
- mqflags_str = "DC"
-
- for i in range(packet.payload.num_samples):
- dev = "%s %s" % (device.vendor, device.model)
- mag_str = "%f" % data[0][i]
- val = ' '.join([mag_str, unit_str, mqflags_str])
- queue.put((dev, val))
-
- context = sr.Context_create()
- context.log_level = loglevel
-
- devices = []
- for name, options in drivers:
- try:
- dr = context.drivers[name]
- devices.append(dr.scan(**options)[0])
- except:
- print('error getting device for driver "{}", skipping'.format(name))
-
- if not devices:
- print('no devices found')
- return
-
- session = context.create_session()
- for dev in devices:
- session.add_device(dev)
- dev.open()
- session.add_datafeed_callback(datafeed_in)
- session.start()
- session.run()
- session.stop()
-
-class SigrokMeter:
- def __init__(self):
- self.builder = Gtk.Builder()
- self.builder.add_from_file("sigrok-meter.glade")
- self.builder.connect_signals(self)
- self.value_label = self.builder.get_object("value_label")
- self.value_label2 = self.builder.get_object("value_label2")
- self.win = self.builder.get_object("mainwindow")
- self.win.show_all()
- self.queue = Queue()
- GObject.timeout_add(100, self.update_label_if_needed)
-
- def update_label_if_needed(self):
- try:
- t = self.queue.get_nowait()
- l = self.value_label if t[0] != "Victor" else self.value_label2
- l.set_text("%s: %s" % (t[0], t[1]))
- except:
- pass
- GObject.timeout_add(100, self.update_label_if_needed)
-
- def on_quit(self, *args):
- Gtk.main_quit(*args)
-
- def on_about(self, action):
- about = self.builder.get_object("aboutdialog")
- context = sr.Context_create()
- sr_pkg = context.package_version
- sr_lib = context.lib_version
- s = "Using libsigrok %s (lib version %s)." % (sr_pkg, sr_lib)
- about.set_comments(s)
- about.run()
- about.hide()
+class SamplingThread(QtCore.QObject):
+ '''A class that handles the reception of sigrok packets in the background.'''
+
+ class Worker(QtCore.QObject):
+ '''Helper class that does the actual work in another thread.'''
+
+ '''Signal emitted when new data arrived.'''
+ measured = QtCore.pyqtSignal(object, object)
+
+ def __init__(self, drivers, loglevel):
+ super(self.__class__, self).__init__()
+
+ self.context = sr.Context_create()
+ self.context.log_level = loglevel
+
+ self.sr_pkg_version = self.context.package_version
+ self.sr_lib_version = self.context.lib_version
+
+ self.devices = []
+ for name, options in drivers:
+ try:
+ dr = self.context.drivers[name]
+ self.devices.append(dr.scan(**options)[0])
+ except:
+ print('error getting device for driver "{}", skipping'.format(name))
+
+ if not self.devices:
+ print('no devices found')
+
+ def start_sampling(self):
+ self.session = self.context.create_session()
+ for dev in self.devices:
+ self.session.add_device(dev)
+ dev.open()
+ self.session.add_datafeed_callback(self.callback)
+ self.session.start()
+ self.session.run()
+
+ def stop_sampling(self):
+ self.session.stop()
+
+ def callback(self, device, packet):
+ if packet.type == sr.PacketType.ANALOG:
+ data = packet.payload.data
+ unit_str = format_unit(packet.payload.unit)
+ mqflags, mqflags_str = packet.payload.mq_flags, ""
+
+ if sr.QuantityFlag.AC in mqflags:
+ mqflags_str = "AC"
+ elif sr.QuantityFlag.DC in mqflags:
+ mqflags_str = "DC"
+
+ for i in range(packet.payload.num_samples):
+ dev = "%s %s" % (device.vendor, device.model)
+ mag_str = "%f" % data[0][i]
+ val = ' '.join([mag_str, unit_str, mqflags_str])
+
+ self.measured.emit(dev, val)
+
+ # wait a short time so that in any case we don't flood the GUI
+ # with new data (for example if the demo device is used)
+ self.thread().msleep(100)
+
+ # signal used to start the worker across threads
+ _start_signal = QtCore.pyqtSignal()
+
+ def __init__(self, drivers, loglevel):
+ super(self.__class__, self).__init__()
+
+ self.worker = self.Worker(drivers, loglevel)
+ self.thread = QtCore.QThread()
+ self.worker.moveToThread(self.thread)
+
+ self._start_signal.connect(self.worker.start_sampling)
+
+ self.measured = self.worker.measured
+
+ self.thread.start()
+
+ def start(self):
+ '''Starts sampling'''
+ self._start_signal.emit()
+
+ def stop(self):
+ '''Stops sampling and the background thread.'''
+ self.worker.stop_sampling()
+ self.thread.quit()
+ # the timeout is needed when the demo device is used, because it
+ # produces so much outstanding data that quitting takes a long time
+ self.thread.wait(500)
+
+ def sr_pkg_version(self):
+ '''Returns the version number of the libsigrok package.'''
+ return self.worker.sr_pkg_version
+
+ def sr_lib_version(self):
+ '''Returns the version number fo the libsigrok library.'''
+ return self.worker.sr_lib_version
+
+class SigrokMeter(QtGui.QMainWindow):
+ '''The main window of the application.'''
+
+ def __init__(self, thread):
+ super(SigrokMeter, self).__init__()
+ self.setup_ui()
+
+ self.thread = thread
+ self.thread.measured.connect(self.update, QtCore.Qt.QueuedConnection)
+ self.thread.start()
+
+ def setup_ui(self):
+ self.setWindowTitle('sigrok-meter')
+ self.setMinimumHeight(130)
+ self.setMinimumWidth(260)
+
+ p = os.path.abspath(os.path.dirname(__file__))
+ p = os.path.join(p, 'sigrok-logo-notext.png')
+ self.setWindowIcon(QtGui.QIcon(p))
+
+ actionQuit = QtGui.QAction(self)
+ actionQuit.setText('&Quit')
+ actionQuit.setIcon(QtGui.QIcon.fromTheme('application-exit'))
+ actionQuit.setShortcut('Ctrl+Q')
+ actionQuit.triggered.connect(self.close)
+
+ actionAbout = QtGui.QAction(self)
+ actionAbout.setText('&About')
+ actionAbout.setIcon(QtGui.QIcon.fromTheme('help-about'))
+ actionAbout.triggered.connect(self.show_about)
+
+ menubar = self.menuBar()
+ menuFile = menubar.addMenu('&File')
+ menuFile.addAction(actionQuit)
+ menuHelp = menubar.addMenu('&Help')
+ menuHelp.addAction(actionAbout)
+
+ self.lblValue = QtGui.QLabel('waiting for data...')
+ self.lblValue.setAlignment(QtCore.Qt.AlignCenter)
+ font = self.lblValue.font()
+ font.setPointSize(font.pointSize() * 1.7)
+ font.setBold(True)
+ self.lblValue.setFont(font)
+ self.setCentralWidget(self.lblValue)
+ self.centralWidget().setContentsMargins(0, 0, 0, 0)
+
+ self.lblDevName = QtGui.QLabel()
+ self.lblDevName.setToolTip('Name of used measurement device.')
+ self.statusBar().insertWidget(0, self.lblDevName, 10)
+ self.lblTime = QtGui.QLabel()
+ self.lblTime.setToolTip('Time of the last measurement.')
+ self.statusBar().insertWidget(1, self.lblTime)
+
+ self.statusBar().setSizeGripEnabled(False)
+
+ def show_about(self):
+ text = textwrap.dedent('''\
+ <div align="center">
+ <b>sigrok-meter</b><br/>
+ 0.1.0<br/>
+ Using libsigrok {} (lib version {}).<br/>
+ <a href='http://www.sigrok.org'>
+ http://www.sigrok.org</a><br/>
+ <br/>
+ This program comes with ABSOLUTELY NO WARRANTY;<br/>
+ for details visit
+ <a href='http://www.gnu.org/licenses/gpl.html'>
+ http://www.gnu.org/licenses/gpl.html</a>
+ </div>
+ '''.format(self.thread.sr_pkg_version(), self.thread.sr_lib_version()))
+
+ QtGui.QMessageBox.about(self, 'About sigrok-meter', text)
+
+ def update(self, device, value):
+ '''Updates the labels with new measurement values.'''
+
+ n = datetime.datetime.now().time()
+ now = '{:02}:{:02}:{:02}.{:03}'.format(
+ n.hour, n.minute, n.second, n.microsecond / 1000)
+
+ self.lblValue.setText(value)
+ self.lblDevName.setText(device)
+ self.lblTime.setText(now)
def parse_cli():
parser = argparse.ArgumentParser(
if __name__ == '__main__':
args = parse_cli()
- s = SigrokMeter()
- process = Process(target=init_and_run,
- args=(s.queue, args['drivers'], args['loglevel']))
- process.start()
- Gtk.main()
- process.terminate()
+ thread = SamplingThread(args['drivers'], args['loglevel'])
+
+ app = QtGui.QApplication([])
+ s = SigrokMeter(thread)
+ s.show()
+
+ r = app.exec_()
+ thread.stop()
+ sys.exit(r)