OLD | NEW |
(Empty) | |
| 1 # This file is part of the Adblock Plus web scripts, |
| 2 # Copyright (C) 2006-present eyeo GmbH |
| 3 # |
| 4 # Adblock Plus is free software: you can redistribute it and/or modify |
| 5 # it under the terms of the GNU General Public License version 3 as |
| 6 # published by the Free Software Foundation. |
| 7 # |
| 8 # Adblock Plus is distributed in the hope that it will be useful, |
| 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 11 # GNU General Public License for more details. |
| 12 # |
| 13 # You should have received a copy of the GNU General Public License |
| 14 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. |
| 15 |
| 16 from __future__ import unicode_literals |
| 17 |
| 18 import subprocess |
| 19 import pytest |
| 20 import json |
| 21 |
| 22 from wsgi_intercept.interceptor import Httplib2Interceptor |
| 23 |
| 24 import sitescripts.oauth2dl.bin.constants as cnts |
| 25 from sitescripts.oauth2dl.test.dummy_wsgi_app import main as intercept_app |
| 26 from sitescripts.oauth2dl.bin.oauth2dl import download_file |
| 27 |
| 28 |
| 29 def get_valid_keyfile(): |
| 30 return { |
| 31 'private_key_id': 6, |
| 32 'private_key': cnts.DUMMY_PRIVATE_KEY, |
| 33 'client_email': 'firstpart@secondpart.com', |
| 34 'client_id': '8', |
| 35 'type': 'service_account', |
| 36 } |
| 37 |
| 38 |
| 39 def get_intercept_app(): |
| 40 """Return the intercepting WSGI application.""" |
| 41 return intercept_app |
| 42 |
| 43 |
| 44 def write_to_json(data, path): |
| 45 """Write data to JSON.""" |
| 46 with open(str(path), 'w') as f: |
| 47 json.dump(data, f) |
| 48 |
| 49 |
| 50 @pytest.fixture |
| 51 def rootdir(tmpdir): |
| 52 """Directory with prepared key and downloadable files.""" |
| 53 rootdir = tmpdir.join('root') |
| 54 rootdir.mkdir() |
| 55 |
| 56 # Keyfile missing a key - private_key_id |
| 57 invalid_keyfile_path = rootdir.join('keyfile_missing_key.json') |
| 58 data = get_valid_keyfile() |
| 59 data.pop('private_key') |
| 60 write_to_json(data, str(invalid_keyfile_path)) |
| 61 |
| 62 # Keyfile with invalid private key |
| 63 invalid_keyfile_path = rootdir.join('keyfile_invalid_private_key.json') |
| 64 data = get_valid_keyfile() |
| 65 data['private_key'] = data['private_key'][:-10] |
| 66 write_to_json(data, str(invalid_keyfile_path)) |
| 67 |
| 68 # Keyfile with wrong value for 'type' |
| 69 invalid_keyfile_path = rootdir.join('keyfile_invalid_type.json') |
| 70 data = get_valid_keyfile() |
| 71 data['type'] = 'invalid' |
| 72 write_to_json(data, str(invalid_keyfile_path)) |
| 73 |
| 74 # Valid (dummy) keyfile |
| 75 valid_keyfile_path = rootdir.join('good_keyfile.json') |
| 76 write_to_json(get_valid_keyfile(), str(valid_keyfile_path)) |
| 77 |
| 78 # Downloadable file |
| 79 rootdir.join('file_to_download').write('Success!') |
| 80 |
| 81 # Downloadable file with utf-8 characters |
| 82 rootdir.join('file_to_download_utf8').write('Ok \u1234'.encode('utf-8'), |
| 83 mode='wb') |
| 84 |
| 85 return rootdir |
| 86 |
| 87 |
| 88 @pytest.fixture |
| 89 def dstfile(tmpdir): |
| 90 """Destination file for saving the downloaded whitelist.""" |
| 91 return tmpdir.join('dst') |
| 92 |
| 93 |
| 94 def run_script(*args, **kw): |
| 95 """Run download script with given arguments and return its output.""" |
| 96 try: |
| 97 cmd = kw.pop('cmd') |
| 98 except KeyError: |
| 99 cmd = 'python -m sitescripts.oauth2dl.bin.oauth2dl' |
| 100 |
| 101 cmd = [cmd] + list(args) |
| 102 cmd = ' '.join(cmd) |
| 103 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, |
| 104 stderr=subprocess.PIPE, shell=True, **kw) |
| 105 |
| 106 stdout, stderr = proc.communicate() |
| 107 |
| 108 return proc.returncode, stderr.decode('utf-8'), stdout.decode('utf-8') |
| 109 |
| 110 |
| 111 @pytest.mark.parametrize('args_in, expected_stderr, expected_code', [ |
| 112 ((), 'usage: oauth2dl.py [-h] [-k KEY] [-s SCOPE] [-o O] url', 2), |
| 113 (('www.test.com',), cnts.KEYFILE_NOT_FOUND_ERROR, 1), |
| 114 (('www.test.com', '-k', 'test.json'), cnts.SCOPE_NOT_FOUND_ERROR, 1), |
| 115 (('www.test.com', '-k', 'test.json', '-s', 'test'), |
| 116 "No such file or directory: 'test.json'", 1), |
| 117 ]) |
| 118 def test_error_messages(args_in, expected_stderr, expected_code): |
| 119 """Testing that appropriate error messages are provided.""" |
| 120 code, stderr, _ = run_script(*args_in) |
| 121 |
| 122 assert code == expected_code |
| 123 assert expected_stderr in stderr |
| 124 |
| 125 |
| 126 def test_extracting_from_environment_vars(): |
| 127 """Test if it uses the environment variables if none are provided.""" |
| 128 test_env = {'OAUTH2_KEY': 'env_test.json', |
| 129 'OAUTH2_SCOPE': 'env_test_scope'} |
| 130 _, stderr, _ = run_script('www.test.com', env=test_env) |
| 131 |
| 132 assert cnts.KEYFILE_NOT_FOUND_ERROR not in stderr |
| 133 assert cnts.SCOPE_NOT_FOUND_ERROR not in stderr |
| 134 |
| 135 |
| 136 @pytest.mark.parametrize('key, expected_stderr, expected_code', [ |
| 137 ('keyfile_missing_key.json', 'Invalid key file format!', 1), |
| 138 ('keyfile_invalid_private_key.json', 'invalid_client: The OAuth ' |
| 139 'client was not found.', 1), |
| 140 ('keyfile_invalid_type.json', "('Unexpected credentials type', u'invalid'," |
| 141 " 'Expected', 'service_account')", 1), |
| 142 ('good_keyfile.json', 'invalid_client: The OAuth client was not found.', |
| 143 1), |
| 144 ]) |
| 145 def test_keyfile_errors(rootdir, key, expected_stderr, expected_code): |
| 146 """Testing how the script handles key file-related error messages. |
| 147 |
| 148 Connects to the actual google API, using set of dummy key files. |
| 149 """ |
| 150 keyfile_path = rootdir.join(key) |
| 151 |
| 152 code, stderr, _ = run_script('www.test.com', '-k', str(keyfile_path), '-s', |
| 153 'test') |
| 154 |
| 155 assert code == expected_code |
| 156 assert expected_stderr in stderr |
| 157 |
| 158 |
| 159 @pytest.mark.parametrize('file, expected', [ |
| 160 ('file_to_download', 'Success!'), |
| 161 ('file_to_download_utf8', '\u1234'), |
| 162 ]) |
| 163 def test_download(rootdir, file, expected): |
| 164 """Test authenticating and downloading a file. |
| 165 |
| 166 Uses a local server that simulates the interaction with the google API |
| 167 """ |
| 168 keyfile_path = str(rootdir.join('good_keyfile.json')) |
| 169 url = 'https://www.googleapis.com/download?path={0}'.format( |
| 170 str(rootdir.join(file)), |
| 171 ) |
| 172 scope = 'www.googleapis.com' |
| 173 |
| 174 with Httplib2Interceptor(get_intercept_app, host='www.googleapis.com', |
| 175 port=443): |
| 176 _, data = download_file(url, keyfile_path, scope) |
| 177 |
| 178 assert expected in data |
| 179 |
| 180 |
| 181 def test_download_wrong_url(rootdir): |
| 182 """Test authenticating and trying to download a file from an invalid url. |
| 183 |
| 184 Uses a local server that simulates the interaction with the google API. |
| 185 """ |
| 186 keyfile_path = str(rootdir.join('good_keyfile.json')) |
| 187 url = 'https://www.googleapis.com/download?path={0}'.format( |
| 188 str(rootdir.join('inexistent_file'))) |
| 189 scope = 'www.googleapis.com' |
| 190 |
| 191 with Httplib2Interceptor(get_intercept_app, host='www.googleapis.com', |
| 192 port=443): |
| 193 headers, data = download_file(url, keyfile_path, scope) |
| 194 |
| 195 assert 'NOT FOUND' in data.upper() |
| 196 assert headers['status'] == '404' |
OLD | NEW |