args = parse_cli()
- global qt_signal
-
if args['pyside']:
from PySide import QtCore, QtGui
- qt_signal = QtCore.Signal
else:
try:
+ # Use version 2 API in all cases, because that's what PySide uses.
+ import sip
+ sip.setapi('QVariant', 2)
+ sip.setapi('QDate', 2)
+ sip.setapi('QDateTime', 2)
+ sip.setapi('QString', 2)
+ sip.setapi('QTextStream', 2)
+ sip.setapi('QTime', 2)
+ sip.setapi('QUrl', 2)
+ sip.setapi('QVariant', 2)
+
from PyQt4 import QtCore, QtGui
- qt_signal = QtCore.pyqtSignal
+
+ # Add PySide compatible names.
+ QtCore.Signal = QtCore.pyqtSignal
+ QtCore.Slot = QtCore.pyqtSlot
except:
sys.stderr.write('import of PyQt4 failed, using PySide\n')
from PySide import QtCore, QtGui
- qt_signal = QtCore.Signal
class SamplingThread(QtCore.QObject):
'''A class that handles the reception of sigrok packets in the background.'''
'''Helper class that does the actual work in another thread.'''
'''Signal emitted when new data arrived.'''
- measured = qt_signal(object)
+ measured = QtCore.Signal(object, object)
+
+ '''Signal emmited in case of an error.'''
+ error = QtCore.Signal(str)
def __init__(self, drivers, loglevel):
super(self.__class__, self).__init__()
+ self.sampling = False
+ self.drivers = drivers
+
self.context = sr.Context_create()
self.context.log_level = loglevel
self.sr_pkg_version = self.context.package_version
self.sr_lib_version = self.context.lib_version
- self.devices = []
- for name, options in drivers:
+ @QtCore.Slot()
+ def start_sampling(self):
+ devices = []
+ for name, options in self.drivers:
try:
dr = self.context.drivers[name]
- self.devices.append(dr.scan(**options)[0])
+ devices.append(dr.scan(**options)[0])
except:
- print('error getting device for driver "{}", skipping'.format(name))
-
- if not self.devices:
- print('no devices found')
+ self.error.emit(
+ 'Unable to get device for driver "{}".'.format(name))
+ return
- def start_sampling(self):
self.session = self.context.create_session()
- for dev in self.devices:
+ for dev in devices:
self.session.add_device(dev)
dev.open()
self.session.add_datafeed_callback(self.callback)
self.session.start()
+ self.sampling = True
self.session.run()
+ # If sampling is 'True' here, it means that 'stop_sampling()' was
+ # not called, therefore 'session.run()' ended too early, indicating
+ # an error.
+ if self.sampling:
+ self.error.emit('An error occured during the acquisition.')
+
def stop_sampling(self):
- self.session.stop()
+ if self.sampling:
+ self.sampling = False
+ self.session.stop()
def callback(self, device, packet):
if packet.type == sr.PacketType.ANALOG:
self.thread().msleep(100)
# signal used to start the worker across threads
- _start_signal = qt_signal()
+ _start_signal = QtCore.Signal()
def __init__(self, drivers, loglevel):
super(self.__class__, self).__init__()
self._start_signal.connect(self.worker.start_sampling)
+ # expose the signals of the worker
self.measured = self.worker.measured
+ self.error = self.worker.error
self.thread.start()
self.thread = thread
self.thread.measured.connect(self.update, QtCore.Qt.QueuedConnection)
+ self.thread.error.connect(self.error)
self.thread.start()
def setup_ui(self):
self.statusBar().setSizeGripEnabled(False)
+ @QtCore.Slot()
def show_about(self):
text = textwrap.dedent('''\
<div align="center">
return u'\u221E'
return '{:f}'.format(mag)
+ @QtCore.Slot(object)
def update(self, data):
'''Updates the labels with new measurement values.'''
self.lblDevName.setText(device)
self.lblTime.setText(now)
+ @QtCore.Slot(str)
+ def error(self, msg):
+ '''Error handler for the sampling thread.'''
+ QtGui.QMessageBox.critical(self, 'Error', msg)
+ self.close()
+
if __name__ == '__main__':
thread = SamplingThread(args['drivers'], args['loglevel'])