]> sigrok.org Git - sigrok-util.git/blob - util/eevblog-121gw/eev121gw-ble-uart-relay
sigrok-native-macosx: Fix an issue with recent boost versions.
[sigrok-util.git] / util / eevblog-121gw / eev121gw-ble-uart-relay
1 #!/usr/bin/env python
2 ##
3 ## This file is part of the sigrok-util project.
4 ##
5 ## Copyright (C) 2018 Gerhard Sittig <gerhard.sittig@gmx.net>
6 ##
7 ## This program is free software; you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation; either version 2 of the License, or
10 ## (at your option) any later version.
11 ##
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ## GNU General Public License for more details.
16 ##
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
19 ##
20
21 """
22 Disguise EEVblog 121GW Bluetooth communication as a COM port (devel HACK).
23
24 This script runs as an external support process, or optionally can run on
25 a dedicated machine, to relay the meter's BT communication to a COM port,
26 where sigrok's serial-dmm driver can access the multimeter's acquisition
27 data. The sigrok application then gets invoked like so:
28
29   $ sigrok-cli -d eevblog-121gw:conn=/dev/ttyUSB0 --continuous
30
31 This approach is a development HACK until the sigrok project may receive
32 better integration of BLE communication. But it also helps abstract away
33 platform dependent details or the choice for BLE communication libraries
34 or tools, and helps speed up the development of the 121GW meter device
35 support. Another benefit is the separation of the BLE communication from
36 the packet inspection and data processing (priviledges are required for
37 the former but not for the latter, COM ports typically are accessible to
38 local users already).
39
40 A future approach might perhaps support something like:
41
42   -d eevblog-121gw:conn=ble/121gw/<bdaddr>
43   -d eevblog-121gw:conn=tcp-ser/<ip>/<port>
44 """
45
46 import argparse
47 from bluepy import btle
48 import serial
49 import sys
50 import time
51
52 _ble_scan_dev = "hci0"
53 _ble_scan_time = 1
54 _ble_scan_desc = "Complete Local Name"
55 _ble_scan_value = "121GW"
56
57 _ble_write_handle = 9
58 _ble_write_data = [ 0x03, 0x00, ]
59 _ble_read_handle = 8
60
61 # Verbosity levels:
62 # - 0, quiet operation
63 # - 1 (default), "summary" (got/lost connection, termination)
64 # - 2, plus serial data
65 # - 3, plus BT scan/comm details
66 _verbose = None
67
68 def _diag_print(level, text):
69         if level > _verbose:
70                 return
71         print("DIAG: {}".format(text))
72
73 def _parse_cmdline():
74         parser = argparse.ArgumentParser(
75                 description = "EEVblog 121GW Bluetooth to serial converter"
76         )
77         parser.add_argument(
78                 "-i", "--hcidev",
79                 help = "BT HCI device name",
80                 default = _ble_scan_dev
81         )
82         parser.add_argument(
83                 "-b", "--bdaddr",
84                 help = "BT device address",
85                 default = ""
86         )
87         parser.add_argument(
88                 "-p", "--comport",
89                 help = "COM port to send bytes to",
90                 default = ""
91         )
92         parser.add_argument(
93                 "-B", "--bitrate",
94                 help = "bit rate for COM port data",
95                 default = 115200
96         )
97         parser.add_argument(
98                 "-v", "--verbose",
99                 help = "increase verbosity",
100                 action = "count", default = 1
101         )
102         parser.add_argument(
103                 "-q", "--quiet",
104                 help = "decrease verbosity",
105                 action = "count", default = 0
106         )
107         args = parser.parse_args()
108         return args
109
110 def _open_comport(args):
111         if not args:
112                 return None
113         if not args.comport:
114                 return None
115         com_port = serial.Serial(
116                 args.comport, args.bitrate,
117                 bytesize = serial.EIGHTBITS,
118                 parity = serial.PARITY_NONE,
119                 stopbits = serial.STOPBITS_ONE)
120         return com_port
121
122 def _send_comport(port, data):
123         port.write(data)
124
125 class CommDelegate(btle.DefaultDelegate):
126
127         def __init__(self, port):
128                 btle.DefaultDelegate.__init__(self)
129                 self._port = port
130
131         def handleNotification(self, handle, data):
132                 if handle != _ble_read_handle:
133                         return
134                 data = bytearray(data)
135                 self.parse_data(data)
136
137         def parse_data(self, data):
138                 text = " ".join([ "{:02x}".format(d) for d in data ])
139                 _diag_print(2, "got BLE data: {}".format(text))
140                 _send_comport(self._port, data)
141
142 def _scan_bleaddr(args):
143         # If not specified by the user, Scan for the device's BDADDR.
144
145         if not args:
146                 return None
147         if args.bdaddr:
148                 return args.bdaddr
149
150         iface = args.hcidev
151         if iface.startswith("hci"):
152                 iface = iface.replace("hci", "")
153         iface = int(iface)
154
155         _diag_print(3, "Scanning (iface {}) ...".format(iface))
156         scanner = btle.Scanner(iface)
157         try:
158                 devices = scanner.scan(_ble_scan_time)
159         except btle.BTLEException as e:
160                 return None
161         except KeyboardInterrupt as e:
162                 return None
163         bdaddr = False
164         for dev in devices:
165                 a, t, sd = dev.addr, dev.addrType, dev.getScanData()
166                 for adtype, desc, value in sd:
167                         if desc != _ble_scan_desc:
168                                 continue
169                         if value != _ble_scan_value:
170                                 continue
171                         bdaddr = a
172                         break
173
174         _diag_print(3, "Done scanning, addr {}".format(bdaddr))
175         return bdaddr
176
177 def _open_bleconn(args, port):
178         bdaddr = _scan_bleaddr(args)
179         if not bdaddr:
180                 return False
181
182         _diag_print(3, "Opening bluepy(3) connection ...")
183         try:
184                 conn = btle.Peripheral(bdaddr, btle.ADDR_TYPE_PUBLIC)
185         except btle.BTLEException as e:
186                 return False
187         except KeyboardInterrupt as e:
188                 return None
189         conn.setDelegate(CommDelegate(port))
190         data = bytearray(_ble_write_data)
191         conn.writeCharacteristic(_ble_write_handle, data)
192         _diag_print(3, "bluepy(3) connection {}".format(conn))
193         return conn
194
195 def main():
196         global _verbose
197
198         args = _parse_cmdline()
199         _verbose = args.verbose - args.quiet
200
201         com_port = _open_comport(args)
202         if not com_port:
203                 sys.exit(1)
204
205         conn = None
206         while True:
207                 # Automatically (re-)create connections, handle absence
208                 # of devices (retry later).
209                 if not conn:
210                         _diag_print(3, "connecting to BT ...")
211                         conn = _open_bleconn(args, com_port)
212                         if conn is None:
213                                 break
214                         if conn:
215                                 _diag_print(1, "got BLE connection.")
216                 if not conn:
217                         _diag_print(3, "could not connect to BT, waiting ...")
218                         time.sleep(.5)
219                         continue
220                 # Wait for incoming notifications. A registered handler
221                 # will process the data. Gracefully handle lost connections.
222                 try:
223                         conn.waitForNotifications(1.0)
224                 except KeyboardInterrupt as e:
225                         _diag_print(1, "got CTRL-C, terminating ...")
226                         break
227                 # except btle.BTLEException.DISCONNECTED as e:
228                 except btle.BTLEException as e:
229                         _diag_print(1, "lost BLE connection.")
230                         conn = None
231
232         return
233
234 if __name__ == "__main__":
235         main()