import itertools
__all__ = ['Error', 'Context', 'Driver', 'Device', 'Session', 'Packet', 'Log',
- 'LogLevel', 'PacketType', 'Quantity', 'Unit', 'QuantityFlag']
+ 'LogLevel', 'PacketType', 'Quantity', 'Unit', 'QuantityFlag', 'ConfigKey',
+ 'ProbeType', 'Probe', 'ProbeGroup', 'InputFormat', 'OutputFormat',
+ 'InputFile', 'Output']
class Error(Exception):
if result != SR_OK:
raise Error(result)
-def config_key(name):
- if not name.lower() == name:
- raise AttributeError
- key_name = "SR_CONF_" + name.upper()
- return getattr(lowlevel, key_name)
-
def gvariant_to_python(value):
type_string = g_variant_get_type_string(value)
if type_string == 't':
self.struct = sr_context_ptr_ptr_value(context_ptr_ptr)
self._drivers = None
self._devices = {}
+ self._input_formats = None
+ self._output_formats = None
self.session = None
def __del__(self):
break
return self._drivers
+ @property
+ def input_formats(self):
+ if not self._input_formats:
+ self._input_formats = {}
+ input_list = sr_input_list()
+ for i in itertools.count():
+ input_ptr = sr_input_format_ptr_array_getitem(input_list, i)
+ if input_ptr:
+ self._input_formats[input_ptr.id] = InputFormat(self, input_ptr)
+ else:
+ break
+ return self._input_formats
+
+ @property
+ def output_formats(self):
+ if not self._output_formats:
+ self._output_formats = {}
+ output_list = sr_output_list()
+ for i in itertools.count():
+ output_ptr = sr_output_format_ptr_array_getitem(output_list, i)
+ if output_ptr:
+ self._output_formats[output_ptr.id] = OutputFormat(self, output_ptr)
+ else:
+ break
+ return self._output_formats
+
class Driver(object):
def __init__(self, context, struct):
def name(self):
return self.struct.name
- def scan(self):
+ def scan(self, **kwargs):
if not self._initialized:
check(sr_driver_init(self.context.struct, self.struct))
self._initialized = True
- devices = []
- device_list = sr_driver_scan(self.struct, None)
- device_list_item = device_list
- while device_list_item:
- ptr = device_list_item.data
- device_ptr = gpointer_to_sr_dev_inst_ptr(ptr)
- devices.append(Device(self, device_ptr))
- device_list_item = device_list_item.next
+ options = []
+ for name, value in kwargs.items():
+ key = getattr(ConfigKey, name)
+ src = sr_config()
+ src.key = key.id
+ src.data = python_to_gvariant(value)
+ options.append(src.this)
+ option_list = python_to_gslist(options)
+ device_list = sr_driver_scan(self.struct, option_list)
+ g_slist_free(option_list)
+ devices = [HardwareDevice(self, gpointer_to_sr_dev_inst_ptr(ptr))
+ for ptr in gslist_to_python(device_list)]
g_slist_free(device_list)
return devices
class Device(object):
- def __new__(cls, driver, struct):
+ def __new__(cls, struct, context):
address = int(struct.this)
- if address not in driver.context._devices:
- device = super(Device, cls).__new__(cls, driver, struct)
- driver.context._devices[address] = device
- return driver.context._devices[address]
+ if address not in context._devices:
+ device = super(Device, cls).__new__(cls)
+ device.struct = struct
+ device.context = context
+ device._probes = None
+ device._probe_groups = None
+ context._devices[address] = device
+ return context._devices[address]
- def __init__(self, driver, struct):
- self.driver = driver
- self.struct = struct
+ @property
+ def vendor(self):
+ return self.struct.vendor
+
+ @property
+ def model(self):
+ return self.struct.model
+
+ @property
+ def version(self):
+ return self.struct.version
+
+ @property
+ def probes(self):
+ if self._probes is None:
+ self._probes = {}
+ probe_list = self.struct.probes
+ while (probe_list):
+ probe_ptr = void_ptr_to_sr_probe_ptr(probe_list.data)
+ self._probes[probe_ptr.name] = Probe(self, probe_ptr)
+ probe_list = probe_list.next
+ return self._probes
+
+ @property
+ def probe_groups(self):
+ if self._probe_groups is None:
+ self._probe_groups = {}
+ probe_group_list = self.struct.probe_groups
+ while (probe_group_list):
+ probe_group_ptr = void_ptr_to_sr_probe_group_ptr(
+ probe_group_list.data)
+ self._probe_groups[probe_group_ptr.name] = ProbeGroup(self,
+ probe_group_ptr)
+ probe_group_list = probe_group_list.next
+ return self._probe_groups
+
+class HardwareDevice(Device):
+
+ def __new__(cls, driver, struct):
+ device = Device.__new__(cls, struct, driver.context)
+ device.driver = driver
+ return device
def __getattr__(self, name):
- key = config_key(name)
+ key = getattr(ConfigKey, name)
data = new_gvariant_ptr_ptr()
try:
- check(sr_config_get(self.driver.struct, key, data, self.struct))
+ check(sr_config_get(self.driver.struct, self.struct, None,
+ key.id, data))
except Error as error:
if error.errno == SR_ERR_NA:
raise NotImplementedError(
def __setattr__(self, name, value):
try:
- key = config_key(name)
+ key = getattr(ConfigKey, name)
except AttributeError:
super(Device, self).__setattr__(name, value)
return
- check(sr_config_set(self.struct, key, python_to_gvariant(value)))
+ check(sr_config_set(self.struct, None, key.id, python_to_gvariant(value)))
+
+class Probe(object):
+
+ def __init__(self, device, struct):
+ self.device = device
+ self.struct = struct
@property
- def vendor(self):
- return self.struct.vendor
+ def type(self):
+ return ProbeType(self.struct.type)
@property
- def model(self):
- return self.struct.model
+ def enabled(self):
+ return self.struct.enabled
@property
- def version(self):
- return self.struct.version
+ def name(self):
+ return self.struct.name
+
+class ProbeGroup(object):
+
+ def __init__(self, device, struct):
+ self.device = device
+ self.struct = struct
+ self._probes = None
+
+ def __iter__(self):
+ return iter(self.probes)
+
+ def __getattr__(self, name):
+ key = config_key(name)
+ data = new_gvariant_ptr_ptr()
+ try:
+ check(sr_config_get(self.device.driver.struct, self.device.struct,
+ self.struct, key.id, data))
+ except Error as error:
+ if error.errno == SR_ERR_NA:
+ raise NotImplementedError(
+ "Probe group does not implement %s" % name)
+ else:
+ raise AttributeError
+ value = gvariant_ptr_ptr_value(data)
+ return gvariant_to_python(value)
+
+ def __setattr__(self, name, value):
+ try:
+ key = config_key(name)
+ except AttributeError:
+ super(ProbeGroup, self).__setattr__(name, value)
+ return
+ check(sr_config_set(self.device.struct, self.struct,
+ key.id, python_to_gvariant(value)))
+
+ @property
+ def name(self):
+ return self.struct.name
+
+ @property
+ def probes(self):
+ if self._probes is None:
+ self._probes = []
+ probe_list = self.struct.probes
+ while (probe_list):
+ probe_ptr = void_ptr_to_sr_probe_ptr(probe_list.data)
+ self._probes.append(Probe(self, probe_ptr))
+ probe_list = probe_list.next
+ return self._probes
class Session(object):
def add_device(self, device):
check(sr_session_dev_add(device.struct))
+ def open_device(self, device):
+ check(sr_dev_open(device.struct))
+
def add_callback(self, callback):
wrapper = partial(callback_wrapper, self, callback)
check(sr_session_datafeed_python_callback_add(wrapper))
void_ptr_to_sr_datafeed_analog_ptr(pointer))
else:
raise NotImplementedError(
- "No Python mapping for packet type %ѕ" % self.struct.type)
+ "No Python mapping for packet type %s" % self.struct.type)
return self._payload
class Logic(object):
def domain(self, d):
check(sr_log_logdomain_set(d))
+class InputFormat(object):
+
+ def __init__(self, context, struct):
+ self.context = context
+ self.struct = struct
+
+ @property
+ def id(self):
+ return self.struct.id
+
+ @property
+ def description(self):
+ return self.struct.description
+
+ def format_match(self, filename):
+ return bool(self.struct.call_format_match(filename))
+
+class InputFile(object):
+
+ def __init__(self, format, filename, **kwargs):
+ self.format = format
+ self.filename = filename
+ self.struct = sr_input()
+ self.struct.format = self.format.struct
+ self.struct.param = g_hash_table_new_full(
+ g_str_hash_ptr, g_str_equal_ptr, g_free_ptr, g_free_ptr)
+ for key, value in kwargs.items():
+ g_hash_table_insert(self.struct.param, g_strdup(key), g_strdup(str(value)))
+ check(self.format.struct.call_init(self.struct, self.filename))
+ self.device = InputFileDevice(self)
+
+ def load(self):
+ check(self.format.struct.call_loadfile(self.struct, self.filename))
+
+ def __del__(self):
+ g_hash_table_destroy(self.struct.param)
+
+class InputFileDevice(Device):
+
+ def __new__(cls, file):
+ device = Device.__new__(cls, file.struct.sdi, file.format.context)
+ device.file = file
+ return device
+
+class OutputFormat(object):
+
+ def __init__(self, context, struct):
+ self.context = context
+ self.struct = struct
+
+ @property
+ def id(self):
+ return self.struct.id
+
+ @property
+ def description(self):
+ return self.struct.description
+
+class Output(object):
+
+ def __init__(self, format, device, param=None):
+ self.format = format
+ self.device = device
+ self.param = param
+ self.struct = sr_output()
+ self.struct.format = self.format.struct
+ self.struct.sdi = self.device.struct
+ self.struct.param = param
+ check(self.format.struct.call_init(self.struct))
+
+ def receive(self, packet):
+
+ output_buf_ptr = new_uint8_ptr_ptr()
+ output_len_ptr = new_uint64_ptr()
+ using_obsolete_api = False
+
+ if self.format.struct.event and packet.type in (
+ PacketType.TRIGGER, PacketType.FRAME_BEGIN,
+ PacketType.FRAME_END, PacketType.END):
+ check(self.format.struct.call_event(self.struct, packet.type.id,
+ output_buf_ptr, output_len_ptr))
+ using_obsolete_api = True
+ elif self.format.struct.data and packet.type.id == self.format.struct.df_type:
+ check(self.format.struct.call_data(self.struct,
+ packet.payload.struct.data, packet.payload.struct.length,
+ output_buf_ptr, output_len_ptr))
+ using_obsolete_api = True
+
+ if using_obsolete_api:
+ output_buf = uint8_ptr_ptr_value(output_buf_ptr)
+ output_len = uint64_ptr_value(output_len_ptr)
+ result = cdata(output_buf, output_len)
+ g_free(output_buf)
+ return result
+
+ if self.format.struct.receive:
+ out_ptr = new_gstring_ptr_ptr()
+ check(self.format.struct.call_receive(self.struct, self.device.struct,
+ packet.struct, out_ptr))
+ out = gstring_ptr_ptr_value(out_ptr)
+ if out:
+ result = out.str
+ g_string_free(out, True)
+ return result
+
+ return None
+
+ def __del__(self):
+ check(self.format.struct.call_cleanup(self.struct))
+
+class ConfigInfo(object):
+
+ def __new__(cls, key):
+ struct = sr_config_info_get(key.id)
+ if not struct:
+ return None
+ obj = super(ConfigInfo, cls).__new__(cls)
+ obj.key = key
+ obj.struct = struct
+ return obj
+
+ @property
+ def datatype(self):
+ return DataType(self.struct.datatype)
+
+ @property
+ def id(self):
+ return self.struct.id
+
+ @property
+ def name(self):
+ return self.struct.name
+
+ @property
+ def description(self):
+ return self.struct.description
+
class EnumValue(object):
_enum_values = {}
mask = new_mask
return result
+class ConfigKey(EnumValue):
+ pass
+
+class DataType(EnumValue):
+ pass
+
+class ProbeType(EnumValue):
+ pass
+
for symbol_name in dir(lowlevel):
for prefix, cls in [
('SR_LOG_', LogLevel),
('SR_DF_', PacketType),
('SR_MQ_', Quantity),
('SR_UNIT_', Unit),
- ('SR_MQFLAG_', QuantityFlag)]:
+ ('SR_MQFLAG_', QuantityFlag),
+ ('SR_CONF_', ConfigKey),
+ ('SR_T_', DataType),
+ ('SR_PROBE_', ProbeType)]:
if symbol_name.startswith(prefix):
name = symbol_name[len(prefix):]
value = getattr(lowlevel, symbol_name)
- setattr(cls, name, cls(value))
+ obj = cls(value)
+ setattr(cls, name, obj)
+ if cls is ConfigKey:
+ obj.info = ConfigInfo(obj)
+ if obj.info:
+ setattr(cls, obj.info.id, obj)
+ else:
+ setattr(cls, name.lower(), obj)