The IRMP core library is not prepared for threading or interleaved use
by multiple call sites for different data streams, internal state is
kept in global vars (MCU project heritage). Adjust the Python wrapper,
create one usable instance, and several more which fail to execute.
Fail late such that users see error messages.
The approach isn't pretty, but avoids segfaults when re-loaded sessions
assign multiple decoder instances, and raises user's awareness of the
"one instance" limitation by established means: "decoder error" bar, and
log messages, with a description to point out the cause.
This commit implements a dirty modification of a singleton. It's a pity
that Python appears to lack reliable destruction, hence the whole class
remains blocked even if the instance is released. Move all library use
into pd.py:decode() in the hope that Python's 'with' could help in a
future implementation. Prepare to either present a generic message that
is generated by pd.py, or pass on a text that originates in the Python
wrapper for the C library.
Library instance for an infrared protocol detector.
'''
Library instance for an infrared protocol detector.
'''
+ __usable_instance = None
+
class ResultData(ctypes.Structure):
_fields_ = [
( 'protocol', ctypes.c_uint32, ),
class ResultData(ctypes.Structure):
_fields_ = [
( 'protocol', ctypes.c_uint32, ),
return 'libirmp.dylib'
return 'irmp.dll'
return 'libirmp.dylib'
return 'irmp.dll'
+ def _library_setup_api(self):
- Create a library instance.
+ Lookup the C library's API routines. Declare their prototypes.
- # Load the library. Lookup routines, declare their prototypes.
- filename = self._library_filename()
- self._lib = ctypes.cdll.LoadLibrary(filename)
+ if not self._lib:
+ return False
self._lib.irmp_get_sample_rate.restype = ctypes.c_uint32
self._lib.irmp_get_sample_rate.argtypes = []
self._lib.irmp_get_sample_rate.restype = ctypes.c_uint32
self._lib.irmp_get_sample_rate.argtypes = []
# Create a result buffer that's local to the library instance.
self._data = self.ResultData()
# Create a result buffer that's local to the library instance.
self._data = self.ResultData()
+ return True
+
+ def __init__(self):
+ '''
+ Create a library instance.
+ '''
+
+ # Only create a working instance for the first invocation.
+ # Degrade all other instances, make them fail "late" during
+ # execution, so that users will see the errors.
+ self._lib = None
+ self._data = None
+ if IrmpLibrary.__usable_instance is None:
+ filename = self._library_filename()
+ self._lib = ctypes.cdll.LoadLibrary(filename)
+ self._library_setup_api()
+ IrmpLibrary.__usable_instance = self
+
def get_sample_rate(self):
def get_sample_rate(self):
+ if not self._lib:
+ return None
return self._lib.irmp_get_sample_rate()
def reset_state(self):
return self._lib.irmp_get_sample_rate()
def reset_state(self):
+ if not self._lib:
+ return None
self._lib.irmp_reset_state()
def add_one_sample(self, level):
self._lib.irmp_reset_state()
def add_one_sample(self, level):
+ if not self._lib:
+ raise Exception("IRMP library limited to a single instance.")
if not self._lib.irmp_add_one_sample(int(level)):
return False
self._lib.irmp_get_result_data(ctypes.byref(self._data))
return True
def get_result_data(self):
if not self._lib.irmp_add_one_sample(int(level)):
return False
self._lib.irmp_get_result_data(ctypes.byref(self._data))
return True
def get_result_data(self):
+ if not self._data:
+ return None
+ return {
'proto_nr': self._data.protocol,
'proto_name': self._data.protocol_name.decode('UTF-8', 'ignore'),
'address': self._data.address,
'proto_nr': self._data.protocol,
'proto_name': self._data.protocol_name.decode('UTF-8', 'ignore'),
'address': self._data.address,
'start': self._data.start_sample,
'end': self._data.end_sample,
}
'start': self._data.start_sample,
'end': self._data.end_sample,
}
class SamplerateError(Exception):
pass
class SamplerateError(Exception):
pass
+class LibraryError(Exception):
+ pass
+
class Decoder(srd.Decoder):
api_version = 3
id = 'ir_irmp'
class Decoder(srd.Decoder):
api_version = 3
id = 'ir_irmp'
self.put(ss, es, self.out_ann, [0, txts])
def __init__(self):
self.put(ss, es, self.out_ann, [0, txts])
def __init__(self):
- self.irmp = irmp_library.IrmpLibrary()
- self.lib_rate = self.irmp.get_sample_rate()
self.reset()
def reset(self):
self.reset()
def reset(self):
- self.irmp.reset_state()
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.samplerate = value
def decode(self):
self.samplerate = value
def decode(self):
+ if not self.irmp:
+ try:
+ self.irmp = irmp_library.IrmpLibrary()
+ except Exception as e:
+ txt = e.args[0]
+ raise LibraryError(txt)
+ if self.irmp:
+ self.lib_rate = self.irmp.get_sample_rate()
+ if not self.irmp or not self.lib_rate:
+ raise LibraryError('Cannot access IRMP library. One instance limit exceeded?')
if not self.samplerate:
raise SamplerateError('Cannot decode without samplerate.')
if self.samplerate % self.lib_rate:
if not self.samplerate:
raise SamplerateError('Cannot decode without samplerate.')
if self.samplerate % self.lib_rate:
- raise SamplerateError('capture samplerate must be multiple of library samplerate ({})'.format(self.lib_rate))
+ raise SamplerateError('Capture samplerate must be multiple of library samplerate ({})'.format(self.lib_rate))
self.rate_factor = int(self.samplerate / self.lib_rate)
self.rate_factor = int(self.samplerate / self.lib_rate)
+ if self.want_reset:
+ self.irmp.reset_state()
+ self.want_reset = False
self.active = 0 if self.options['polarity'] == 'active-low' else 1
ir, = self.wait()
self.active = 0 if self.options['polarity'] == 'active-low' else 1
ir, = self.wait()