#!/usr/bin/env python ## ## This file is part of the sigrok-util project. ## ## Copyright (C) 2018 Gerhard Sittig ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program; if not, see . ## """ Disguise EEVblog 121GW Bluetooth communication as a COM port (devel HACK). This script runs as an external support process, or optionally can run on a dedicated machine, to relay the meter's BT communication to a COM port, where sigrok's serial-dmm driver can access the multimeter's acquisition data. The sigrok application then gets invoked like so: $ sigrok-cli -d eevblog-121gw:conn=/dev/ttyUSB0 --continuous This approach is a development HACK until the sigrok project may receive better integration of BLE communication. But it also helps abstract away platform dependent details or the choice for BLE communication libraries or tools, and helps speed up the development of the 121GW meter device support. Another benefit is the separation of the BLE communication from the packet inspection and data processing (priviledges are required for the former but not for the latter, COM ports typically are accessible to local users already). A future approach might perhaps support something like: -d eevblog-121gw:conn=ble/121gw/ -d eevblog-121gw:conn=tcp-ser// """ import argparse from bluepy import btle import serial import sys import time _ble_scan_dev = "hci0" _ble_scan_time = 1 _ble_scan_desc = "Complete Local Name" _ble_scan_value = "121GW" _ble_write_handle = 9 _ble_write_data = [ 0x03, 0x00, ] _ble_read_handle = 8 # Verbosity levels: # - 0, quiet operation # - 1 (default), "summary" (got/lost connection, termination) # - 2, plus serial data # - 3, plus BT scan/comm details _verbose = None def _diag_print(level, text): if level > _verbose: return print("DIAG: {}".format(text)) def _parse_cmdline(): parser = argparse.ArgumentParser( description = "EEVblog 121GW Bluetooth to serial converter" ) parser.add_argument( "-i", "--hcidev", help = "BT HCI device name", default = _ble_scan_dev ) parser.add_argument( "-b", "--bdaddr", help = "BT device address", default = "" ) parser.add_argument( "-p", "--comport", help = "COM port to send bytes to", default = "" ) parser.add_argument( "-B", "--bitrate", help = "bit rate for COM port data", default = 115200 ) parser.add_argument( "-v", "--verbose", help = "increase verbosity", action = "count", default = 1 ) parser.add_argument( "-q", "--quiet", help = "decrease verbosity", action = "count", default = 0 ) args = parser.parse_args() return args def _open_comport(args): if not args: return None if not args.comport: return None com_port = serial.Serial( args.comport, args.bitrate, bytesize = serial.EIGHTBITS, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE) return com_port def _send_comport(port, data): port.write(data) class CommDelegate(btle.DefaultDelegate): def __init__(self, port): btle.DefaultDelegate.__init__(self) self._port = port def handleNotification(self, handle, data): if handle != _ble_read_handle: return data = bytearray(data) self.parse_data(data) def parse_data(self, data): text = " ".join([ "{:02x}".format(d) for d in data ]) _diag_print(2, "got BLE data: {}".format(text)) _send_comport(self._port, data) def _scan_bleaddr(args): # If not specified by the user, Scan for the device's BDADDR. if not args: return None if args.bdaddr: return args.bdaddr iface = args.hcidev if iface.startswith("hci"): iface = iface.replace("hci", "") iface = int(iface) _diag_print(3, "Scanning (iface {}) ...".format(iface)) scanner = btle.Scanner(iface) try: devices = scanner.scan(_ble_scan_time) except btle.BTLEException as e: return None except KeyboardInterrupt as e: return None bdaddr = False for dev in devices: a, t, sd = dev.addr, dev.addrType, dev.getScanData() for adtype, desc, value in sd: if desc != _ble_scan_desc: continue if value != _ble_scan_value: continue bdaddr = a break _diag_print(3, "Done scanning, addr {}".format(bdaddr)) return bdaddr def _open_bleconn(args, port): bdaddr = _scan_bleaddr(args) if not bdaddr: return False _diag_print(3, "Opening bluepy(3) connection ...") try: conn = btle.Peripheral(bdaddr, btle.ADDR_TYPE_PUBLIC) except btle.BTLEException as e: return False except KeyboardInterrupt as e: return None conn.setDelegate(CommDelegate(port)) data = bytearray(_ble_write_data) conn.writeCharacteristic(_ble_write_handle, data) _diag_print(3, "bluepy(3) connection {}".format(conn)) return conn def main(): global _verbose args = _parse_cmdline() _verbose = args.verbose - args.quiet com_port = _open_comport(args) if not com_port: sys.exit(1) conn = None while True: # Automatically (re-)create connections, handle absence # of devices (retry later). if not conn: _diag_print(3, "connecting to BT ...") conn = _open_bleconn(args, com_port) if conn is None: break if conn: _diag_print(1, "got BLE connection.") if not conn: _diag_print(3, "could not connect to BT, waiting ...") time.sleep(.5) continue # Wait for incoming notifications. A registered handler # will process the data. Gracefully handle lost connections. try: conn.waitForNotifications(1.0) except KeyboardInterrupt as e: _diag_print(1, "got CTRL-C, terminating ...") break # except btle.BTLEException.DISCONNECTED as e: except btle.BTLEException as e: _diag_print(1, "lost BLE connection.") conn = None return if __name__ == "__main__": main()