]> sigrok.org Git - sigrok-util.git/blobdiff - util/eevblog-121gw/eev121gw-ble-uart-relay
eevblog-121gw: introduce BLE to UART gateway (EEVBlog 121GW comm helper)
[sigrok-util.git] / util / eevblog-121gw / eev121gw-ble-uart-relay
diff --git a/util/eevblog-121gw/eev121gw-ble-uart-relay b/util/eevblog-121gw/eev121gw-ble-uart-relay
new file mode 100755 (executable)
index 0000000..0ff7645
--- /dev/null
@@ -0,0 +1,235 @@
+#!/usr/bin/env python
+##
+## This file is part of the sigrok-util project.
+##
+## Copyright (C) 2018 Gerhard Sittig <gerhard.sittig@gmx.net>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+"""
+Disguise EEVblog 121GW Bluetooth communication as a COM port (devel HACK).
+
+This script runs as an external support process, or optionally can run on
+a dedicated machine, to relay the meter's BT communication to a COM port,
+where sigrok's serial-dmm driver can access the multimeter's acquisition
+data. The sigrok application then gets invoked like so:
+
+  $ sigrok-cli -d eevblog-121gw:conn=/dev/ttyUSB0 --continuous
+
+This approach is a development HACK until the sigrok project may receive
+better integration of BLE communication. But it also helps abstract away
+platform dependent details or the choice for BLE communication libraries
+or tools, and helps speed up the development of the 121GW meter device
+support. Another benefit is the separation of the BLE communication from
+the packet inspection and data processing (priviledges are required for
+the former but not for the latter, COM ports typically are accessible to
+local users already).
+
+A future approach might perhaps support something like:
+
+  -d eevblog-121gw:conn=ble/121gw/<bdaddr>
+  -d eevblog-121gw:conn=tcp-ser/<ip>/<port>
+"""
+
+import argparse
+from bluepy import btle
+import serial
+import sys
+import time
+
+_ble_scan_dev = "hci0"
+_ble_scan_time = 1
+_ble_scan_desc = "Complete Local Name"
+_ble_scan_value = "121GW"
+
+_ble_write_handle = 9
+_ble_write_data = [ 0x03, 0x00, ]
+_ble_read_handle = 8
+
+# Verbosity levels:
+# - 0, quiet operation
+# - 1 (default), "summary" (got/lost connection, termination)
+# - 2, plus serial data
+# - 3, plus BT scan/comm details
+_verbose = None
+
+def _diag_print(level, text):
+       if level > _verbose:
+               return
+       print("DIAG: {}".format(text))
+
+def _parse_cmdline():
+       parser = argparse.ArgumentParser(
+               description = "EEVblog 121GW Bluetooth to serial converter"
+       )
+       parser.add_argument(
+               "-i", "--hcidev",
+               help = "BT HCI device name",
+               default = _ble_scan_dev
+       )
+       parser.add_argument(
+               "-b", "--bdaddr",
+               help = "BT device address",
+               default = ""
+       )
+       parser.add_argument(
+               "-p", "--comport",
+               help = "COM port to send bytes to",
+               default = ""
+       )
+       parser.add_argument(
+               "-B", "--bitrate",
+               help = "bit rate for COM port data",
+               default = 115200
+       )
+       parser.add_argument(
+               "-v", "--verbose",
+               help = "increase verbosity",
+               action = "count", default = 1
+       )
+       parser.add_argument(
+               "-q", "--quiet",
+               help = "decrease verbosity",
+               action = "count", default = 0
+       )
+       args = parser.parse_args()
+       return args
+
+def _open_comport(args):
+       if not args:
+               return None
+       if not args.comport:
+               return None
+       com_port = serial.Serial(
+               args.comport, args.bitrate,
+               bytesize = serial.EIGHTBITS,
+               parity = serial.PARITY_NONE,
+               stopbits = serial.STOPBITS_ONE)
+       return com_port
+
+def _send_comport(port, data):
+       port.write(data)
+
+class CommDelegate(btle.DefaultDelegate):
+
+       def __init__(self, port):
+               btle.DefaultDelegate.__init__(self)
+               self._port = port
+
+       def handleNotification(self, handle, data):
+               if handle != _ble_read_handle:
+                       return
+               data = bytearray(data)
+               self.parse_data(data)
+
+       def parse_data(self, data):
+               text = " ".join([ "{:02x}".format(d) for d in data ])
+               _diag_print(2, "got BLE data: {}".format(text))
+               _send_comport(self._port, data)
+
+def _scan_bleaddr(args):
+       # If not specified by the user, Scan for the device's BDADDR.
+
+       if not args:
+               return None
+       if args.bdaddr:
+               return args.bdaddr
+
+       iface = args.hcidev
+       if iface.startswith("hci"):
+               iface = iface.replace("hci", "")
+       iface = int(iface)
+
+       _diag_print(3, "Scanning (iface {}) ...".format(iface))
+       scanner = btle.Scanner(iface)
+       try:
+               devices = scanner.scan(_ble_scan_time)
+       except btle.BTLEException as e:
+               return None
+       except KeyboardInterrupt as e:
+               return None
+       bdaddr = False
+       for dev in devices:
+               a, t, sd = dev.addr, dev.addrType, dev.getScanData()
+               for adtype, desc, value in sd:
+                       if desc != _ble_scan_desc:
+                               continue
+                       if value != _ble_scan_value:
+                               continue
+                       bdaddr = a
+                       break
+
+       _diag_print(3, "Done scanning, addr {}".format(bdaddr))
+       return bdaddr
+
+def _open_bleconn(args, port):
+       bdaddr = _scan_bleaddr(args)
+       if not bdaddr:
+               return False
+
+       _diag_print(3, "Opening bluepy(3) connection ...")
+       try:
+               conn = btle.Peripheral(bdaddr, btle.ADDR_TYPE_PUBLIC)
+       except btle.BTLEException as e:
+               return False
+       except KeyboardInterrupt as e:
+               return None
+       conn.setDelegate(CommDelegate(port))
+       data = bytearray(_ble_write_data)
+       conn.writeCharacteristic(_ble_write_handle, data)
+       _diag_print(3, "bluepy(3) connection {}".format(conn))
+       return conn
+
+def main():
+       global _verbose
+
+       args = _parse_cmdline()
+       _verbose = args.verbose - args.quiet
+
+       com_port = _open_comport(args)
+       if not com_port:
+               sys.exit(1)
+
+       conn = None
+       while True:
+               # Automatically (re-)create connections, handle absence
+               # of devices (retry later).
+               if not conn:
+                       _diag_print(3, "connecting to BT ...")
+                       conn = _open_bleconn(args, com_port)
+                       if conn is None:
+                               break
+                       if conn:
+                               _diag_print(1, "got BLE connection.")
+               if not conn:
+                       _diag_print(3, "could not connect to BT, waiting ...")
+                       time.sleep(.5)
+                       continue
+               # Wait for incoming notifications. A registered handler
+               # will process the data. Gracefully handle lost connections.
+               try:
+                       conn.waitForNotifications(1.0)
+               except KeyboardInterrupt as e:
+                       _diag_print(1, "got CTRL-C, terminating ...")
+                       break
+               # except btle.BTLEException.DISCONNECTED as e:
+               except btle.BTLEException as e:
+                       _diag_print(1, "lost BLE connection.")
+                       conn = None
+
+       return
+
+if __name__ == "__main__":
+       main()