| LEFT | RIGHT | 
|---|
| 1 # This Source Code Form is subject to the terms of the Mozilla Public | 1 # This Source Code Form is subject to the terms of the Mozilla Public | 
| 2 # License, v. 2.0. If a copy of the MPL was not distributed with this | 2 # License, v. 2.0. If a copy of the MPL was not distributed with this | 
| 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. | 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. | 
| 4 | 4 | 
| 5 import io | 5 import io | 
| 6 import os | 6 import os | 
| 7 import pytest | 7 import pytest | 
| 8 import shutil | 8 import shutil | 
| 9 import subprocess | 9 import subprocess | 
| 10 from zipfile import ZipFile | 10 from zipfile import ZipFile | 
| 11 try:  # Python 3+ | 11 try:  # Python 3+ | 
| 12     from unittest import mock | 12     from unittest import mock | 
| 13 except ImportError:  # Python 2.7 | 13 except ImportError:  # Python 2.7 | 
| 14     import mock | 14     import mock | 
| 15 | 15 | 
| 16 from watchextensions import ConfigurationError, ExtWatcher, read_config, urllib | 16 from watchextensions import (OMAHA_URL, ConfigurationError, ExtWatcher, | 
|  | 17                              read_config) | 
| 17 | 18 | 
| 18 | 19 | 
| 19 @pytest.fixture | 20 @pytest.fixture | 
| 20 def remote_local(tmpdir): | 21 def remote_local(tmpdir): | 
| 21     remote = tmpdir.mkdir('remote') | 22     remote = tmpdir.mkdir('remote') | 
| 22     subprocess.check_output(['git', 'init', '--bare'], cwd=str(remote)) | 23     subprocess.check_output(['git', 'init', '--bare'], cwd=str(remote)) | 
| 23 | 24 | 
| 24     local = tmpdir.mkdir('repository') | 25     local = tmpdir.mkdir('repository') | 
| 25     subprocess.check_output(['git', 'clone', str(remote), str(local)]) | 26     subprocess.check_output(['git', 'clone', str(remote), str(local)]) | 
| 26 | 27 | 
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after  Loading... | 
| 63 @pytest.fixture | 64 @pytest.fixture | 
| 64 def crx(tmpdir): | 65 def crx(tmpdir): | 
| 65     v2_path = str(tmpdir.join('v2.crx')) | 66     v2_path = str(tmpdir.join('v2.crx')) | 
| 66 | 67 | 
| 67     with ZipFile(v2_path, 'w') as fp: | 68     with ZipFile(v2_path, 'w') as fp: | 
| 68         fp.writestr('a.txt', 'foo\nbaz') | 69         fp.writestr('a.txt', 'foo\nbaz') | 
| 69 | 70 | 
| 70     return str(v2_path) | 71     return str(v2_path) | 
| 71 | 72 | 
| 72 | 73 | 
| 73 orig_urlopen = urllib.urlopen |  | 
| 74 |  | 
| 75 |  | 
| 76 def mock_urlopen(*args, **kwargs): | 74 def mock_urlopen(*args, **kwargs): | 
| 77     mocked_xml = ('<gupdate xmlns="http://www.google.com/update2/response" ' | 75     mocked_xml = ('<gupdate xmlns="http://www.google.com/update2/response" ' | 
| 78                   'protocol="2.0" server="prod"><app appid="abc" cohort="" ' | 76                   'protocol="2.0" server="prod"><app appid="abc" cohort="" ' | 
| 79                   'cohortname="" status="ok"><updatecheck version="0.2"/>' | 77                   'cohortname="" status="ok"><updatecheck version="0.2"/>' | 
| 80                   '</app></gupdate>').encode() | 78                   '</app></gupdate>').encode() | 
| 81     if 'https://omahaproxy.appspot.com/all.json?os=win' in args: | 79     if OMAHA_URL in args: | 
| 82         return orig_urlopen(*args) | 80         fp = io.open(os.path.join(os.path.dirname(__file__), | 
|  | 81                                   'data', | 
|  | 82                                   'omaha.json'), | 
|  | 83                      'rt') | 
|  | 84         mocked = fp.read().encode() | 
|  | 85         fp.close() | 
|  | 86     else: | 
|  | 87         mocked = mocked_xml | 
|  | 88 | 
| 83     buf = io.BytesIO() | 89     buf = io.BytesIO() | 
| 84     buf.write(mocked_xml) | 90     buf.write(mocked) | 
| 85     buf.seek(0) | 91     buf.seek(0) | 
| 86     return buf | 92     return buf | 
| 87 | 93 | 
| 88 | 94 | 
| 89 def test_download_push_and_diff(crx, test_config, tmpdir, remote_local): | 95 def test_download_push_and_diff(crx, test_config, tmpdir, remote_local): | 
| 90     def mock_urlretrieve(url, filename, *args, **kwargs): | 96     def mock_urlretrieve(url, filename, *args, **kwargs): | 
| 91         shutil.copyfile(crx, filename) | 97         shutil.copyfile(crx, filename) | 
| 92         return None | 98         return None | 
| 93 | 99 | 
| 94     config = read_config(test_config) | 100     config = read_config(test_config) | 
| (...skipping 19 matching lines...) Expand all  Loading... | 
| 114                      (no_option, '"a" is not fully configured!'), | 120                      (no_option, '"a" is not fully configured!'), | 
| 115                      (empty_option, '"a" is not fully configured!')]: | 121                      (empty_option, '"a" is not fully configured!')]: | 
| 116 | 122 | 
| 117         config = read_config(cfg) | 123         config = read_config(cfg) | 
| 118         with ExtWatcher('a', config, False, False) as watcher: | 124         with ExtWatcher('a', config, False, False) as watcher: | 
| 119             watcher.run() | 125             watcher.run() | 
| 120 | 126 | 
| 121         records = caplog.get_records('call') | 127         records = caplog.get_records('call') | 
| 122 | 128 | 
| 123         assert msg in records[-1].message | 129         assert msg in records[-1].message | 
| LEFT | RIGHT | 
|---|