QtGui = qtcompat.QtGui
class SamplingThread(QtCore.QObject):
- '''A class that handles the reception of sigrok packets in the background.'''
+ '''Class that handles the reception of sigrok packets in the background.'''
class Worker(QtCore.QObject):
'''Helper class that does the actual work in another thread.'''
self.sampling = False
- def parse_driverstring(self, ds):
- '''Dissects the driver string and returns a tuple consiting of
- the driver name and the options (as a dictionary).'''
+ def parse_configstring(self, cs):
+ '''Dissect a config string and return the options as a
+ dictionary.'''
def parse_option(k, v):
'''Parse the value for a single option.'''
val = ck.parse_string(v)
except:
raise ValueError(
- 'Invalid value "{}" for option "{}"'.format(v, k))
+ 'Invalid value "{}" for option "{}".'.format(v, k))
return (k, val)
- m = re.match('(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)$', ds)
- if not m:
- raise ValueError('"{}" is not a valid driver string.'.format(ds))
+ if not re.match('(([^:=]+=[^:=]+)(:[^:=]+=[^:=]+)*)?$', cs):
+ raise ValueError(
+ '"{}" is not a valid configuration string.'.format(cs))
- opts = m.group('opts').split(':')[1:]
+ if not cs:
+ return {}
+
+ opts = cs.split(':')
opts = [tuple(kv.split('=')) for kv in opts]
opts = [parse_option(k, v) for (k, v) in opts]
- opts = dict(opts)
+ return dict(opts)
+
+ def parse_driverstring(self, ds):
+ '''Dissect the driver string and return a tuple consisting of
+ the driver name and the options (as a dictionary).'''
+
+ m = re.match('(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)$', ds)
+ if not m:
+ raise ValueError('"{}" is not a valid driver string.'.format(ds))
- return (m.group('name'), opts)
+ opts = m.group('opts')[1:]
+ return (m.group('name'), self.parse_configstring(opts))
@QtCore.Slot()
def start_sampling(self):
devices = []
- for ds in self.drivers:
+ for (ds, cs) in self.drivers:
+ # Process driver string.
try:
(name, opts) = self.parse_driverstring(ds)
if not name in self.context.drivers:
- raise RuntimeError('No driver called "{}".'.format(name))
+ raise RuntimeError('No driver named "{}".'.format(name))
driver = self.context.drivers[name]
devs = driver.scan(**opts)
if not devs:
raise RuntimeError('No devices found.')
- devices.append(devs[0])
+
+ device = devs[0]
except Exception as e:
self.error.emit(
'Error processing driver string:\n{}'.format(e))
return
+ # Process configuration string.
+ try:
+ cfgs = self.parse_configstring(cs)
+ for k, v in cfgs.items():
+ device.config_set(sr.ConfigKey.get_by_identifier(k), v)
+ except Exception as e:
+ self.error.emit(
+ 'Error processing configuration string:\n{}'.format(e))
+ return
+
+ devices.append(device)
+
self.session = self.context.create_session()
for dev in devices:
self.session.add_device(dev)
# TODO: find a device with multiple channels in one packet
channel = packet.payload.channels[0]
- # the most recent value
+ # The most recent value.
value = packet.payload.data[0][-1]
self.measured.emit(device, channel,
(value, packet.payload.unit, packet.payload.mq_flags))
- # signal used to start the worker across threads
+ # Signal used to start the worker across threads.
_start_signal = QtCore.Signal()
def __init__(self, context, drivers):
self._start_signal.connect(self.worker.start_sampling)
- # expose the signals of the worker
+ # Expose the signals of the worker.
self.measured = self.worker.measured
self.error = self.worker.error
self.thread.start()
def start(self):
- '''Starts sampling'''
+ '''Start sampling.'''
self._start_signal.emit()
def stop(self):
- '''Stops sampling and the background thread.'''
+ '''Stop sampling and stop the background thread.'''
self.worker.stop_sampling()
self.thread.quit()
self.thread.wait()