from . import lowlevel
import itertools
-__all__ = ['Error', 'Context', 'Driver', 'Device', 'Session', 'Packet', 'Log']
+__all__ = ['Error', 'Context', 'Driver', 'Device', 'Session', 'Packet', 'Log',
+ 'LogLevel', 'PacketType', 'Quantity', 'Unit', 'QuantityFlag']
class Error(Exception):
@property
def type(self):
- return self.struct.type
+ return PacketType(self.struct.type)
@property
def payload(self):
if self._payload is None:
pointer = self.struct.payload
- if self.struct.type == SR_DF_LOGIC:
+ if self.type == PacketType.LOGIC:
self._payload = Logic(self,
void_ptr_to_sr_datafeed_logic_ptr(pointer))
- elif self.struct.type == SR_DF_ANALOG:
+ elif self.type == PacketType.ANALOG:
self._payload = Analog(self,
void_ptr_to_sr_datafeed_analog_ptr(pointer))
else:
@property
def mq(self):
- return self.struct.mq
+ return Quantity(self.struct.mq)
@property
def unit(self):
- return self.struct.unit
+ return Unit(self.struct.unit)
@property
def mqflags(self):
- return self.struct.mqflags
+ return QuantityFlag.set_from_mask(self.struct.mqflags)
@property
def data(self):
class Log(object):
- NONE = SR_LOG_NONE
- ERR = SR_LOG_ERR
- WARN = SR_LOG_WARN
- INFO = SR_LOG_INFO
- DBG = SR_LOG_DBG
- SPEW = SR_LOG_SPEW
-
@property
def level(self):
- return sr_log_loglevel_get()
+ return LogLevel(sr_log_loglevel_get())
@level.setter
def level(self, l):
- check(sr_log_loglevel_set(l))
+ check(sr_log_loglevel_set(l.id))
@property
def domain(self):
def domain(self, d):
check(sr_log_logdomain_set(d))
+class EnumValue(object):
+
+ _enum_values = {}
+
+ def __new__(cls, id):
+ if cls not in cls._enum_values:
+ cls._enum_values[cls] = {}
+ if id not in cls._enum_values[cls]:
+ value = super(EnumValue, cls).__new__(cls)
+ value.id = id
+ cls._enum_values[cls][id] = value
+ return cls._enum_values[cls][id]
+
+class LogLevel(EnumValue):
+ pass
+
+class PacketType(EnumValue):
+ pass
+
+class Quantity(EnumValue):
+ pass
+
+class Unit(EnumValue):
+ pass
+
+class QuantityFlag(EnumValue):
+
+ @classmethod
+ def set_from_mask(cls, mask):
+ result = set()
+ while mask:
+ new_mask = mask & (mask - 1)
+ result.add(cls(mask ^ new_mask))
+ mask = new_mask
+ return result
+
for symbol_name in dir(lowlevel):
- prefix = 'SR_DF_'
- if symbol_name.startswith(prefix):
- name = symbol_name[len(prefix):]
- setattr(Packet, name, getattr(lowlevel, symbol_name))
+ for prefix, cls in [
+ ('SR_LOG_', LogLevel),
+ ('SR_DF_', PacketType),
+ ('SR_MQ_', Quantity),
+ ('SR_UNIT_', Unit),
+ ('SR_MQFLAG_', QuantityFlag)]:
+ if symbol_name.startswith(prefix):
+ name = symbol_name[len(prefix):]
+ value = getattr(lowlevel, symbol_name)
+ setattr(cls, name, cls(value))