# Source code for amazon_dash.tests.test_execute

import base64
import json
import unittest
from threading import Thread

import requests
import sys
from requests_mock import NoMockAddress
from amazon_dash._compat import subprocess
from amazon_dash.tests._compat import patch as mock_patch, Mock

from amazon_dash.exceptions import SecurityException, InvalidConfig, ExecuteError
from amazon_dash.execute import ExecuteCmd, ExecuteUrl, logger, execute_cmd, get_shell, \
    ExecuteHomeAssistant, ExecuteOpenHab, ExecuteIFTTT, execute_over_ssh
from amazon_dash.tests.base import ExecuteMockBase

import requests_mock


def io_out(text):
    """Return a readable in-memory stream containing ``text``.

    Used by the tests below as a stand-in for a process ``stdout``.

    On Python 3 the text is encoded as UTF-8 and wrapped in a ``BytesIO``;
    on Python 2 it is wrapped in a ``StringIO``.

    :param text: content the returned stream yields when read.
    :return: a file-like object positioned at the start of the content.
    """
    if sys.version_info >= (3, 0):
        from io import BytesIO
        return BytesIO(bytes(text, 'utf-8'))
    from StringIO import StringIO
    # Wrap the caller's text; a hard-coded value here would make both
    # interpreter branches disagree for any input other than 'foo'.
    return StringIO(text)


class TestExecuteCmdFunction(unittest.TestCase):
    """Tests for ``execute_cmd`` with ``subprocess.Popen`` patched out."""

    @mock_patch.object(subprocess, 'Popen')
    def test_success(self, m):
        """With return code 0, the first item of the result is the stdout text."""
        process_mock = Mock()
        process_mock.configure_mock(**{'returncode': 0, 'stdout': io_out('foo')})
        m.return_value = process_mock
        out = execute_cmd(['ls'])
        self.assertEqual(out[0], 'foo')

    @mock_patch.object(subprocess, 'Popen')
    def test_error(self, m):
        """A process reporting return code 1 must raise ``ExecuteError``."""
        process_mock = Mock()
        process_mock.configure_mock(**{'returncode': 1})
        m.return_value = process_mock
        with self.assertRaises(ExecuteError):
            execute_cmd(['ls'])

    @mock_patch.object(subprocess, 'Popen')
    def test_timeout(self, m):
        """When ``wait`` raises ``TimeoutExpired``, ``execute_cmd`` returns None."""
        def side_effect(timeout=None):
            raise subprocess.TimeoutExpired('', timeout)

        process_mock = Mock()
        process_mock.configure_mock(**{'wait.side_effect': side_effect})
        m.return_value = process_mock
        # assertIsNone is an identity check and gives a clearer failure
        # message than comparing against None with assertEqual.
        self.assertIsNone(execute_cmd(['ls']))


class TestExecuteOverSsh(unittest.TestCase):
    """Checks the argument list produced by ``execute_over_ssh``."""

    def test_invalid_port(self):
        """A non-numeric port after the colon is rejected as invalid config."""
        self.assertRaises(InvalidConfig, execute_over_ssh, 'ls', 'machine:spam')

    def test_execute_without_port(self):
        """Without a port, no ``-p`` option is expected in the ssh command."""
        expected = ['ssh', 'machine', '-C', "/usr/bin/env bash -c ' ls '"]
        command = execute_over_ssh('ls', 'machine')
        self.assertEqual(expected, command)

    def test_execute_with_port(self):
        """A ``host:port`` target is expected to add ``-p <port>``."""
        expected = ['ssh', 'machine', '-p', '222', '-C', "/usr/bin/env bash -c ' ls '"]
        command = execute_over_ssh('ls', 'machine:222')
        self.assertEqual(expected, command)

    def test_execute_double_quotes(self):
        """Double quotes in the command are expected to pass through unchanged."""
        expected = ['ssh', 'machine', '-p', '222', '-C', "/usr/bin/env bash -c ' \"ls\" '"]
        command = execute_over_ssh('"ls"', 'machine:222')
        self.assertEqual(expected, command)

    def test_execute_single_quotes(self):
        """Single quotes in the command are expected to be escaped for the shell."""
        expected = ['ssh', 'machine', '-p', '222', '-C',
                    "/usr/bin/env bash -c ' '\"'\"'ls'\"'\"' '"]
        command = execute_over_ssh('\'ls\'', 'machine:222')
        self.assertEqual(expected, command)


class TestExecuteCmd(ExecuteMockBase, unittest.TestCase):
    """Tests for ``ExecuteCmd`` and ``get_shell``.

    ``self.execute_mock_req`` is not defined in this class; presumably it is
    provided by ``ExecuteMockBase`` -- confirm against that base class.
    """

    def test_execute(self):
        """A plain command triggers exactly one call on the execution mock."""
        ExecuteCmd('key', {'cmd': 'ls'}).execute()
        self.execute_mock_req.assert_called_once()

    def test_execute_root(self):
        """A command configured for user root runs when ``execute(True)`` is used."""
        config = {'cmd': 'ls', 'user': 'root'}
        ExecuteCmd('key', config).execute(True)
        self.execute_mock_req.assert_called_once()

    def test_execute_root_error(self):
        """With ``execute(False)``, a root command raises ``SecurityException``."""
        config = {'cmd': 'ls', 'user': 'root'}
        action = ExecuteCmd('key', config)
        self.assertRaises(SecurityException, action.execute, False)
        self.execute_mock_req.assert_not_called()

    def test_user_on_ssh(self):
        """Combining ``user`` with ``ssh`` is rejected as invalid config."""
        config = {'cmd': 'ls', 'user': 'root', 'ssh': 'machine'}
        action = ExecuteCmd('key', config)
        self.assertRaises(InvalidConfig, action.execute)
        self.execute_mock_req.assert_not_called()

    def test_ssh(self):
        """With ``ssh`` set, ``execute_over_ssh`` is called once before running."""
        action = ExecuteCmd('key', {'cmd': 'ls', 'ssh': 'machine'})
        patcher = mock_patch('amazon_dash.execute.execute_over_ssh')
        with patcher as ssh_mock:
            action.execute()
            ssh_mock.assert_called_once()
        self.execute_mock_req.assert_called_once()

    # Disabled legacy tests, kept for reference:
    #
    # def test_execute_cmd_start(self):
    #     with mock_patch.object(Thread, 'start') as start_mock:
    #         execute_cmd('ls', '/tmp')
    #         start_mock.assert_called_once()

    # @mock_patch('subprocess.Popen', autospec=True)
    # def test_check_execution_success(self, popen_mock):
    #     popen_mock.return_value = Mock()
    #     popen_mock_obj = popen_mock.return_value
    #
    #     popen_mock_obj.communicate.return_value = ("OUT", "ERR")
    #     popen_mock_obj.returncode = 1
    #     with mock_patch.object(logger, 'error') as logger_error:
    #         check_execution_success('ls', '/tmp')
    #         logger_error.assert_called_once()

    def test_get_shell(self):
        """An absolute path is returned as-is; a bare name goes through env."""
        absolute = get_shell('/usr/bin/command')
        self.assertEqual(absolute, ['/usr/bin/command'])
        bare = get_shell('command')
        self.assertEqual(bare, ['/usr/bin/env', 'command'])


class TestExecuteUrl(unittest.TestCase):
    """Tests for ``ExecuteUrl`` validation and execution.

    HTTP traffic is intercepted by a ``requests_mock.Mocker`` that is started
    in ``setUp`` and stopped in ``tearDown``.
    """

    # HTTP methods for which a body or content-type must be rejected.
    no_body_methods = ['get', 'head', 'delete', 'connect', 'options', 'trace']
    url = 'http://domain.com'

    def setUp(self):
        """Start the request mocker and register a GET matcher for ``url``."""
        super(TestExecuteUrl, self).setUp()
        self.session_mock = requests_mock.Mocker()
        self.session_mock.start()
        self.get_mock = self.session_mock.get(self.url)

    def tearDown(self):
        """Stop the request mocker, undoing ``setUp`` in reverse order."""
        self.session_mock.stop()
        super(TestExecuteUrl, self).tearDown()

    def get_default_data(self):
        """Return the minimal configuration: just the target URL."""
        return {
            'url': self.url,
        }

    def test_content_type_invalid_method(self):
        """A content-type on any no-body method must raise ``InvalidConfig``."""
        data = dict(self.get_default_data(), **{
            'content-type': 'form',
        })
        for method in self.no_body_methods:
            with self.assertRaises(InvalidConfig):
                ExecuteUrl('key', dict(data, method=method)).validate()

    def test_body_invalid_method(self):
        """A body on any no-body method must raise ``InvalidConfig``."""
        data = dict(self.get_default_data(), **{
            'body': 'foo',
        })
        for method in self.no_body_methods:
            with self.assertRaises(InvalidConfig):
                ExecuteUrl('key', dict(data, method=method)).validate()

    def test_form_data(self):
        """After validation, a JSON string body is stored as the decoded object."""
        data = {'foo': 'bar'}
        execute_url = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                             body=json.dumps(data)))
        execute_url.validate()
        self.assertEqual(execute_url.data['body'], data)

    def test_form_invalid_data(self):
        """A body that is not valid JSON must fail validation."""
        execute_url = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                             body='{{inval'))
        with self.assertRaises(InvalidConfig):
            execute_url.validate()

    def test_execute(self):
        """Executing the default configuration hits the registered GET matcher."""
        execute_url = ExecuteUrl('key', self.get_default_data())
        execute_url.validate()
        execute_url.execute()
        self.assertTrue(self.get_mock.called_once)

    def test_execute_headers(self):
        """Configured headers are sent; a different value matches no mock."""
        self.session_mock.post(self.url, request_headers={'authorization': 'foo'})
        execute_url = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                             **{'headers': {'authorization': 'foo'}}))
        execute_url.validate()
        execute_url.execute()
        execute_url2 = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                              **{'headers': {'authorization': 'bar'}}))
        execute_url2.validate()
        with self.assertRaises(NoMockAddress):
            execute_url2.execute()

    def test_execute_content_type(self):
        """The content-type is sent as a header; a different value matches no mock."""
        self.session_mock.post(self.url, request_headers={'content-type': 'foo'})
        execute_url = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                             **{'content-type': 'foo'}))
        execute_url.validate()
        execute_url.execute()
        execute_url2 = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                              **{'content-type': 'bar'}))
        execute_url2.validate()
        with self.assertRaises(NoMockAddress):
            execute_url2.execute()

    def test_execute_body(self):
        """The configured body is sent as-is; a different body matches no mock."""
        self.session_mock.post(self.url, additional_matcher=lambda r: r.body == b'foo')
        execute_url = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                             body='foo', **{'content-type': 'plain'}))
        execute_url.validate()
        execute_url.execute()
        execute_url2 = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                              body='bar', **{'content-type': 'plain'}))
        execute_url2.validate()
        with self.assertRaises(NoMockAddress):
            execute_url2.execute()

    def test_execute_exception(self):
        """A connection timeout from requests surfaces as ``ExecuteError``."""
        self.session_mock.post(self.url, exc=requests.exceptions.ConnectTimeout)
        execute_url = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                             body='foo', **{'content-type': 'plain'}))
        execute_url.validate()
        with self.assertRaises(ExecuteError):
            execute_url.execute()

    def test_execute_400(self):
        """An HTTP 400 response surfaces as ``ExecuteError``."""
        self.session_mock.post(self.url, status_code=400)
        execute_url = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                             **{'content-type': 'plain'}))
        execute_url.validate()
        with self.assertRaises(ExecuteError):
            execute_url.execute()

    def test_authorization(self):
        """An ``auth`` of ``user:password`` is sent as a Basic Authorization header."""
        auth = b'Basic ' + base64.b64encode(b'foo:bar')
        # The expected header value must be text on Python 3.
        auth = auth.decode('utf-8') if sys.version_info > (3,) else auth
        self.session_mock.post(self.url, request_headers={'Authorization': auth})
        execute_url = ExecuteUrl('key', dict(self.get_default_data(), method='post',
                                             auth='foo:bar'))
        execute_url.validate()
        execute_url.execute()

    def test_verify(self):
        """``verify=False`` reaches the request; executed without validate()."""
        ExecuteUrl('key', dict(self.get_default_data(), verify=False)).execute()
        self.assertFalse(self.session_mock.last_request.verify)


class TestExecuteHomeAssistant(unittest.TestCase):
    """Tests for URL building and execution of ``ExecuteHomeAssistant``."""

    path = '/api/events/test'
    url = 'http://localhost:8123' + path

    def default_data(self, address='localhost', event='test', extra_data=None):
        """Build a configuration dict, optionally merged with ``extra_data``."""
        base = {
            'homeassistant': address,
            'event': event,
        }
        extras = extra_data or {}
        return dict(base, **extras)

    def test_no_event(self):
        """An empty event name is rejected at construction time."""
        config = self.default_data(event='')
        self.assertRaises(InvalidConfig, ExecuteHomeAssistant, 'key', config)

    def test_only_address(self):
        """A bare hostname is expected to get the http scheme and port 8123."""
        action = ExecuteHomeAssistant('key', self.default_data())
        self.assertIn('url', action.data)
        self.assertEqual(action.data['url'], self.url)

    def test_include_address_protocol(self):
        """A given scheme is kept; the port is still expected to be 8123."""
        action = ExecuteHomeAssistant('key', self.default_data('https://localhost'))
        self.assertIn('url', action.data)
        expected = 'https://localhost:8123' + self.path
        self.assertEqual(action.data['url'], expected)

    def test_include_address_port(self):
        """A given port is kept; the scheme is still expected to be http."""
        action = ExecuteHomeAssistant('key', self.default_data('localhost:7123'))
        self.assertIn('url', action.data)
        expected = 'http://localhost:7123' + self.path
        self.assertEqual(action.data['url'], expected)

    def test_full_address(self):
        """An address with both scheme and port is used unchanged."""
        action = ExecuteHomeAssistant('key', self.default_data('https://localhost:7123'))
        self.assertIn('url', action.data)
        expected = 'https://localhost:7123' + self.path
        self.assertEqual(action.data['url'], expected)

    def test_execute(self):
        """Executing posts to the event URL."""
        with requests_mock.mock() as mocker:
            mocker.post(self.url, text='success')
            action = ExecuteHomeAssistant('key', self.default_data())
            action.execute()
            self.assertTrue(mocker.called_once)

    def test_execute_with_access(self):
        """The ``access`` option is sent in the ``x-ha-access`` header."""
        config = self.default_data(extra_data={'access': 'password'})
        with requests_mock.mock() as mocker:
            mocker.post(self.url, text='success',
                        request_headers={'x-ha-access': 'password'})
            action = ExecuteHomeAssistant('key', config)
            action.execute()
            self.assertTrue(mocker.called_once)

    def test_execute_with_access_token(self):
        """The ``access_token`` option is sent as a Bearer Authorization header."""
        config = self.default_data(extra_data={'access_token': 'abcde12345'})
        with requests_mock.mock() as mocker:
            mocker.post(self.url, text='success',
                        request_headers={'Authorization': 'Bearer abcde12345'})
            action = ExecuteHomeAssistant('key', config)
            action.execute()
            self.assertTrue(mocker.called_once)


class TestExecuteOpenHab(unittest.TestCase):
    """Tests for ``ExecuteOpenHab`` execution."""

    path = '/rest/items/test'
    url = 'http://localhost:8080' + path

    def default_data(self, address='localhost', item='test'):
        """Build a configuration dict for the given address and item."""
        config = {}
        config['openhab'] = address
        config['item'] = item
        return config

    def test_execute(self):
        """Executing posts to the item URL with the expected headers."""
        expected_headers = {'Content-Type': 'text/plain', 'Accept': 'application/json'}
        with requests_mock.mock() as mocker:
            mocker.post(self.url, text='success', request_headers=expected_headers)
            action = ExecuteOpenHab('key', self.default_data())
            action.execute()
            self.assertTrue(mocker.called_once)


class TestExecuteIFTTT(unittest.TestCase):
    """Tests for ``ExecuteIFTTT`` validation and execution."""

    def default_data(self):
        """Build a configuration dict with a 50-character key and an event name."""
        config = {}
        config['ifttt'] = 'foobarspam' * 5
        config['event'] = 'myevent'
        return config

    def test_execute(self):
        """Executing posts to the URL built from ``ExecuteIFTTT.url_pattern``."""
        data = self.default_data()
        with requests_mock.mock() as mocker:
            target = ExecuteIFTTT.url_pattern.format(key=data['ifttt'], **data)
            mocker.post(target)
            action = ExecuteIFTTT('key', data)
            action.execute()
            self.assertTrue(mocker.called_once)

    def test_key_required(self):
        """An empty ``ifttt`` key is rejected at construction time."""
        data = dict(self.default_data(), ifttt='')
        self.assertRaises(InvalidConfig, ExecuteIFTTT, 'key', data)

    def test_event_required(self):
        """An empty ``event`` is rejected at construction time."""
        data = dict(self.default_data(), event='')
        self.assertRaises(InvalidConfig, ExecuteIFTTT, 'key', data)