| LEFT | RIGHT | 
|---|
| (no file at all) |  | 
|  | 1 # This Source Code Form is subject to the terms of the Mozilla Public | 
|  | 2 # License, v. 2.0. If a copy of the MPL was not distributed with this | 
|  | 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. | 
|  | 4 | 
|  | 5 import io | 
|  | 6 import os | 
|  | 7 import pytest | 
|  | 8 import shutil | 
|  | 9 import subprocess | 
|  | 10 from zipfile import ZipFile | 
|  | 11 try:  # Python 3+ | 
|  | 12     from unittest import mock | 
|  | 13 except ImportError:  # Python 2.7 | 
|  | 14     import mock | 
|  | 15 | 
|  | 16 from watchextensions import (OMAHA_URL, ConfigurationError, ExtWatcher, | 
|  | 17                              read_config) | 
|  | 18 | 
|  | 19 | 
@pytest.fixture
def remote_local(tmpdir):
    """Provide a bare git repository that already contains one commit.

    A bare repository is created in ``tmpdir/remote`` and cloned into
    ``tmpdir/repository``. The clone gets a file ``a.txt`` holding the two
    lines ``foo`` and ``bar``, which is committed with the message ``0.1``
    and pushed back to the bare repository.

    Returns the path object of the bare ("remote") repository.
    """
    remote = tmpdir.mkdir('remote')
    subprocess.check_output(['git', 'init', '--bare'], cwd=str(remote))

    local = tmpdir.mkdir('repository')
    subprocess.check_output(['git', 'clone', str(remote), str(local)])

    local.join('a.txt').write('foo\nbar')

    subprocess.check_output(['git', 'add', '--all'], cwd=str(local))
    # Supply an identity on the command line: "git commit" may refuse to run
    # on machines (e.g. fresh CI workers) where user.name/user.email are not
    # configured.
    subprocess.check_output(['git',
                             '-c', 'user.name=Test',
                             '-c', 'user.email=test@example.com',
                             'commit', '-m', '0.1'], cwd=str(local))
    # Push whatever branch the clone is on instead of a hard-coded "master":
    # the initial branch name depends on the installed git and its
    # configuration.
    subprocess.check_output(['git', 'push', 'origin', 'HEAD'],
                            cwd=str(local))

    return remote
|  | 36 | 
|  | 37 | 
@pytest.fixture
def test_config(tmpdir, remote_local):
    """Write ``test.ini`` into ``tmpdir`` and return its path as a string.

    The file declares one extension named ``test`` with id ``abc`` whose
    repository is the path of the ``remote_local`` fixture, and lists that
    extension in the ``[enabled]`` section.
    """
    template = """
    [extensions]
    test_id=abc
    test_repository={}
    [enabled]
    test=
    """
    ini_file = tmpdir.join('test.ini')
    ini_file.write(template.format(str(remote_local)))
    return str(ini_file)
|  | 49 | 
|  | 50 | 
@pytest.fixture
def errorneous_configs(tmpdir):
    """Create three broken configuration files under ``tmpdir/configs``.

    Returns a 3-tuple of path strings, in this order:

    * ``no_section.ini``   -- has no ``[extensions]`` section,
    * ``no_option.ini``    -- extension ``a`` lacks ``a_repository``,
    * ``empty_option.ini`` -- ``a_repository`` is present but empty.
    """
    folder = tmpdir.mkdir('configs')
    contents = (
        ('no_section.ini', '[enabled]\ntest='),
        ('no_option.ini', '[extensions]\na_id=a\n[enabled]\na='),
        ('empty_option.ini',
         '[extensions]\na_id=a\na_repository=\n[enabled]\na='),
    )

    paths = []
    for filename, text in contents:
        target = folder.join(filename)
        target.write(text)
        paths.append(str(target))

    return tuple(paths)
|  | 62 | 
|  | 63 | 
@pytest.fixture
def crx(tmpdir):
    """Create ``v2.crx`` in ``tmpdir`` and return its path as a string.

    The file is a plain zip archive with a single member ``a.txt`` whose
    content is the two lines ``foo`` and ``baz``.
    """
    archive_path = str(tmpdir.join('v2.crx'))

    with ZipFile(archive_path, 'w') as archive:
        archive.writestr('a.txt', 'foo\nbaz')

    return archive_path
|  | 72 | 
|  | 73 | 
def mock_urlopen(*args, **kwargs):
    """Stand-in for ``urlopen`` that serves canned responses.

    If ``OMAHA_URL`` is one of the positional arguments, the response body
    is the content of ``data/omaha.json`` located next to this file. For any
    other call it is a fixed XML update response for app id ``abc``
    announcing version ``0.2``. Keyword arguments are accepted and ignored.

    Returns an ``io.BytesIO`` holding the encoded body, positioned at the
    start so that callers can ``read()`` it like a real response.
    """
    if OMAHA_URL in args:
        fixture_path = os.path.join(os.path.dirname(__file__),
                                    'data',
                                    'omaha.json')
        # The context manager closes the file even if reading or encoding
        # raises, which a bare open()/close() pair does not guarantee.
        with io.open(fixture_path, 'rt') as fp:
            mocked = fp.read().encode()
    else:
        mocked = ('<gupdate xmlns="http://www.google.com/update2/response" '
                  'protocol="2.0" server="prod"><app appid="abc" cohort="" '
                  'cohortname="" status="ok"><updatecheck version="0.2"/>'
                  '</app></gupdate>').encode()

    # A BytesIO built from initial bytes starts at position 0.
    return io.BytesIO(mocked)
|  | 93 | 
|  | 94 | 
def test_download_push_and_diff(crx, test_config, tmpdir, remote_local):
    """Run the watcher against mocked network calls and check the push.

    ``urlopen`` is replaced by ``mock_urlopen`` and ``urlretrieve`` by a
    function that copies the prepared ``crx`` archive to the requested
    destination. After the run, the latest commit in the remote repository
    must show ``bar`` being replaced by ``baz``.
    """
    def fake_urlretrieve(url, filename, *args, **kwargs):
        # Pretend to download: copy the prepared archive to the target path.
        shutil.copyfile(crx, filename)

    urlopen_patch = mock.patch('watchextensions.urllib.urlopen',
                               side_effect=mock_urlopen)
    urlretrieve_patch = mock.patch('watchextensions.urllib.urlretrieve',
                                   side_effect=fake_urlretrieve)

    config = read_config(test_config)
    with ExtWatcher('test', config, True, True) as watcher:
        with urlopen_patch, urlretrieve_patch:
            watcher.run()

    git_show = subprocess.check_output(['git', 'show'],
                                       cwd=str(remote_local))
    assert b'-bar' in git_show
    assert b'+baz' in git_show
|  | 111 | 
|  | 112 | 
def test_config_errors(tmpdir, errorneous_configs, caplog):
    """Check how broken or missing configuration files are reported.

    Reading a non-existent file must raise ``ConfigurationError``. For each
    of the broken files from ``errorneous_configs``, running a watcher for
    extension ``a`` must leave the expected text in the most recent log
    record of the test's call phase.
    """
    no_section, no_option, empty_option = errorneous_configs

    missing_path = os.path.join(str(tmpdir), 'nonextistent')
    with pytest.raises(ConfigurationError):
        read_config(missing_path)

    not_configured = '"a" is not fully configured!'
    expectations = (
        (no_section, 'Section [extensions] is missing!'),
        (no_option, not_configured),
        (empty_option, not_configured),
    )

    for path, expected in expectations:
        config = read_config(path)
        with ExtWatcher('a', config, False, False) as watcher:
            watcher.run()

        # Records accumulate across iterations; only the newest one matters.
        last_record = caplog.get_records('call')[-1]
        assert expected in last_record.message
| LEFT | RIGHT | 
|---|