| OLD | NEW | 
| (Empty) |  | 
 |    1 # This file is part of the Adblock Plus web scripts, | 
 |    2 # Copyright (C) 2006-present eyeo GmbH | 
 |    3 # | 
 |    4 # Adblock Plus is free software: you can redistribute it and/or modify | 
 |    5 # it under the terms of the GNU General Public License version 3 as | 
 |    6 # published by the Free Software Foundation. | 
 |    7 # | 
 |    8 # Adblock Plus is distributed in the hope that it will be useful, | 
 |    9 # but WITHOUT ANY WARRANTY; without even the implied warranty of | 
 |   10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
 |   11 # GNU General Public License for more details. | 
 |   12 # | 
 |   13 # You should have received a copy of the GNU General Public License | 
 |   14 # along with Adblock Plus.  If not, see <http://www.gnu.org/licenses/>. | 
 |   15  | 
 |   16 from __future__ import unicode_literals | 
 |   17  | 
 |   18 import subprocess | 
 |   19 import pytest | 
 |   20 import json | 
 |   21  | 
 |   22 from wsgi_intercept.interceptor import Httplib2Interceptor | 
 |   23  | 
 |   24 import sitescripts.oauth2dl.bin.constants as cnts | 
 |   25 from sitescripts.oauth2dl.test.dummy_wsgi_app import main as intercept_app | 
 |   26 from sitescripts.oauth2dl.bin.oauth2dl import download_file | 
 |   27  | 
 |   28  | 
 |   29 def get_valid_keyfile(): | 
 |   30     return { | 
 |   31         'private_key_id': 6, | 
 |   32         'private_key': cnts.DUMMY_PRIVATE_KEY, | 
 |   33         'client_email': 'firstpart@secondpart.com', | 
 |   34         'client_id': '8', | 
 |   35         'type': 'service_account', | 
 |   36     } | 
 |   37  | 
 |   38  | 
 |   39 def get_intercept_app(): | 
 |   40     """Return the intercepting WSGI application.""" | 
 |   41     return intercept_app | 
 |   42  | 
 |   43  | 
 |   44 def write_to_json(data, path): | 
 |   45     """Write data to JSON.""" | 
 |   46     with open(str(path), 'w') as f: | 
 |   47         json.dump(data, f) | 
 |   48  | 
 |   49  | 
 |   50 @pytest.fixture | 
 |   51 def rootdir(tmpdir): | 
 |   52     """Directory with prepared key and downloadable files.""" | 
 |   53     rootdir = tmpdir.join('root') | 
 |   54     rootdir.mkdir() | 
 |   55  | 
 |   56     # Keyfile missing a key - private_key_id | 
 |   57     invalid_keyfile_path = rootdir.join('keyfile_missing_key.json') | 
 |   58     data = get_valid_keyfile() | 
 |   59     data.pop('private_key') | 
 |   60     write_to_json(data, str(invalid_keyfile_path)) | 
 |   61  | 
 |   62     # Keyfile with invalid private key | 
 |   63     invalid_keyfile_path = rootdir.join('keyfile_invalid_private_key.json') | 
 |   64     data = get_valid_keyfile() | 
 |   65     data['private_key'] = data['private_key'][:-10] | 
 |   66     write_to_json(data, str(invalid_keyfile_path)) | 
 |   67  | 
 |   68     # Keyfile with wrong value for 'type' | 
 |   69     invalid_keyfile_path = rootdir.join('keyfile_invalid_type.json') | 
 |   70     data = get_valid_keyfile() | 
 |   71     data['type'] = 'invalid' | 
 |   72     write_to_json(data, str(invalid_keyfile_path)) | 
 |   73  | 
 |   74     # Valid (dummy) keyfile | 
 |   75     valid_keyfile_path = rootdir.join('good_keyfile.json') | 
 |   76     write_to_json(get_valid_keyfile(), str(valid_keyfile_path)) | 
 |   77  | 
 |   78     # Downloadable file | 
 |   79     rootdir.join('file_to_download').write('Success!') | 
 |   80  | 
 |   81     # Downloadable file with utf-8 characters | 
 |   82     rootdir.join('file_to_download_utf8').write('Ok \u1234'.encode('utf-8'), | 
 |   83                                                 mode='wb') | 
 |   84  | 
 |   85     return rootdir | 
 |   86  | 
 |   87  | 
 |   88 @pytest.fixture | 
 |   89 def dstfile(tmpdir): | 
 |   90     """Destination file for saving the downloaded whitelist.""" | 
 |   91     return tmpdir.join('dst') | 
 |   92  | 
 |   93  | 
 |   94 def run_script(*args, **kw): | 
 |   95     """Run download script with given arguments and return its output.""" | 
 |   96     try: | 
 |   97         cmd = kw.pop('cmd') | 
 |   98     except KeyError: | 
 |   99         cmd = 'python -m sitescripts.oauth2dl.bin.oauth2dl' | 
 |  100  | 
 |  101     cmd = [cmd] + list(args) | 
 |  102     cmd = ' '.join(cmd) | 
 |  103     proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, | 
 |  104                             stderr=subprocess.PIPE, shell=True, **kw) | 
 |  105  | 
 |  106     stdout, stderr = proc.communicate() | 
 |  107  | 
 |  108     return proc.returncode, stderr.decode('utf-8'), stdout.decode('utf-8') | 
 |  109  | 
 |  110  | 
 |  111 @pytest.mark.parametrize('args_in, expected_stderr, expected_code', [ | 
 |  112     ((), 'usage: oauth2dl.py [-h] [-k KEY] [-s SCOPE] [-o O] url', 2), | 
 |  113     (('www.test.com',), cnts.KEYFILE_NOT_FOUND_ERROR, 1), | 
 |  114     (('www.test.com', '-k', 'test.json'), cnts.SCOPE_NOT_FOUND_ERROR, 1), | 
 |  115     (('www.test.com', '-k', 'test.json', '-s', 'test'), | 
 |  116      "No such file or directory: 'test.json'", 1), | 
 |  117 ]) | 
 |  118 def test_error_messages(args_in, expected_stderr, expected_code): | 
 |  119     """Testing that appropriate error messages are provided.""" | 
 |  120     code, stderr, _ = run_script(*args_in) | 
 |  121  | 
 |  122     assert code == expected_code | 
 |  123     assert expected_stderr in stderr | 
 |  124  | 
 |  125  | 
 |  126 def test_extracting_from_environment_vars(): | 
 |  127     """Test if it uses the environment variables if none are provided.""" | 
 |  128     test_env = {'OAUTH2_KEY': 'env_test.json', | 
 |  129                 'OAUTH2_SCOPE': 'env_test_scope'} | 
 |  130     _, stderr, _ = run_script('www.test.com', env=test_env) | 
 |  131  | 
 |  132     assert cnts.KEYFILE_NOT_FOUND_ERROR not in stderr | 
 |  133     assert cnts.SCOPE_NOT_FOUND_ERROR not in stderr | 
 |  134  | 
 |  135  | 
 |  136 @pytest.mark.parametrize('key, expected_stderr, expected_code', [ | 
 |  137     ('keyfile_missing_key.json', 'Invalid key file format!', 1), | 
 |  138     ('keyfile_invalid_private_key.json', 'invalid_client: The OAuth ' | 
 |  139                                          'client was not found.', 1), | 
 |  140     ('keyfile_invalid_type.json', "('Unexpected credentials type', u'invalid'," | 
 |  141                                   " 'Expected', 'service_account')", 1), | 
 |  142     ('good_keyfile.json', 'invalid_client: The OAuth client was not found.', | 
 |  143      1), | 
 |  144 ]) | 
 |  145 def test_keyfile_errors(rootdir, key, expected_stderr, expected_code): | 
 |  146     """Testing how the script handles key file-related error messages. | 
 |  147  | 
 |  148     Connects to the actual google API, using set of dummy key files. | 
 |  149     """ | 
 |  150     keyfile_path = rootdir.join(key) | 
 |  151  | 
 |  152     code, stderr, _ = run_script('www.test.com', '-k', str(keyfile_path), '-s', | 
 |  153                                  'test') | 
 |  154  | 
 |  155     assert code == expected_code | 
 |  156     assert expected_stderr in stderr | 
 |  157  | 
 |  158  | 
 |  159 @pytest.mark.parametrize('file, expected', [ | 
 |  160     ('file_to_download', 'Success!'), | 
 |  161     ('file_to_download_utf8', '\u1234'), | 
 |  162 ]) | 
 |  163 def test_download(rootdir, file, expected): | 
 |  164     """Test authenticating and downloading a file. | 
 |  165  | 
 |  166     Uses a local server that simulates the interaction with the google API | 
 |  167     """ | 
 |  168     keyfile_path = str(rootdir.join('good_keyfile.json')) | 
 |  169     url = 'https://www.googleapis.com/download?path={0}'.format( | 
 |  170         str(rootdir.join(file)), | 
 |  171     ) | 
 |  172     scope = 'www.googleapis.com' | 
 |  173  | 
 |  174     with Httplib2Interceptor(get_intercept_app, host='www.googleapis.com', | 
 |  175                              port=443): | 
 |  176         _, data = download_file(url, keyfile_path, scope) | 
 |  177  | 
 |  178     assert expected in data | 
 |  179  | 
 |  180  | 
 |  181 def test_download_wrong_url(rootdir): | 
 |  182     """Test authenticating and trying to download a file from an invalid url. | 
 |  183  | 
 |  184     Uses a local server that simulates the interaction with the google API. | 
 |  185     """ | 
 |  186     keyfile_path = str(rootdir.join('good_keyfile.json')) | 
 |  187     url = 'https://www.googleapis.com/download?path={0}'.format( | 
 |  188         str(rootdir.join('inexistent_file'))) | 
 |  189     scope = 'www.googleapis.com' | 
 |  190  | 
 |  191     with Httplib2Interceptor(get_intercept_app, host='www.googleapis.com', | 
 |  192                              port=443): | 
 |  193         headers, data = download_file(url, keyfile_path, scope) | 
 |  194  | 
 |  195     assert 'NOT FOUND' in data.upper() | 
 |  196     assert headers['status'] == '404' | 
| OLD | NEW |