| Left: | ||
| Right: |
| LEFT | RIGHT |
|---|---|
| 1 # This Source Code Form is subject to the terms of the Mozilla Public | 1 # This Source Code Form is subject to the terms of the Mozilla Public |
| 2 # License, v. 2.0. If a copy of the MPL was not distributed with this | 2 # License, v. 2.0. If a copy of the MPL was not distributed with this |
| 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. | 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. |
| 4 | 4 |
| 5 import io | 5 import io |
| 6 import os | 6 import os |
| 7 import pytest | 7 import pytest |
| 8 import shutil | 8 import shutil |
| 9 import subprocess | 9 import subprocess |
| 10 from zipfile import ZipFile | 10 from zipfile import ZipFile |
| 11 try: # Python 3+ | 11 try: # Python 3+ |
| 12 from unittest import mock | 12 from unittest import mock |
| 13 except ImportError: # Python 2.7 | 13 except ImportError: # Python 2.7 |
| 14 import mock | 14 import mock |
| 15 | 15 |
| 16 from watchextensions import ConfigurationError, ExtWatcher, read_config, urllib | 16 from watchextensions import (OMAHA_URL, ConfigurationError, ExtWatcher, |
| 17 read_config) | |
| 17 | 18 |
| 18 | 19 |
| 19 @pytest.fixture | 20 @pytest.fixture |
| 20 def remote_local(tmpdir): | 21 def remote_local(tmpdir): |
| 21 remote = tmpdir.mkdir('remote') | 22 remote = tmpdir.mkdir('remote') |
| 22 subprocess.check_output(['git', 'init', '--bare'], cwd=str(remote)) | 23 subprocess.check_output(['git', 'init', '--bare'], cwd=str(remote)) |
| 23 | 24 |
| 24 local = tmpdir.mkdir('repository') | 25 local = tmpdir.mkdir('repository') |
| 25 subprocess.check_output(['git', 'clone', str(remote), str(local)]) | 26 subprocess.check_output(['git', 'clone', str(remote), str(local)]) |
| 26 | 27 |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 63 @pytest.fixture | 64 @pytest.fixture |
| 64 def crx(tmpdir): | 65 def crx(tmpdir): |
| 65 v2_path = str(tmpdir.join('v2.crx')) | 66 v2_path = str(tmpdir.join('v2.crx')) |
| 66 | 67 |
| 67 with ZipFile(v2_path, 'w') as fp: | 68 with ZipFile(v2_path, 'w') as fp: |
| 68 fp.writestr('a.txt', 'foo\nbaz') | 69 fp.writestr('a.txt', 'foo\nbaz') |
| 69 | 70 |
| 70 return str(v2_path) | 71 return str(v2_path) |
| 71 | 72 |
| 72 | 73 |
| 73 orig_urlopen = urllib.urlopen | |
| 74 | |
| 75 | |
| 76 def mock_urlopen(*args, **kwargs): | 74 def mock_urlopen(*args, **kwargs): |
| 77 mocked_xml = ('<gupdate xmlns="http://www.google.com/update2/response" ' | 75 mocked_xml = ('<gupdate xmlns="http://www.google.com/update2/response" ' |
| 78 'protocol="2.0" server="prod"><app appid="abc" cohort="" ' | 76 'protocol="2.0" server="prod"><app appid="abc" cohort="" ' |
| 79 'cohortname="" status="ok"><updatecheck version="0.2"/>' | 77 'cohortname="" status="ok"><updatecheck version="0.2"/>' |
| 80 '</app></gupdate>').encode() | 78 '</app></gupdate>').encode() |
| 81 if 'https://omahaproxy.appspot.com/all.json?os=win' in args: | 79 if OMAHA_URL in args: |
| 82 return orig_urlopen(*args) | 80 fp = io.open(os.path.join(os.path.dirname(__file__), |
|
Vasily Kuznetsov
2018/05/15 16:49:31
This makes the test hit that API every time it's r
tlucas
2018/05/16 09:52:37
No, you are right. I'm also mocking this now.
| |
| 81 'data', | |
| 82 'omaha.json'), | |
| 83 'rt') | |
| 84 mocked = fp.read().encode() | |
| 85 fp.close() | |
| 86 else: | |
| 87 mocked = mocked_xml | |
| 88 | |
| 83 buf = io.BytesIO() | 89 buf = io.BytesIO() |
| 84 buf.write(mocked_xml) | 90 buf.write(mocked) |
| 85 buf.seek(0) | 91 buf.seek(0) |
| 86 return buf | 92 return buf |
| 87 | 93 |
| 88 | 94 |
| 89 def test_download_push_and_diff(crx, test_config, tmpdir, remote_local): | 95 def test_download_push_and_diff(crx, test_config, tmpdir, remote_local): |
| 90 def mock_urlretrieve(url, filename, *args, **kwargs): | 96 def mock_urlretrieve(url, filename, *args, **kwargs): |
| 91 shutil.copyfile(crx, filename) | 97 shutil.copyfile(crx, filename) |
| 92 return None | 98 return None |
| 93 | 99 |
| 94 config = read_config(test_config) | 100 config = read_config(test_config) |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 114 (no_option, '"a" is not fully configured!'), | 120 (no_option, '"a" is not fully configured!'), |
| 115 (empty_option, '"a" is not fully configured!')]: | 121 (empty_option, '"a" is not fully configured!')]: |
| 116 | 122 |
| 117 config = read_config(cfg) | 123 config = read_config(cfg) |
| 118 with ExtWatcher('a', config, False, False) as watcher: | 124 with ExtWatcher('a', config, False, False) as watcher: |
| 119 watcher.run() | 125 watcher.run() |
| 120 | 126 |
| 121 records = caplog.get_records('call') | 127 records = caplog.get_records('call') |
| 122 | 128 |
| 123 assert msg in records[-1].message | 129 assert msg in records[-1].message |
| LEFT | RIGHT |