]> sigrok.org Git - sigrok-meter.git/blob - acquisition.py
doc: update IRC reference to Libera.Chat
[sigrok-meter.git] / acquisition.py
1 ##
2 ## This file is part of the sigrok-meter project.
3 ##
4 ## Copyright (C) 2013 Uwe Hermann <uwe@hermann-uwe.de>
5 ## Copyright (C) 2014 Jens Steinhauser <jens.steinhauser@gmail.com>
6 ##
7 ## This program is free software; you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation; either version 2 of the License, or
10 ## (at your option) any later version.
11 ##
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ## GNU General Public License for more details.
16 ##
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
19 ##
20
21 import qtcompat
22 import re
23 import sigrok.core as sr
24 import time
25
26 QtCore = qtcompat.QtCore
27
28 class Acquisition(QtCore.QObject):
29     '''Class that handles the sigrok session and the reception of data.'''
30
31     '''Signal emitted when new data arrived.'''
32     measured = QtCore.Signal(float, sr.classes.Device, sr.classes.Channel, tuple)
33
34     '''Signal emitted when the session has stopped.'''
35     stopped = QtCore.Signal()
36
37     def __init__(self, context):
38         super(self.__class__, self).__init__()
39
40         self.context = context
41         self.session = self.context.create_session()
42         self.session.add_datafeed_callback(self._datafeed_callback)
43         self.session.set_stopped_callback(self._stopped_callback)
44
45     def _parse_configstring(self, cs):
46         '''Dissect a config string and return the options as a dictionary.'''
47
48         def parse_option(k, v):
49             '''Parse the value for a single option.'''
50             try:
51                 ck = sr.ConfigKey.get_by_identifier(k)
52             except:
53                 raise ValueError('No option named "{}".'.format(k))
54
55             try:
56                 val = ck.parse_string(v)
57             except:
58                 raise ValueError(
59                     'Invalid value "{}" for option "{}".'.format(v, k))
60
61             return (k, val)
62
63         if not re.match('(([^:=]+=[^:=]+)(:[^:=]+=[^:=]+)*)?$', cs):
64             raise ValueError(
65                 '"{}" is not a valid configuration string.'.format(cs))
66
67         if not cs:
68             return {}
69
70         opts = cs.split(':')
71         opts = [tuple(kv.split('=')) for kv in opts]
72         opts = [parse_option(k, v) for (k, v) in opts]
73         return dict(opts)
74
75     def _parse_driverstring(self, ds):
76         '''Dissect the driver string and return a tuple consisting of
77         the driver name and the options (as a dictionary).'''
78
79         m = re.match('(?P<name>[^:]+)(?P<opts>(:[^:=]+=[^:=]+)*)$', ds)
80         if not m:
81             raise ValueError('"{}" is not a valid driver string.'.format(ds))
82
83         opts = m.group('opts')[1:]
84         return (m.group('name'), self._parse_configstring(opts))
85
86     def add_device(self, driverstring, configstring):
87         '''Add a device to the session.'''
88
89         # Process driver string.
90         (name, opts) = self._parse_driverstring(driverstring)
91         if not name in self.context.drivers:
92             raise RuntimeError('No driver named "{}".'.format(name))
93
94         driver = self.context.drivers[name]
95         devs = driver.scan(**opts)
96         if not devs:
97             raise RuntimeError('No devices found.')
98
99         device = devs[0]
100
101         # Process configuration string.
102         cfgs = self._parse_configstring(configstring)
103         for k, v in cfgs.items():
104             device.config_set(sr.ConfigKey.get_by_identifier(k), v)
105
106         self.session.add_device(device)
107         device.open()
108
109     def is_running(self):
110         '''Return whether the session is running.'''
111         return self.session.is_running()
112
113     @QtCore.Slot()
114     def start(self):
115         '''Start the session.'''
116         self.session.start()
117
118     @QtCore.Slot()
119     def stop(self):
120         '''Stop the session.'''
121         if self.is_running():
122             self.session.stop()
123
124     def _datafeed_callback(self, device, packet):
125         now = time.time()
126
127         if packet.type != sr.PacketType.ANALOG:
128             return
129
130         if not len(packet.payload.channels):
131             return
132
133         # TODO: Find a device with multiple channels in one packet.
134         channel = packet.payload.channels[0]
135
136         # The most recent value.
137         value = packet.payload.data[0][-1]
138
139         self.measured.emit(now, device, channel,
140                 (value, packet.payload.unit, packet.payload.mq_flags))
141
142     def _stopped_callback(self, **kwargs):
143         self.stopped.emit()