##
import qtcompat
+import re
import sigrok.core as sr
QtCore = qtcompat.QtCore
QtGui = qtcompat.QtGui
class SamplingThread(QtCore.QObject):
- '''A class that handles the reception of sigrok packets in the background.'''
+ '''Class that handles the reception of sigrok packets in the background.'''
class Worker(QtCore.QObject):
'''Helper class that does the actual work in another thread.'''
self.sampling = False
+ def parse_configstring(self, cs):
+ '''Dissect a config string and return the options as a
+ dictionary.'''
+
+ def parse_option(k, v):
+ '''Parse the value for a single option.'''
+ try:
+ ck = sr.ConfigKey.get_by_identifier(k)
+ except:
+ raise ValueError('No option named "{}".'.format(k))
+
+ try:
+ val = ck.parse_string(v)
+ except:
+ raise ValueError(
+ 'Invalid value "{}" for option "{}".'.format(v, k))
+
+ return (k, val)
+
+ if not re.match('(([^:=]+=[^:=]+)(:[^:=]+=[^:=]+)*)?$', cs):
+ raise ValueError(
+ '"{}" is not a valid configuration string.'.format(cs))
+
+ if not cs:
+ return {}
+
+ opts = cs.split(':')
+ opts = [tuple(kv.split('=')) for kv in opts]
+ opts = [parse_option(k, v) for (k, v) in opts]
+ return dict(opts)
+
+ def parse_driverstring(self, ds):
+ '''Dissect the driver string and return a tuple consisting of
+ the driver name and the options (as a dictionary).'''
+
+ m = re.match('(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)$', ds)
+ if not m:
+ raise ValueError('"{}" is not a valid driver string.'.format(ds))
+
+ opts = m.group('opts')[1:]
+ return (m.group('name'), self.parse_configstring(opts))
+
@QtCore.Slot()
def start_sampling(self):
devices = []
- for name, options in self.drivers:
+ for (ds, cs) in self.drivers:
+ # Process driver string.
try:
- dr = self.context.drivers[name]
- devices.append(dr.scan(**options)[0])
- except:
+ (name, opts) = self.parse_driverstring(ds)
+ if not name in self.context.drivers:
+ raise RuntimeError('No driver named "{}".'.format(name))
+
+ driver = self.context.drivers[name]
+ devs = driver.scan(**opts)
+ if not devs:
+ raise RuntimeError('No devices found.')
+
+ device = devs[0]
+ except Exception as e:
self.error.emit(
- 'Unable to get device for driver "{}".'.format(name))
+ 'Error processing driver string:\n{}'.format(e))
return
+ # Process configuration string.
+ try:
+ cfgs = self.parse_configstring(cs)
+ for k, v in cfgs.items():
+ device.config_set(sr.ConfigKey.get_by_identifier(k), v)
+ except Exception as e:
+ self.error.emit(
+ 'Error processing configuration string:\n{}'.format(e))
+ return
+
+ devices.append(device)
+
self.session = self.context.create_session()
for dev in devices:
self.session.add_device(dev)
# TODO: find a device with multiple channels in one packet
channel = packet.payload.channels[0]
- # the most recent value
+ # The most recent value.
value = packet.payload.data[0][-1]
self.measured.emit(device, channel,
(value, packet.payload.unit, packet.payload.mq_flags))
- # signal used to start the worker across threads
+ # Signal used to start the worker across threads.
_start_signal = QtCore.Signal()
def __init__(self, context, drivers):
self._start_signal.connect(self.worker.start_sampling)
- # expose the signals of the worker
+ # Expose the signals of the worker.
self.measured = self.worker.measured
self.error = self.worker.error
self.thread.start()
def start(self):
- '''Starts sampling'''
+ '''Start sampling.'''
self._start_signal.emit()
def stop(self):
- '''Stops sampling and the background thread.'''
+ '''Stop sampling and stop the background thread.'''
self.worker.stop_sampling()
self.thread.quit()
self.thread.wait()