OLD | NEW |
(Empty) | |
| 1 # This Source Code Form is subject to the terms of the Mozilla Public |
| 2 # License, v. 2.0. If a copy of the MPL was not distributed with this |
| 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| 4 |
| 5 import io |
| 6 import os |
| 7 import pytest |
| 8 import shutil |
| 9 import subprocess |
| 10 from zipfile import ZipFile |
| 11 try: # Python 3+ |
| 12 from unittest import mock |
| 13 except ImportError: # Python 2.7 |
| 14 import mock |
| 15 |
| 16 from watchextensions import ConfigurationError, ExtWatcher, read_config, urllib |
| 17 |
| 18 |
| 19 @pytest.fixture |
| 20 def remote_local(tmpdir): |
| 21 remote = tmpdir.mkdir('remote') |
| 22 subprocess.check_output(['git', 'init', '--bare'], cwd=str(remote)) |
| 23 |
| 24 local = tmpdir.mkdir('repository') |
| 25 subprocess.check_output(['git', 'clone', str(remote), str(local)]) |
| 26 |
| 27 local.join('a.txt').write('foo\nbar') |
| 28 |
| 29 subprocess.check_output(['git', 'add', '--all'], cwd=str(local)) |
| 30 subprocess.check_output(['git', 'commit', '-m', '0.1'], cwd=str(local)) |
| 31 subprocess.check_output(['git', 'push', 'origin', 'master'], |
| 32 cwd=str(local)) |
| 33 |
| 34 return remote |
| 35 |
| 36 |
| 37 @pytest.fixture |
| 38 def test_config(tmpdir, remote_local): |
| 39 config = tmpdir.join('test.ini') |
| 40 config.write(""" |
| 41 [extensions] |
| 42 test_id=abc |
| 43 test_repository={} |
| 44 [enabled] |
| 45 test= |
| 46 """.format(str(remote_local))) |
| 47 return str(config) |
| 48 |
| 49 |
| 50 @pytest.fixture |
| 51 def errorneous_configs(tmpdir): |
| 52 folder = tmpdir.mkdir('configs') |
| 53 no_section = folder.join('no_section.ini') |
| 54 no_section.write('[enabled]\ntest=') |
| 55 no_option = folder.join('no_option.ini') |
| 56 no_option.write('[extensions]\na_id=a\n[enabled]\na=') |
| 57 empty_option = folder.join('empty_option.ini') |
| 58 empty_option.write('[extensions]\na_id=a\na_repository=\n[enabled]\na=') |
| 59 |
| 60 return str(no_section), str(no_option), str(empty_option) |
| 61 |
| 62 |
| 63 @pytest.fixture |
| 64 def crx(tmpdir): |
| 65 v2_path = str(tmpdir.join('v2.crx')) |
| 66 |
| 67 with ZipFile(v2_path, 'w') as fp: |
| 68 fp.writestr('a.txt', 'foo\nbaz') |
| 69 |
| 70 return str(v2_path) |
| 71 |
| 72 |
| 73 orig_urlopen = urllib.urlopen |
| 74 |
| 75 |
| 76 def mock_urlopen(*args, **kwargs): |
| 77 mocked_xml = ('<gupdate xmlns="http://www.google.com/update2/response" ' |
| 78 'protocol="2.0" server="prod"><app appid="abc" cohort="" ' |
| 79 'cohortname="" status="ok"><updatecheck version="0.2"/>' |
| 80 '</app></gupdate>').encode() |
| 81 if 'https://omahaproxy.appspot.com/all.json?os=win' in args: |
| 82 return orig_urlopen(*args) |
| 83 buf = io.BytesIO() |
| 84 buf.write(mocked_xml) |
| 85 buf.seek(0) |
| 86 return buf |
| 87 |
| 88 |
| 89 def test_download_push_and_diff(crx, test_config, tmpdir, remote_local): |
| 90 def mock_urlretrieve(url, filename, *args, **kwargs): |
| 91 shutil.copyfile(crx, filename) |
| 92 return None |
| 93 |
| 94 config = read_config(test_config) |
| 95 with ExtWatcher('test', config, True, True) as watcher: |
| 96 with mock.patch('watchextensions.urllib.urlopen', |
| 97 side_effect=mock_urlopen): |
| 98 with mock.patch('watchextensions.urllib.urlretrieve', |
| 99 side_effect=mock_urlretrieve): |
| 100 watcher.run() |
| 101 |
| 102 diff = subprocess.check_output(['git', 'show'], cwd=str(remote_local)) |
| 103 assert '-bar'.encode() in diff |
| 104 assert '+baz'.encode() in diff |
| 105 |
| 106 |
| 107 def test_config_errors(tmpdir, errorneous_configs, caplog): |
| 108 no_section, no_option, empty_option = errorneous_configs |
| 109 |
| 110 with pytest.raises(ConfigurationError): |
| 111 read_config(os.path.join(str(tmpdir), 'nonextistent')) |
| 112 |
| 113 for cfg, msg in [(no_section, 'Section [extensions] is missing!'), |
| 114 (no_option, '"a" is not fully configured!'), |
| 115 (empty_option, '"a" is not fully configured!')]: |
| 116 |
| 117 config = read_config(cfg) |
| 118 with ExtWatcher('a', config, False, False) as watcher: |
| 119 watcher.run() |
| 120 |
| 121 records = caplog.get_records('call') |
| 122 |
| 123 assert msg in records[-1].message |
OLD | NEW |