OLD | NEW |
(Empty) | |
| 1 # This Source Code Form is subject to the terms of the Mozilla Public |
| 2 # License, v. 2.0. If a copy of the MPL was not distributed with this |
| 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| 4 |
| 5 import io |
| 6 import os |
| 7 import pytest |
| 8 import shutil |
| 9 import subprocess |
| 10 from zipfile import ZipFile |
| 11 try: # Python 3+ |
| 12 from unittest import mock |
| 13 except ImportError: # Python 2.7 |
| 14 import mock |
| 15 |
| 16 from watchextensions import (OMAHA_URL, ConfigurationError, ExtWatcher, |
| 17 read_config) |
| 18 |
| 19 |
| 20 @pytest.fixture |
| 21 def remote_local(tmpdir): |
| 22 remote = tmpdir.mkdir('remote') |
| 23 subprocess.check_output(['git', 'init', '--bare'], cwd=str(remote)) |
| 24 |
| 25 local = tmpdir.mkdir('repository') |
| 26 subprocess.check_output(['git', 'clone', str(remote), str(local)]) |
| 27 |
| 28 local.join('a.txt').write('foo\nbar') |
| 29 |
| 30 subprocess.check_output(['git', 'add', '--all'], cwd=str(local)) |
| 31 subprocess.check_output(['git', 'commit', '-m', '0.1'], cwd=str(local)) |
| 32 subprocess.check_output(['git', 'push', 'origin', 'master'], |
| 33 cwd=str(local)) |
| 34 |
| 35 return remote |
| 36 |
| 37 |
| 38 @pytest.fixture |
| 39 def test_config(tmpdir, remote_local): |
| 40 config = tmpdir.join('test.ini') |
| 41 config.write(""" |
| 42 [extensions] |
| 43 test_id=abc |
| 44 test_repository={} |
| 45 [enabled] |
| 46 test= |
| 47 """.format(str(remote_local))) |
| 48 return str(config) |
| 49 |
| 50 |
| 51 @pytest.fixture |
| 52 def errorneous_configs(tmpdir): |
| 53 folder = tmpdir.mkdir('configs') |
| 54 no_section = folder.join('no_section.ini') |
| 55 no_section.write('[enabled]\ntest=') |
| 56 no_option = folder.join('no_option.ini') |
| 57 no_option.write('[extensions]\na_id=a\n[enabled]\na=') |
| 58 empty_option = folder.join('empty_option.ini') |
| 59 empty_option.write('[extensions]\na_id=a\na_repository=\n[enabled]\na=') |
| 60 |
| 61 return str(no_section), str(no_option), str(empty_option) |
| 62 |
| 63 |
| 64 @pytest.fixture |
| 65 def crx(tmpdir): |
| 66 v2_path = str(tmpdir.join('v2.crx')) |
| 67 |
| 68 with ZipFile(v2_path, 'w') as fp: |
| 69 fp.writestr('a.txt', 'foo\nbaz') |
| 70 |
| 71 return str(v2_path) |
| 72 |
| 73 |
| 74 def mock_urlopen(*args, **kwargs): |
| 75 mocked_xml = ('<gupdate xmlns="http://www.google.com/update2/response" ' |
| 76 'protocol="2.0" server="prod"><app appid="abc" cohort="" ' |
| 77 'cohortname="" status="ok"><updatecheck version="0.2"/>' |
| 78 '</app></gupdate>').encode() |
| 79 if OMAHA_URL in args: |
| 80 fp = io.open(os.path.join(os.path.dirname(__file__), |
| 81 'data', |
| 82 'omaha.json'), |
| 83 'rt') |
| 84 mocked = fp.read().encode() |
| 85 fp.close() |
| 86 else: |
| 87 mocked = mocked_xml |
| 88 |
| 89 buf = io.BytesIO() |
| 90 buf.write(mocked) |
| 91 buf.seek(0) |
| 92 return buf |
| 93 |
| 94 |
| 95 def test_download_push_and_diff(crx, test_config, tmpdir, remote_local): |
| 96 def mock_urlretrieve(url, filename, *args, **kwargs): |
| 97 shutil.copyfile(crx, filename) |
| 98 return None |
| 99 |
| 100 config = read_config(test_config) |
| 101 with ExtWatcher('test', config, True, True) as watcher: |
| 102 with mock.patch('watchextensions.urllib.urlopen', |
| 103 side_effect=mock_urlopen): |
| 104 with mock.patch('watchextensions.urllib.urlretrieve', |
| 105 side_effect=mock_urlretrieve): |
| 106 watcher.run() |
| 107 |
| 108 diff = subprocess.check_output(['git', 'show'], cwd=str(remote_local)) |
| 109 assert '-bar'.encode() in diff |
| 110 assert '+baz'.encode() in diff |
| 111 |
| 112 |
| 113 def test_config_errors(tmpdir, errorneous_configs, caplog): |
| 114 no_section, no_option, empty_option = errorneous_configs |
| 115 |
| 116 with pytest.raises(ConfigurationError): |
| 117 read_config(os.path.join(str(tmpdir), 'nonextistent')) |
| 118 |
| 119 for cfg, msg in [(no_section, 'Section [extensions] is missing!'), |
| 120 (no_option, '"a" is not fully configured!'), |
| 121 (empty_option, '"a" is not fully configured!')]: |
| 122 |
| 123 config = read_config(cfg) |
| 124 with ExtWatcher('a', config, False, False) as watcher: |
| 125 watcher.run() |
| 126 |
| 127 records = caplog.get_records('call') |
| 128 |
| 129 assert msg in records[-1].message |
OLD | NEW |