#!/usr/bin/env python ## ## This file is part of the sigrok-meter project. ## ## Copyright (C) 2013 Uwe Hermann ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program; if not, write to the Free Software ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA ## import argparse from multiprocessing import Process, Queue from gi.repository import Gtk, GObject import re import sigrok.core as sr import sys import textwrap default_drivers = [('demo', {'analog_channels': 1})] default_loglevel = sr.LogLevel.WARN def init_and_run(queue, drivers, loglevel): def datafeed_in(device, packet): if packet.type == sr.PacketType.ANALOG: data = packet.payload.data unit, unit_str = packet.payload.unit, "" if unit == sr.Unit.VOLT: unit_str = " V" elif unit == sr.Unit.OHM: unit_str = " Ohm" elif unit == sr.Unit.AMPERE: unit_str = " A" mqflags, mqflags_str = packet.payload.mq_flags, "" if sr.QuantityFlag.AC in mqflags: mqflags_str = " AC" elif sr.QuantityFlag.DC in mqflags: mqflags_str = " DC" for i in range(packet.payload.num_samples): dev = "%s %s" % (device.vendor, device.model) val = "%f%s%s" % (data[0][i], unit_str, mqflags_str) queue.put((dev, val)) context = sr.Context_create() context.log_level = loglevel devices = [] for name, options in drivers: try: dr = context.drivers[name] devices.append(dr.scan(**options)[0]) except: print('error getting device for driver "{}", skipping'.format(name)) if not devices: print('no devices found') return session = context.create_session() for dev in devices: session.add_device(dev) dev.open() session.add_datafeed_callback(datafeed_in) session.start() session.run() session.stop() class SigrokMeter: def __init__(self): self.builder = Gtk.Builder() self.builder.add_from_file("sigrok-meter.glade") self.builder.connect_signals(self) self.value_label = self.builder.get_object("value_label") self.value_label2 = self.builder.get_object("value_label2") self.win = self.builder.get_object("mainwindow") self.win.show_all() self.queue = Queue() GObject.timeout_add(100, self.update_label_if_needed) def update_label_if_needed(self): try: t = self.queue.get_nowait() l = self.value_label if t[0] != "Victor" else self.value_label2 l.set_text("%s: %s" % (t[0], t[1])) except: pass GObject.timeout_add(100, self.update_label_if_needed) def on_quit(self, *args): Gtk.main_quit(*args) def on_about(self, action): about = self.builder.get_object("aboutdialog") context = sr.Context_create() sr_pkg = context.package_version sr_lib = context.lib_version s = "Using libsigrok %s (lib version %s)." % (sr_pkg, sr_lib) about.set_comments(s) about.run() about.hide() def parse_cli(): parser = argparse.ArgumentParser( description='Simple sigrok GUI for multimeters and dataloggers.', epilog=textwrap.dedent('''\ The DRIVER string is the same as for sigrok-cli(1). examples: %(prog)s --driver tecpel-dmm-8061-ser:conn=/dev/ttyUSB0 %(prog)s --driver uni-t-ut61e:conn=1a86.e008 '''), formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument('-d', '--driver', action='append', help='The driver to use') parser.add_argument('-l', '--loglevel', type=int, help='Set loglevel (5 is most verbose)') args = parser.parse_args() result = { 'drivers': default_drivers, 'loglevel': default_loglevel } if args.driver: result['drivers'] = [] for d in args.driver: m = re.match('(?P[^:]+)(?P(:[^:=]+=[^:=]+)*)', d) if not m: sys.exit('error parsing option "{}"'.format(d)) opts = m.group('opts').split(':')[1:] opts = [tuple(kv.split('=')) for kv in opts] opts = dict(opts) result['drivers'].append((m.group('name'), opts)) if args.loglevel != None: try: result['loglevel'] = sr.LogLevel.get(args.loglevel) except: sys.exit('error: invalid log level') return result if __name__ == '__main__': args = parse_cli() s = SigrokMeter() process = Process(target=init_and_run, args=(s.queue, args['drivers'], args['loglevel'])) process.start() Gtk.main() process.terminate()