Source code for amazon_dash.listener

import time
from collections import defaultdict

import threading

from amazon_dash.config import Config
from amazon_dash.confirmations import get_confirmation
from amazon_dash.exceptions import InvalidConfig, InvalidDevice
from amazon_dash.execute import logger, ExecuteCmd, ExecuteUrl, ExecuteHomeAssistant, ExecuteOpenHab, ExecuteIFTTT
from amazon_dash.scan import scan_devices

DEFAULT_DELAY = 10
"""
On seconds. By default, 10 seconds. Minimum time that must pass between pulsations of the Amazon Dash button.
"""

EXECUTE_CLS = {
    'cmd': ExecuteCmd,
    'url': ExecuteUrl,
    'homeassistant': ExecuteHomeAssistant,
    'openhab': ExecuteOpenHab,
    'ifttt': ExecuteIFTTT,
}
"""
Execute classes registered.
"""

last_execution = defaultdict(lambda: 0)
"""
Last time a device was executed. Value on unix time.
"""


[docs]class Device(object): """Set the execution method for the device """ execute_instance = None #: Execute cls instance def __init__(self, src, data=None, config=None): """ :param str src: Mac address :param data: device data """ data = data or {} config = config or {} if isinstance(src, Device): src = src.src self.src = src.lower() self.data = data execs = [cls(self.name, data) for name, cls in EXECUTE_CLS.items() if name in self.data] if len(execs) > 1: raise InvalidConfig( extra_body='There can only be one method of execution on a device. The device is {}. '.format(self.name) ) elif len(execs): self.execute_instance = execs[0] self.execute_instance.validate() self.confirmation = get_confirmation(src, data, config.get('confirmations', {})) @property def name(self): """Name on self.data or mac address :return: name :rtype: str """ return self.data.get('name', self.src)
[docs] def execute(self, root_allowed=False): """Execute this device :param bool root_allowed: Only used for ExecuteCmd :return: None """ logger.debug('%s device executed (mac %s)', self.name, self.src) if not self.execute_instance: msg = '%s: There is not execution method in device conf.' logger.warning(msg, self.name) self.send_confirmation(msg % self.name, False) return try: result = self.execute_instance.execute(root_allowed) except Exception as e: self.send_confirmation('Error executing the device {}: {}'.format(self.name, e), False) raise else: result = 'The {} device has been started and is running right now'.format(self.name) \ if result is None else result result = result or 'The {} device has been executed successfully'.format(self.name) self.send_confirmation(result) return result
[docs] def send_confirmation(self, message, success=True): """Send success or error message to configured confirmation :param str message: Body message to send :param bool success: Device executed successfully to personalize message :return: None """ message = message.strip() if not self.confirmation: return try: self.confirmation.send(message, success) except Exception as e: logger.warning('Error sending confirmation on device {}: {}'.format(self.name, e))
[docs]class Listener(object): """Start listener daemon for execute on button press """ root_allowed = False #: Only used for ExecuteCmd def __init__(self, config_path, ignore_perms=False): """ :param str config_path: Path to config file """ self.config = Config(config_path, ignore_perms) self.settings = self.config.get('settings', {}) self.devices = {key.lower(): Device(key, value, self.config) for key, value in self.config['devices'].items()} assert len(self.devices) == len(self.config['devices']), "Duplicate(s) MAC(s) on devices config."
[docs] def on_push(self, device): """Press button. Check DEFAULT_DELAY. :param scapy.packet.Packet device: Scapy packet :return: None """ src = device.src.lower() if last_execution[src] + self.settings.get('delay', DEFAULT_DELAY) > time.time(): return last_execution[src] = time.time() self.execute(device)
[docs] def execute(self, device): """Execute a device. Used if the time between executions is greater than DEFAULT_DELAY :param scapy.packet.Packet device: Scapy packet :return: None """ src = device.src.lower() device = self.devices[src] threading.Thread(target=device.execute, kwargs={ 'root_allowed': self.root_allowed }).start()
[docs] def run(self, root_allowed=False): """Start daemon mode :param bool root_allowed: Only used for ExecuteCmd :return: loop """ self.root_allowed = root_allowed scan_devices(self.on_push, lambda d: d.src.lower() in self.devices, self.settings.get('interface'))
[docs]def test_device(device, file, root_allowed=False): """Test the execution of a device without pressing the associated button :param str device: mac address :param str file: config file :param bool root_allowed: only used for ExecuteCmd :return: None """ config = Config(file) config.read() if not device in config['devices']: raise InvalidDevice('Device {} is not in config file.'.format(device)) logger.info(Device(device, config['devices'][device], config).execute(root_allowed))