]> sigrok.org Git - sigrok-util.git/commitdiff
eevblog-121gw: introduce BLE to UART gateway (EEVBlog 121GW comm helper)
authorGerhard Sittig <redacted>
Tue, 9 Oct 2018 17:27:06 +0000 (19:27 +0200)
committerUwe Hermann <redacted>
Sun, 14 Oct 2018 21:47:32 +0000 (23:47 +0200)
Introduce a Python script which relays data which is received via BLE
communication to a UART. This enables immediate use of the serial-dmm
device driver for the EEVBlog 121GW multimeter, until sigrok's serial
communication layer grows (better) support for this physical layer.

Prefer the bluepy(3) Python module over the external gatttool(1) process
since this enables automatic scan for the device's address, and graceful
handling of lost communication, including re-connecting to the device.

util/eevblog-121gw/eev121gw-ble-uart-relay [new file with mode: 0755]

diff --git a/util/eevblog-121gw/eev121gw-ble-uart-relay b/util/eevblog-121gw/eev121gw-ble-uart-relay
new file mode 100755 (executable)
index 0000000..0ff7645
--- /dev/null
@@ -0,0 +1,235 @@
+#!/usr/bin/env python
+##
+## This file is part of the sigrok-util project.
+##
+## Copyright (C) 2018 Gerhard Sittig <gerhard.sittig@gmx.net>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+"""
+Disguise EEVblog 121GW Bluetooth communication as a COM port (devel HACK).
+
+This script runs as an external support process, or optionally can run on
+a dedicated machine, to relay the meter's BT communication to a COM port,
+where sigrok's serial-dmm driver can access the multimeter's acquisition
+data. The sigrok application then gets invoked like so:
+
+  $ sigrok-cli -d eevblog-121gw:conn=/dev/ttyUSB0 --continuous
+
+This approach is a development HACK until the sigrok project may receive
+better integration of BLE communication. But it also helps abstract away
+platform dependent details or the choice for BLE communication libraries
+or tools, and helps speed up the development of the 121GW meter device
+support. Another benefit is the separation of the BLE communication from
+the packet inspection and data processing (priviledges are required for
+the former but not for the latter, COM ports typically are accessible to
+local users already).
+
+A future approach might perhaps support something like:
+
+  -d eevblog-121gw:conn=ble/121gw/<bdaddr>
+  -d eevblog-121gw:conn=tcp-ser/<ip>/<port>
+"""
+
+import argparse
+from bluepy import btle
+import serial
+import sys
+import time
+
+_ble_scan_dev = "hci0"
+_ble_scan_time = 1
+_ble_scan_desc = "Complete Local Name"
+_ble_scan_value = "121GW"
+
+_ble_write_handle = 9
+_ble_write_data = [ 0x03, 0x00, ]
+_ble_read_handle = 8
+
+# Verbosity levels:
+# - 0, quiet operation
+# - 1 (default), "summary" (got/lost connection, termination)
+# - 2, plus serial data
+# - 3, plus BT scan/comm details
+_verbose = None
+
+def _diag_print(level, text):
+       if level > _verbose:
+               return
+       print("DIAG: {}".format(text))
+
+def _parse_cmdline():
+       parser = argparse.ArgumentParser(
+               description = "EEVblog 121GW Bluetooth to serial converter"
+       )
+       parser.add_argument(
+               "-i", "--hcidev",
+               help = "BT HCI device name",
+               default = _ble_scan_dev
+       )
+       parser.add_argument(
+               "-b", "--bdaddr",
+               help = "BT device address",
+               default = ""
+       )
+       parser.add_argument(
+               "-p", "--comport",
+               help = "COM port to send bytes to",
+               default = ""
+       )
+       parser.add_argument(
+               "-B", "--bitrate",
+               help = "bit rate for COM port data",
+               default = 115200
+       )
+       parser.add_argument(
+               "-v", "--verbose",
+               help = "increase verbosity",
+               action = "count", default = 1
+       )
+       parser.add_argument(
+               "-q", "--quiet",
+               help = "decrease verbosity",
+               action = "count", default = 0
+       )
+       args = parser.parse_args()
+       return args
+
+def _open_comport(args):
+       if not args:
+               return None
+       if not args.comport:
+               return None
+       com_port = serial.Serial(
+               args.comport, args.bitrate,
+               bytesize = serial.EIGHTBITS,
+               parity = serial.PARITY_NONE,
+               stopbits = serial.STOPBITS_ONE)
+       return com_port
+
+def _send_comport(port, data):
+       port.write(data)
+
+class CommDelegate(btle.DefaultDelegate):
+
+       def __init__(self, port):
+               btle.DefaultDelegate.__init__(self)
+               self._port = port
+
+       def handleNotification(self, handle, data):
+               if handle != _ble_read_handle:
+                       return
+               data = bytearray(data)
+               self.parse_data(data)
+
+       def parse_data(self, data):
+               text = " ".join([ "{:02x}".format(d) for d in data ])
+               _diag_print(2, "got BLE data: {}".format(text))
+               _send_comport(self._port, data)
+
+def _scan_bleaddr(args):
+       # If not specified by the user, Scan for the device's BDADDR.
+
+       if not args:
+               return None
+       if args.bdaddr:
+               return args.bdaddr
+
+       iface = args.hcidev
+       if iface.startswith("hci"):
+               iface = iface.replace("hci", "")
+       iface = int(iface)
+
+       _diag_print(3, "Scanning (iface {}) ...".format(iface))
+       scanner = btle.Scanner(iface)
+       try:
+               devices = scanner.scan(_ble_scan_time)
+       except btle.BTLEException as e:
+               return None
+       except KeyboardInterrupt as e:
+               return None
+       bdaddr = False
+       for dev in devices:
+               a, t, sd = dev.addr, dev.addrType, dev.getScanData()
+               for adtype, desc, value in sd:
+                       if desc != _ble_scan_desc:
+                               continue
+                       if value != _ble_scan_value:
+                               continue
+                       bdaddr = a
+                       break
+
+       _diag_print(3, "Done scanning, addr {}".format(bdaddr))
+       return bdaddr
+
+def _open_bleconn(args, port):
+       bdaddr = _scan_bleaddr(args)
+       if not bdaddr:
+               return False
+
+       _diag_print(3, "Opening bluepy(3) connection ...")
+       try:
+               conn = btle.Peripheral(bdaddr, btle.ADDR_TYPE_PUBLIC)
+       except btle.BTLEException as e:
+               return False
+       except KeyboardInterrupt as e:
+               return None
+       conn.setDelegate(CommDelegate(port))
+       data = bytearray(_ble_write_data)
+       conn.writeCharacteristic(_ble_write_handle, data)
+       _diag_print(3, "bluepy(3) connection {}".format(conn))
+       return conn
+
+def main():
+       global _verbose
+
+       args = _parse_cmdline()
+       _verbose = args.verbose - args.quiet
+
+       com_port = _open_comport(args)
+       if not com_port:
+               sys.exit(1)
+
+       conn = None
+       while True:
+               # Automatically (re-)create connections, handle absence
+               # of devices (retry later).
+               if not conn:
+                       _diag_print(3, "connecting to BT ...")
+                       conn = _open_bleconn(args, com_port)
+                       if conn is None:
+                               break
+                       if conn:
+                               _diag_print(1, "got BLE connection.")
+               if not conn:
+                       _diag_print(3, "could not connect to BT, waiting ...")
+                       time.sleep(.5)
+                       continue
+               # Wait for incoming notifications. A registered handler
+               # will process the data. Gracefully handle lost connections.
+               try:
+                       conn.waitForNotifications(1.0)
+               except KeyboardInterrupt as e:
+                       _diag_print(1, "got CTRL-C, terminating ...")
+                       break
+               # except btle.BTLEException.DISCONNECTED as e:
+               except btle.BTLEException as e:
+                       _diag_print(1, "lost BLE connection.")
+                       conn = None
+
+       return
+
+if __name__ == "__main__":
+       main()