Source code for amazon_dash.config

from __future__ import print_function
import os
import stat
from grp import getgrgid
from pwd import getpwuid

from jsonschema import validate, ValidationError
from yaml import load
from yaml.error import YAMLError

from amazon_dash.exceptions import SecurityException, ConfigFileNotFoundError, InvalidConfig

try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

#: Json-schema validation
SCHEMA = {
    "title": "Config",
    "type": "object",
    "properties": {
        "settings": {
            "type": "object",
            "properties": {
                "delay": {
                    "type": "integer"
                },
                "interface": {
                    "type": "string"
                },
            }
        },
        "devices": {
            "type": "object",
            "properties": {
                "/": {}
            },
            "patternProperties": {
                "^([0-9A-Fa-f]{2}[:]){5}([0-9A-Fa-f]{2})$": {
                    "type": "object",
                    "properties": {
                        "name": {
                            "type": "string"
                        },
                        "cmd": {
                            "type": "string"
                        },
                        "user": {
                            "type": "string",
                        },
                        "cwd": {
                            "type": "string",
                        },
                        "url": {
                            "type": "string"
                        },
                        "method": {
                            "type": "string",
                            "oneOf": [
                                {"pattern": "GET|get"},
                                {"pattern": "HEAD|head"},
                                {"pattern": "POST|post"},
                                {"pattern": "PUT|put"},
                                {"pattern": "DELETE|delete"},
                                {"pattern": "CONNECT|connect"},
                                {"pattern": "OPTIONS|options"},
                                {"pattern": "trace|trace"},
                                {"pattern": "PATCH|patch"},
                            ]
                        },
                        "headers": {
                            "type": "object",
                        },
                        "content-type": {
                            "type": "string"
                        },
                        "body": {
                            "type": "string"
                        },
                        "homeassistant": {
                            "type": "string"
                        },
                        "ifttt": {
                            "type": "string"
                        },
                        "event": {
                            "type": "string"
                        },
                        "confirmation": {
                            "type": "string",
                        }
                    },
                }
            },
            "additionalProperties": False,

        },
        "confirmations": {
            "type": "object",
            "properties": {
                "/": {}
            },
            "patternProperties": {
                "^.+$": {
                    "type": "object",
                    "properties": {
                        "service": {
                            "enum": [
                                'telegram',
                                'pushbullet',
                            ]
                        },
                        "token": {
                            "type": "string",
                        },
                        "is_default": {
                            "type": "boolean",
                        },
                        "to": {
                            "type": "integer"
                        }
                    },
                    "required": ["service"],
                }
            },
        }
    },
    "required": ["devices"]
}


[docs]def get_file_owner(file): """Get file owner id :param str file: Path to file :return: user id :rtype: int """ try: return getpwuid(os.stat(file).st_uid)[0] except KeyError: return '???'
[docs]def get_file_group(file): """Get file group id :param file: Path to file :return: group id :rtype: int """ try: return getgrgid(os.stat(file).st_uid)[0] except KeyError: return '???'
[docs]def bitperm(s, perm, pos): """Returns zero if there are no permissions for a bit of the perm. of a file. Otherwise it returns a positive value :param os.stat_result s: os.stat(file) object :param str perm: R (Read) or W (Write) or X (eXecute) :param str pos: USR (USeR) or GRP (GRouP) or OTH (OTHer) :return: mask value :rtype: int """ perm = perm.upper() pos = pos.upper() assert perm in ['R', 'W', 'X'] assert pos in ['USR', 'GRP', 'OTH'] return s.st_mode & getattr(stat, 'S_I{}{}'.format(perm, pos))
[docs]def oth_w_perm(file): """Returns True if others have write permission to the file :param str file: Path to file :return: True if others have permits :rtype: bool """ return bitperm(os.stat(file), 'w', 'oth')
[docs]def only_root_write(path): """File is only writable by root :param str path: Path to file :return: True if only root can write :rtype: bool """ s = os.stat(path) for ug, bp in [(s.st_uid, bitperm(s, 'w', 'usr')), (s.st_gid, bitperm(s, 'w', 'grp'))]: # User id (is not root) and bit permission if ug and bp: return False if bitperm(s, 'w', 'oth'): return False return True
[docs]class Config(dict): """Parse and validate yaml Amazon-dash file config. The instance behaves like a dictionary """ def __init__(self, file, ignore_perms=False, **kwargs): """Set the config file and validate file permissions :param str file: path to file :param kwargs: default values in dict """ super(Config, self).__init__(**kwargs) if not os.path.lexists(file): raise ConfigFileNotFoundError(file) if not ignore_perms and ((not os.getuid() and not only_root_write(file)) or oth_w_perm(file)): file = os.path.abspath(file) raise SecurityException( 'There should be no permissions for other users in the file "{file}". ' 'Current permissions: {user}:{group} {perms}. {msg}. ' 'Run "sudo chmod 660 \'{file}\' && sudo chown root:root \'{file}\'"'.format( file=file, user=get_file_owner(file), group=get_file_group(file), perms=os.stat(file).st_mode & 0o777, msg='Removes write permission for others' if os.getuid() else 'Only root must be able to write to file')) self.file = file self.read()
[docs] def read(self): """Parse and validate the config file. The read data is accessible as a dictionary in this instance :return: None """ try: data = load(open(self.file), Loader) except (UnicodeDecodeError, YAMLError) as e: raise InvalidConfig(self.file, '{}'.format(e)) try: validate(data, SCHEMA) except ValidationError as e: raise InvalidConfig(self.file, e) self.update(data)
[docs]def check_config(file, printfn=print): """Command to check configuration file. Raises InvalidConfig on error :param str file: path to config file :param printfn: print function for success message :return: None """ Config(file).read() printfn('The configuration file "{}" is correct'.format(file))