from functools import partial
from fractions import Fraction
+from collections import OrderedDict
from .lowlevel import *
from . import lowlevel
import itertools
__all__ = ['Error', 'Context', 'Driver', 'Device', 'Session', 'Packet', 'Log',
'LogLevel', 'PacketType', 'Quantity', 'Unit', 'QuantityFlag', 'ConfigKey',
- 'ProbeType', 'Probe', 'ProbeGroup', 'InputFormat', 'OutputFormat',
+ 'ProbeType', 'Probe', 'ChannelGroup', 'InputFormat', 'OutputFormat',
'InputFile', 'Output']
class Error(Exception):
@property
def input_formats(self):
if not self._input_formats:
- self._input_formats = {}
+ self._input_formats = OrderedDict()
input_list = sr_input_list()
for i in itertools.count():
input_ptr = sr_input_format_ptr_array_getitem(input_list, i)
self._initialized = True
options = []
for name, value in kwargs.items():
- key = getattr(ConfigKey, name.upper())
+ key = getattr(ConfigKey, name)
src = sr_config()
src.key = key.id
src.data = python_to_gvariant(value)
option_list = python_to_gslist(options)
device_list = sr_driver_scan(self.struct, option_list)
g_slist_free(option_list)
- devices = [Device(self, gpointer_to_sr_dev_inst_ptr(ptr))
+ devices = [HardwareDevice(self, gpointer_to_sr_dev_inst_ptr(ptr))
for ptr in gslist_to_python(device_list)]
g_slist_free(device_list)
return devices
class Device(object):
- def __new__(cls, driver, struct):
+ def __new__(cls, struct, context):
address = int(struct.this)
- if address not in driver.context._devices:
- device = super(Device, cls).__new__(cls, driver, struct)
- driver.context._devices[address] = device
- return driver.context._devices[address]
-
- def __init__(self, driver, struct):
- self.driver = driver
- self.struct = struct
- self._probes = None
- self._probe_groups = None
-
- def __getattr__(self, name):
- key = getattr(ConfigKey, name.upper())
- data = new_gvariant_ptr_ptr()
- try:
- check(sr_config_get(self.driver.struct, self.struct, None,
- key.id, data))
- except Error as error:
- if error.errno == SR_ERR_NA:
- raise NotImplementedError(
- "Device does not implement %s" % name)
- else:
- raise AttributeError
- value = gvariant_ptr_ptr_value(data)
- return gvariant_to_python(value)
-
- def __setattr__(self, name, value):
- try:
- key = getattr(ConfigKey, name.upper())
- except AttributeError:
- super(Device, self).__setattr__(name, value)
- return
- check(sr_config_set(self.struct, None, key.id, python_to_gvariant(value)))
+ if address not in context._devices:
+ device = super(Device, cls).__new__(cls)
+ device.struct = struct
+ device.context = context
+ device._probes = None
+ device._channel_groups = None
+ context._devices[address] = device
+ return context._devices[address]
@property
def vendor(self):
self._probes = {}
probe_list = self.struct.probes
while (probe_list):
- probe_ptr = void_ptr_to_sr_probe_ptr(probe_list.data)
+ probe_ptr = void_ptr_to_sr_channel_ptr(probe_list.data)
self._probes[probe_ptr.name] = Probe(self, probe_ptr)
probe_list = probe_list.next
return self._probes
@property
- def probe_groups(self):
- if self._probe_groups is None:
- self._probe_groups = {}
- probe_group_list = self.struct.probe_groups
- while (probe_group_list):
- probe_group_ptr = void_ptr_to_sr_probe_group_ptr(
- probe_group_list.data)
- self._probe_groups[probe_group_ptr.name] = ProbeGroup(self,
- probe_group_ptr)
- probe_group_list = probe_group_list.next
- return self._probe_groups
+ def channel_groups(self):
+ if self._channel_groups is None:
+ self._channel_groups = {}
+ channel_group_list = self.struct.channel_groups
+ while (channel_group_list):
+ channel_group_ptr = void_ptr_to_sr_channel_group_ptr(
+ channel_group_list.data)
+ self._channel_groups[channel_group_ptr.name] = ChannelGroup(self,
+ channel_group_ptr)
+ channel_group_list = channel_group_list.next
+ return self._channel_groups
+
+class HardwareDevice(Device):
+
+ def __new__(cls, driver, struct):
+ device = Device.__new__(cls, struct, driver.context)
+ device.driver = driver
+ return device
+
+ def __getattr__(self, name):
+ key = getattr(ConfigKey, name)
+ data = new_gvariant_ptr_ptr()
+ try:
+ check(sr_config_get(self.driver.struct, self.struct, None,
+ key.id, data))
+ except Error as error:
+ if error.errno == SR_ERR_NA:
+ raise NotImplementedError(
+ "Device does not implement %s" % name)
+ else:
+ raise AttributeError
+ value = gvariant_ptr_ptr_value(data)
+ return gvariant_to_python(value)
+
+ def __setattr__(self, name, value):
+ try:
+ key = getattr(ConfigKey, name)
+ except AttributeError:
+ super(Device, self).__setattr__(name, value)
+ return
+ check(sr_config_set(self.struct, None, key.id, python_to_gvariant(value)))
class Probe(object):
def name(self):
return self.struct.name
-class ProbeGroup(object):
+class ChannelGroup(object):
def __init__(self, device, struct):
self.device = device
self.struct = struct
- self._probes = None
+ self._channels = None
def __iter__(self):
- return iter(self.probes)
+ return iter(self.channels)
def __getattr__(self, name):
key = config_key(name)
except Error as error:
if error.errno == SR_ERR_NA:
raise NotImplementedError(
- "Probe group does not implement %s" % name)
+ "Channel group does not implement %s" % name)
else:
raise AttributeError
value = gvariant_ptr_ptr_value(data)
try:
key = config_key(name)
except AttributeError:
- super(ProbeGroup, self).__setattr__(name, value)
+ super(ChannelGroup, self).__setattr__(name, value)
return
check(sr_config_set(self.device.struct, self.struct,
key.id, python_to_gvariant(value)))
return self.struct.name
@property
- def probes(self):
- if self._probes is None:
- self._probes = []
- probe_list = self.struct.probes
- while (probe_list):
- probe_ptr = void_ptr_to_sr_probe_ptr(probe_list.data)
- self._probes.append(Probe(self, probe_ptr))
- probe_list = probe_list.next
- return self._probes
+ def channels(self):
+ if self._channels is None:
+ self._channels = []
+ channel_list = self.struct.channels
+ while (channel_list):
+ channel_ptr = void_ptr_to_sr_channel_ptr(channel_list.data)
+ self._channels.append(Probe(self, probe_ptr))
+ channel_list = channel_list.next
+ return self._channels
class Session(object):
for key, value in kwargs.items():
g_hash_table_insert(self.struct.param, g_strdup(key), g_strdup(str(value)))
check(self.format.struct.call_init(self.struct, self.filename))
+ self.device = InputFileDevice(self)
def load(self):
check(self.format.struct.call_loadfile(self.struct, self.filename))
def __del__(self):
g_hash_table_destroy(self.struct.param)
+class InputFileDevice(Device):
+
+ def __new__(cls, file):
+ device = Device.__new__(cls, file.struct.sdi, file.format.context)
+ device.file = file
+ return device
+
class OutputFormat(object):
def __init__(self, context, struct):
class ConfigInfo(object):
- def __init__(self, key):
- self.key = key
- self.struct = sr_config_info_get(key.id)
+ def __new__(cls, key):
+ struct = sr_config_info_get(key.id)
+ if not struct:
+ return None
+ obj = super(ConfigInfo, cls).__new__(cls)
+ obj.key = key
+ obj.struct = struct
+ return obj
@property
def datatype(self):
if symbol_name.startswith(prefix):
name = symbol_name[len(prefix):]
value = getattr(lowlevel, symbol_name)
- setattr(cls, name, cls(value))
+ obj = cls(value)
+ setattr(cls, name, obj)
+ if cls is ConfigKey:
+ obj.info = ConfigInfo(obj)
+ if obj.info:
+ setattr(cls, obj.info.id, obj)
+ else:
+ setattr(cls, name.lower(), obj)