##
-## This file is part of the sigrok project.
+## This file is part of the libsigrok project.
##
## Copyright (C) 2013 Martin Ling <martin-sigrok@earth.li>
##
from functools import partial
from fractions import Fraction
-from lowlevel import *
-import lowlevel
+from .lowlevel import *
+from . import lowlevel
import itertools
-__all__ = ['Error', 'Context', 'Driver', 'Device', 'Session', 'Packet']
+__all__ = ['Error', 'Context', 'Driver', 'Device', 'Session', 'Packet', 'Log',
+ 'LogLevel', 'PacketType', 'Quantity', 'Unit', 'QuantityFlag', 'ConfigKey',
+ 'ProbeType', 'Probe', 'ProbeGroup']
class Error(Exception):
if result != SR_OK:
raise Error(result)
-def config_key(name):
- if not name.lower() == name:
- raise AttributeError
- key_name = "SR_CONF_" + name.upper()
- return getattr(lowlevel, key_name)
-
def gvariant_to_python(value):
type_string = g_variant_get_type_string(value)
if type_string == 't':
return Fraction(
g_variant_get_uint64(g_variant_get_child_value(value, 0)),
g_variant_get_uint64(g_variant_get_child_value(value, 1)))
- raise NotImplementedError, (
+ raise NotImplementedError(
"Can't convert GVariant type '%s' to a Python type." % type_string)
def python_to_gvariant(value):
return g_variant_new_string(value)
if isinstance(value, Fraction):
array = new_gvariant_ptr_array(2)
- gvariant_ptr_array_setitem(array, 0, value.numerator)
- gvariant_ptr_array_setitem(array, 1, value.denominator)
+ gvariant_ptr_array_setitem(array, 0,
+ g_variant_new_uint64(value.numerator))
+ gvariant_ptr_array_setitem(array, 1,
+ g_variant_new_uint64(value.denominator))
result = g_variant_new_tuple(array, 2)
delete_gvariant_ptr_array(array)
return result
- raise NotImplementedError, (
+ raise NotImplementedError(
"Can't convert Python '%s' to a GVariant." % type(value))
def callback_wrapper(session, callback, device_ptr, packet_ptr):
def name(self):
return self.struct.name
- def scan(self):
+ def scan(self, **kwargs):
if not self._initialized:
check(sr_driver_init(self.context.struct, self.struct))
self._initialized = True
- devices = []
- device_list = sr_driver_scan(self.struct, None)
- device_list_item = device_list
- while device_list_item:
- ptr = device_list_item.data
- device_ptr = gpointer_to_sr_dev_inst_ptr(ptr)
- devices.append(Device(self, device_ptr))
- device_list_item = device_list_item.next
+ options = []
+ for name, value in kwargs.items():
+ key = getattr(ConfigKey, name.upper())
+ src = sr_config()
+ src.key = key.id
+ src.data = python_to_gvariant(value)
+ options.append(src.this)
+ option_list = python_to_gslist(options)
+ device_list = sr_driver_scan(self.struct, option_list)
+ g_slist_free(option_list)
+ devices = [Device(self, gpointer_to_sr_dev_inst_ptr(ptr))
+ for ptr in gslist_to_python(device_list)]
g_slist_free(device_list)
return devices
def __init__(self, driver, struct):
self.driver = driver
self.struct = struct
+ self._probes = None
+ self._probe_groups = None
def __getattr__(self, name):
- key = config_key(name)
+ key = getattr(ConfigKey, name.upper())
data = new_gvariant_ptr_ptr()
try:
- check(sr_config_get(self.driver.struct, key, data, self.struct))
+ check(sr_config_get(self.driver.struct, self.struct, None,
+ key, data))
except Error as error:
- if error.errno == SR_ERR_ARG:
- raise NotImplementedError, (
+ if error.errno == SR_ERR_NA:
+ raise NotImplementedError(
"Device does not implement %s" % name)
else:
raise AttributeError
def __setattr__(self, name, value):
try:
- key = config_key(name)
+ key = getattr(ConfigKey, name.upper())
except AttributeError:
super(Device, self).__setattr__(name, value)
return
- check(sr_config_set(self.struct, key, python_to_gvariant(value)))
+ check(sr_config_set(self.struct, None, key, python_to_gvariant(value)))
@property
def vendor(self):
def version(self):
return self.struct.version
+ @property
+ def probes(self):
+ if self._probes is None:
+ self._probes = {}
+ probe_list = self.struct.probes
+ while (probe_list):
+ probe_ptr = void_ptr_to_sr_probe_ptr(probe_list.data)
+ self._probes[probe_ptr.name] = Probe(self, probe_ptr)
+ probe_list = probe_list.next
+ return self._probes
+
+ @property
+ def probe_groups(self):
+ if self._probe_groups is None:
+ self._probe_groups = {}
+ probe_group_list = self.struct.probe_groups
+ while (probe_group_list):
+ probe_group_ptr = void_ptr_to_sr_probe_group_ptr(
+ probe_group_list.data)
+ self._probe_groups[probe_group_ptr.name] = ProbeGroup(self,
+ probe_group_ptr)
+ probe_group_list = probe_group_list.next
+ return self._probe_groups
+
+class Probe(object):
+
+ def __init__(self, device, struct):
+ self.device = device
+ self.struct = struct
+
+ @property
+ def type(self):
+ return ProbeType(self.struct.type)
+
+ @property
+ def enabled(self):
+ return self.struct.enabled
+
+ @property
+ def name(self):
+ return self.struct.name
+
+class ProbeGroup(object):
+
+ def __init__(self, device, struct):
+ self.device = device
+ self.struct = struct
+ self._probes = None
+
+ def __iter__(self):
+ return iter(self.probes)
+
+ @property
+ def name(self):
+ return self.struct.name
+
+ @property
+ def probes(self):
+ if self._probes is None:
+ self._probes = []
+ probe_list = self.struct.probes
+ while (probe_list):
+ probe_ptr = void_ptr_to_sr_probe_ptr(probe_list.data)
+ self._probes.append(Probe(self, probe_ptr))
+ probe_list = probe_list.next
+ return self._probes
+
class Session(object):
def __init__(self, context):
def add_device(self, device):
check(sr_session_dev_add(device.struct))
+ def open_device(self, device):
+ check(sr_dev_open(device.struct))
+
def add_callback(self, callback):
wrapper = partial(callback_wrapper, self, callback)
check(sr_session_datafeed_python_callback_add(wrapper))
@property
def type(self):
- return self.struct.type
+ return PacketType(self.struct.type)
@property
def payload(self):
if self._payload is None:
pointer = self.struct.payload
- if self.struct.type == SR_DF_LOGIC:
+ if self.type == PacketType.LOGIC:
self._payload = Logic(self,
void_ptr_to_sr_datafeed_logic_ptr(pointer))
+ elif self.type == PacketType.ANALOG:
+ self._payload = Analog(self,
+ void_ptr_to_sr_datafeed_analog_ptr(pointer))
else:
- raise NotImplementedError, (
- "No Python mapping for packet type %ѕ" % self.struct.type)
+ raise NotImplementedError(
+ "No Python mapping for packet type %s" % self.struct.type)
return self._payload
class Logic(object):
self._data = cdata(self.struct.data, self.struct.length)
return self._data
+class Analog(object):
+
+ def __init__(self, packet, struct):
+ self.packet = packet
+ self.struct = struct
+ self._data = None
+
+ @property
+ def num_samples(self):
+ return self.struct.num_samples
+
+ @property
+ def mq(self):
+ return Quantity(self.struct.mq)
+
+ @property
+ def unit(self):
+ return Unit(self.struct.unit)
+
+ @property
+ def mqflags(self):
+ return QuantityFlag.set_from_mask(self.struct.mqflags)
+
+ @property
+ def data(self):
+ if self._data is None:
+ self._data = float_array.frompointer(self.struct.data)
+ return self._data
+
+class Log(object):
+
+ @property
+ def level(self):
+ return LogLevel(sr_log_loglevel_get())
+
+ @level.setter
+ def level(self, l):
+ check(sr_log_loglevel_set(l.id))
+
+ @property
+ def domain(self):
+ return sr_log_logdomain_get()
+
+ @domain.setter
+ def domain(self, d):
+ check(sr_log_logdomain_set(d))
+
+class EnumValue(object):
+
+ _enum_values = {}
+
+ def __new__(cls, id):
+ if cls not in cls._enum_values:
+ cls._enum_values[cls] = {}
+ if id not in cls._enum_values[cls]:
+ value = super(EnumValue, cls).__new__(cls)
+ value.id = id
+ cls._enum_values[cls][id] = value
+ return cls._enum_values[cls][id]
+
+class LogLevel(EnumValue):
+ pass
+
+class PacketType(EnumValue):
+ pass
+
+class Quantity(EnumValue):
+ pass
+
+class Unit(EnumValue):
+ pass
+
+class QuantityFlag(EnumValue):
+
+ @classmethod
+ def set_from_mask(cls, mask):
+ result = set()
+ while mask:
+ new_mask = mask & (mask - 1)
+ result.add(cls(mask ^ new_mask))
+ mask = new_mask
+ return result
+
+class ConfigKey(EnumValue):
+ pass
+
+class ProbeType(EnumValue):
+ pass
+
for symbol_name in dir(lowlevel):
- prefix = 'SR_DF_'
- if symbol_name.startswith(prefix):
- name = symbol_name[len(prefix):]
- setattr(Packet, name, getattr(lowlevel, symbol_name))
+ for prefix, cls in [
+ ('SR_LOG_', LogLevel),
+ ('SR_DF_', PacketType),
+ ('SR_MQ_', Quantity),
+ ('SR_UNIT_', Unit),
+ ('SR_MQFLAG_', QuantityFlag),
+ ('SR_CONF_', ConfigKey),
+ ('SR_PROBE_', ProbeType)]:
+ if symbol_name.startswith(prefix):
+ name = symbol_name[len(prefix):]
+ value = getattr(lowlevel, symbol_name)
+ setattr(cls, name, cls(value))