I have a webthing programmed in python and used with v3.7.
It’s a LED-Strip that can turn on/off and dim over pwm.
Now I’m thinking of hooking a PIR sensor to a pin with interrupt signaling and let the LED-Strip turn on when ever the motion detection (of that PIR sensor) signals presence of motion in the room and off after a timeout ends.
Do I have to spin up a new thread through treading.event().wait(timeout=x..)
to not block the other features or is there some built in techniques to do so in webthing-python?
What’s best?
Since you’re using webthing-python, you’ll want to use some of tornado’s built-in async mechanisms. Here’s a link to some, but you may have to search around for others if they don’t fit your need. The threading
module doesn’t play well with tornado.
Seams like the ioloop.IOloop.current() does not work as the server starts the ioloop later. That makes it hard (impossible?) to get a callback to the main loop.
Another idea maybe is to use an event.
Remember my intention is to build a MotionSensor.
If I create an event, can that event somehow trigger my Light property?
Without going over the gateway. I need a localhost solution.
Edit:
I just noticed that iotschema.org lists a MotionDetected Event that is not listed in the https://iot.mozilla.org/schemas/#events
Does that have a reason?
In our multiple-things example we show how something can be done periodically using ioloop. It should be similar for your interrupt. We also modify a property inside that example, as you could do.
As for the schema, our schemas are not related to iotschema.org in any way. For this specific example, we have MotionProperty.
I’m still trying to do this and noticed some different behavior when using mraa interrupt to trigger a callback then when I do the same from a function.
I was able to create a callback in the multiple-things example by adding:
...
Property(self,
'on',
Value(self.geting(),self.seting),
...
def geting(self):
logging.debug('get')
def seting(self,value):
logging.debug(f'set {value}')
tornado.ioloop.IOLoop.current().call_later(2, self.geting)
if i toggle the on rocker-switch it prints:
00:00:00 set True
00:00:02 get
Great.
But if I do the same with a mraa interrupt then nothing happens.
Here is how this looks like:
class MotionDetection(Thing):
def __init__(self,location):
Thing.__init__(
self,
'urn:dev:ops:PIR-Sensor',
'PIR-Sensor',
['MotionDetector'],
'A web connected motion sensor'
)
self.pir_sensor = mraa.Gpio(13) # GPIO 1 (P13) on Linkit Smart 7688
self.pir_sensor.dir(mraa.DIR_IN) # set as INPUT pin
self.pir_sensor.isr(mraa.EDGE_BOTH, MotionDetection.interrupt_call, self.pir_sensor)
....
"""
Interrupt calls this when ever it triggers
"""
def interrupt_call(self,gpio):
logging.debug('interrupt_call')
self.set_property('motion_detection', True)
ioloop.IOLoop.current().call_later(5, self.interrupt_call_back)
def interrupt_call_back(self):
logging.debug('interrupt_call_back')
self.set_property('motion_detection', False)
This prints interrupt_call
but never prints: interrupt_call_back
as I would expect it to do 5s later.
Any ideas why?
I’m guessing it’s because you’re passing in a class method, rather than an instance method, below:
self.pir_sensor.isr(mraa.EDGE_BOTH, MotionDetection.interrupt_call, self.pir_sensor)
That should probably be:
self.pir_sensor.isr(mraa.EDGE_BOTH, self.interrupt_call, self.pir_sensor)
A friendly pro helped me debug this. It seems like I was on a wrong thread to callback as the interrupt works on its own.
What I had to do is save the ioloop.IOLoop.current()
, .callback()
on that and then do a .call_later()
.
my code looks like this now:
class MotionSensor(Thing):
"""A PIR Sensor in the entry of my upper Level"""
def __init__(self,location):
self.id = f'urn:dev:ops:{location}-motion-sensor'
self.name = f'{location}-motion-sensor'
Thing.__init__(
self,
self.id,
self.name,
['MotionSensor'],
'A web connected motion sensor'
)
self.main_loop = ioloop.IOLoop.current()
self.main_loop_time = None
self.pir_sensor = mraa.Gpio(13) # GPIO 1 (P13) on Linkit Smart 7688
self.pir_sensor.dir(mraa.DIR_IN) # set as INPUT pin
self.pir_sensor.isr(mraa.EDGE_BOTH, MotionSensor.interrupt_call, self)
Now I also chosen the wrong function as .call_later()
does not have a cancel method.
Imagine, I have a sensor and it reports the motion. Now each time somebody triggers, it should cancel and start to count down a timer until it reports “timeout”.
That’s when I want to power off my light. So I need some kind of cancel of that timer.
Here is my solution (I’m open for feedback!):
"""
Interrupt calls this when ever it triggers
"""
def interrupt_call(self):
self.set_property('motion_detection', True)
self.main_loop.add_callback(self.interrupt_call_back)
def interrupt_call_back(self):
if self.timeout:
self.main_loop.remove_timeout(self.timeout)
self.main_loop_time = self.main_loop.time()
self.get_delay_time()
self.timeout = self.main_loop.add_timeout(self.main_loop_time +
self.delay_seconds,
self.interrupt_call_back_later)
def interrupt_call_back_later(self):
self.set_property('motion_detection', False)
self.timeout = None
def get_delay_time(self):
self.delay_minutes = self.get_property('motion_sensor_delay')
self.delay_seconds = self.delay_minutes * 60
This works.
What I am still searching for, is how to trigger my Lamp that is in another class (thing) from this MotionSensor (thing) directly. Is that possible?
You can pass your lamp Thing into your MotionSensor thing and save it.
Can you give me a sample?
from tornado import ioloop
from webthing import MultipleThings, Property, Thing, Value, WebThingServer
import mraa
class Lamp(Thing):
def __init__(self, location):
self.id = f'urn:dev:ops:{location}-lamp'
self.name = f'{location}-lamp'
Thing.__init__(
self,
self.id,
self.name,
['Light'],
'A web connected lamp'
)
self.add_property(
Property(
self,
'on',
Value(False),
metadata={
'@type': 'OnOffProperty',
'title': 'On/Off',
'type': 'boolean',
'description': 'Whether the lamp is turned on',
}
)
)
class MotionSensor(Thing):
def __init__(self, location, lamp):
self.id = f'urn:dev:ops:{location}-motion-sensor'
self.name = f'{location}-motion-sensor'
Thing.__init__(
self,
self.id,
self.name,
['MotionSensor'],
'A web connected motion sensor'
)
self.lamp = lamp
self.add_property(
Property(
self,
'motion_detection',
Value(False),
metadata={
'@type': 'MotionProperty',
'title': 'Motion',
'type': 'boolean',
'description': 'Whether or not there is motion',
'readOnly': True,
}
)
)
self.add_property(
Property(
self,
'motion_sensor_delay',
Value(1),
metadata={
'title': 'Motion Sensor Delay',
'type': 'integer',
'description': 'Delay before turning sensor back off',
'unit': 'minute',
'minimum': 1,
}
)
)
self.add_property
self.timeout = None
self.main_loop = ioloop.IOLoop.current()
self.main_loop_time = None
self.pir_sensor = mraa.Gpio(13) # GPIO 1 (P13) on Linkit Smart 7688
self.pir_sensor.dir(mraa.DIR_IN) # set as INPUT pin
self.pir_sensor.isr(mraa.EDGE_BOTH, self.interrupt_call, self)
def interrupt_call(self):
self.set_property('motion_detection', True)
self.lamp.set_property('on', True)
self.main_loop.add_callback(self.interrupt_call_back)
def interrupt_call_back(self):
if self.timeout:
self.main_loop.remove_timeout(self.timeout)
self.main_loop_time = self.main_loop.time()
self.get_delay_time()
self.timeout = self.main_loop.add_timeout(
self.main_loop_time +
self.delay_seconds,
self.interrupt_call_back_later
)
def interrupt_call_back_later(self):
self.set_property('motion_detection', False)
self.lamp.set_property('on', False)
self.timeout = None
def get_delay_time(self):
self.delay_minutes = self.get_property('motion_sensor_delay')
self.delay_seconds = self.delay_minutes * 60
def run_server():
lamp = Lamp('upper-level')
sensor = MotionSensor('upper-level', lamp)
server = WebThingServer(
MultipleThings([lamp, sensor], 'MyDevice'),
port=8888
)
try:
server.start()
except KeyboardInterrupt:
server.stop()
if __name__ == '__main__':
run_server()
Thanks.
That works great.