Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Unified Diff: cms/bin/test_server.py

Issue 29912588: Issue 7019 - [CMS] Refactor `test_server.py` (Closed)
Patch Set: Created Oct. 16, 2018, 11:42 a.m.
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | tests/test_page_outputs.py » ('j') | tests/test_page_outputs.py » ('J')
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: cms/bin/test_server.py
diff --git a/cms/bin/test_server.py b/cms/bin/test_server.py
old mode 100644
new mode 100755
index 3255cd977b8af1a4fad89f6a324f5487fba2c074..28ba5e5aebaf7aeaa1b0d6a3eee05e63674d3c58
--- a/cms/bin/test_server.py
+++ b/cms/bin/test_server.py
@@ -13,20 +13,27 @@
# You should have received a copy of the GNU General Public License
# along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
-import mimetypes
+from __future__ import print_function
+
import os
-import sys
-import argparse
+import mimetypes
+from argparse import ArgumentParser
import jinja2
+from cms.converters import converters
from cms.utils import process_page
from cms.sources import create_source
-from cms.converters import converters
-source = None
-address = None
-port = None
+
+class Parameters:
Vasily Kuznetsov 2018/10/16 13:18:23 As we discussed via IRC, let's make this whole thi
Tudor Avram 2018/10/18 13:44:05 Done.
+ source = None
+ host = None
+ port = None
+ base_url = None
+
+
+MIME_FORMAT = '{}'
UNICODE_ENCODING = 'utf-8'
@@ -47,127 +54,267 @@ ERROR_TEMPLATE = '''
</body>
</html>'''
-# Initilize the mimetypes modules manually for consistent behavior,
+# Initialize the mimetypes modules manually for consistent behavior,
# ignoring local files and Windows Registry.
mimetypes.init([])
+def get_data(path):
+ """Read the data corresponding to a page.
+
+ Parameters
+ ----------
+ path: str
+ The path to the page to get the data for.
+
+ Returns
+ -------
+ str/ bytes
+ The data corresponding to the page we're trying to open.
+
+ """
+ if Parameters.source.has_static(path):
+ return Parameters.source.read_static(path)
+
+ page, data = get_page(path)
+ if page and has_conflicts(page):
+ raise Exception('The requested page conflicts with another page')
+ return data
+
+
def get_page(path):
+ """Construct a page and return its contents.
+
+ Parameters
+ ----------
+ path: str
+ The path of the page we want to construct.
+
+ Returns
+ -------
+ (str, str)
+ With the following format:
+ <page_name, page_contents>
+
+ """
path = path.strip('/')
if path == '':
- path = source.read_config().get('general', 'defaultlocale')
- if '/' in path:
+ locale, page = Parameters.source.read_config().get(
+ 'general', 'defaultlocale'), ''
+ elif '/' in path:
locale, page = path.split('/', 1)
else:
locale, page = path, ''
- default_page = source.read_config().get('general', 'defaultpage')
- alternative_page = '/'.join([page, default_page]).lstrip('/')
+ default_page = Parameters.source.read_config().get('general',
+ 'defaultpage')
+ possible_pages = [page, '/'.join([page, default_page]).lstrip('/')]
+
+ for page_format in converters.iterkeys():
+ for p in possible_pages:
+ if Parameters.source.has_page(p, page_format):
+ return p, process_page(Parameters.source, locale, p,
+ page_format, Parameters.base_url)
+
+ if Parameters.source.has_localizable_file(locale, page):
+ return page, Parameters.source.read_localizable_file(locale, page)
+
+ return None, None
- for format in converters.iterkeys():
- for p in (page, alternative_page):
- if source.has_page(p, format):
- site_url = 'http://{}:{}'.format(address, port)
- return (p, process_page(source, locale, p, format, site_url))
- if source.has_localizable_file(locale, page):
- return (page, source.read_localizable_file(locale, page))
- return (None, None)
+def has_conflicts(page):
+ """Check if a page has conflicts.
+ A page has conflicts if there are other pages with the same name.
+ Parameters
+ ----------
+ page: str
+ The path of the page we're checking for conflicts.
-def has_conflicting_pages(page):
- pages = [p for p, _ in source.list_pages()]
- pages.extend(source.list_localizable_files())
+ Returns
+ -------
+ bool
+ True - if the page has conflicts
+ False - otherwise
+
+ """
+ pages = [p for p, _ in Parameters.source.list_pages()]
+ pages.extend(Parameters.source.list_localizable_files())
if pages.count(page) > 1:
return True
- if any(p.startswith(page + '/') or page.startswith(p + '/') for p in pages):
+ if any(p.startswith(page + '/') or page.startswith(p + '/')
+ for p in pages):
return True
return False
-def get_data(path):
- if source.has_static(path):
- return source.read_static(path)
+def get_error_page(start_response, status, **kw):
+ """Create and display an error page.
- page, data = get_page(path)
- if page and has_conflicting_pages(page):
- raise Exception('The requested page conflicts with another page')
- return data
+ Parameters
+ ----------
+ start_response: function
+ It will be called before constructing the error page, to setup
+ things like the status of the response and the headers.
+ status: str
+ The status of the response we're sending the error page with.
+ Needs to have the following format: "<status_code> <status_message>"
+ kw: dict
+ Any additional arguments that will be passed onto the `stream` method
+ of a `jinja2 Template`.
+ Returns
+ -------
+ generator
+ of utf8 strings - fragments of the corresponding error HTML template.
-def show_error(start_response, status, **kwargs):
+ """
env = jinja2.Environment(autoescape=True)
- template = env.from_string(ERROR_TEMPLATE)
- mime = 'text/html; encoding=%s' % UNICODE_ENCODING
+ page_template = env.from_string(ERROR_TEMPLATE)
+ mime = 'text/html; encoding={}'.format(UNICODE_ENCODING)
+
start_response(status, [('Content-Type', mime)])
- for fragment in template.stream(status=status, **kwargs):
+
+ for fragment in page_template.stream(status=status, **kw):
yield fragment.encode(UNICODE_ENCODING)
+def set_parameters():
+ """Set the arguments required to run the script.
+
+ Performs the following actions:
+ 1. Setup the script's argument parser
+ 2. Read the arguments provided when running the script
+ 3. Set the fields of the Parameters namespace
+
+ """
+ parser = ArgumentParser(description='CMS development server created to '
+ 'test pages locally and on-the-fly')
+
+ parser.add_argument('path', default=os.curdir, nargs='?',
+ help='Path to the website we intend to run. If not '
+ 'provided, defaults, to the current directory.')
+ parser.add_argument('--host', default='localhost',
+ help='Address of the host the server will listen on. '
+ 'Defaults to "localhost".')
+ parser.add_argument('--port', default=5000, type=int,
+ help='TCP port the server will listen on. Default '
+ '5000.')
+
+ args = parser.parse_args()
+
+ Parameters.source = create_source(args.path)
+ Parameters.host = args.host
+ Parameters.port = args.port
+ Parameters.base_url = 'http://{0}:{1}'.format(args.host, args.port)
+
+
def handler(environ, start_response):
+ """Handle a request for a page.
+
+ Parameters
+ ----------
+ environ: dict
+ The environment under which the request si made.
+ start_response: function
+ Used to initiate a request response.
+
+ Returns
+ -------
+ [str]
+ With the response body.
+
+ """
path = environ.get('PATH_INFO')
data = get_data(path)
+
if data is None:
- return show_error(start_response, '404 Not Found', uri=path)
+ return get_error_page(start_response, '404 Not Found', uri=path)
mime = mimetypes.guess_type(path)[0] or 'text/html'
if isinstance(data, unicode):
data = data.encode(UNICODE_ENCODING)
- mime = '%s; charset=%s' % (mime, UNICODE_ENCODING)
+ mime = '{0}; charset={1}'.format(mime, UNICODE_ENCODING)
start_response('200 OK', [('Content-Type', mime)])
return [data]
-if __name__ == '__main__':
+def make_werkzeug_server():
+ """Set up a server that uses `werkzeug`.
- parser = argparse.ArgumentParser(description='CMS development server created to test pages locally and on-the-fly')
- parser.add_argument('path', nargs='?', default=os.curdir)
- parser.add_argument('-a', '--address', default='localhost', help='Address of the interface the server will listen on')
- parser.add_argument('-p', '--port', type=int, default=5000, help='TCP port the server will listen on')
- args = parser.parse_args()
+ Returns
+ -------
+ function
+ Used to run the server
+
+ Raises
+ ------
+ ImportError
+ If the package `werkzeug` is not installed
- source = create_source(args.path)
- address = args.address
- port = args.port
+ """
+ from werkzeug.serving import run_simple
+ def run_func(*args, **kwargs):
+ # The werkzeug logger must be configured before the
+ # root logger. Also we must prevent it from propagating
+ # messages, otherwise messages are logged twice.
+ import logging
+ logger = logging.getLogger('werkzeug')
+ logger.propagate = False
+ logger.setLevel(logging.INFO)
+ logger.addHandler(logging.StreamHandler())
+
+ run_simple(threaded=True, *args, **kwargs)
+
+ return run_func
+
+
+def make_builtins_server():
+ """Configure a server that only uses builtin packages.
+
+ Returns
+ -------
+ function
+ Used to run the server.
+
+ """
+ from SocketServer import ThreadingMixIn
+ from wsgiref.simple_server import WSGIServer, make_server
+
+ class ThreadedWSGIServer(ThreadingMixIn, WSGIServer):
+ daemon_threads = True
+
+ def run(host, port, app, **kwargs):
+ def wrapper(environ, start_response):
+ try:
+ return app(environ, start_response)
+ except Exception as e:
+ return get_error_page(start_response,
+ '500 Internal Server Error',
+ uri=environ.get('PATH_INFO'), error=e)
+
+ server = make_server(host, port, wrapper,
+ ThreadedWSGIServer)
+ print(' * Running on {0}:{1}'.format(*server.server_address))
+ server.serve_forever()
+
+ return run
+
+
+def main():
+ set_parameters()
try:
- from werkzeug.serving import ThreadedWSGIServer, run_simple
-
- # see https://github.com/mitsuhiko/werkzeug/pull/770
- ThreadedWSGIServer.daemon_threads = True
-
- def run(*args, **kwargs):
- # The werkzeug logger must be configured before the
- # root logger. Also we must prevent it from propagating
- # messages, otherwise messages are logged twice.
- import logging
- logger = logging.getLogger('werkzeug')
- logger.propagate = False
- logger.setLevel(logging.INFO)
- logger.addHandler(logging.StreamHandler())
-
- run_simple(threaded=True, *args, **kwargs)
+ run = make_werkzeug_server()
except ImportError:
- from SocketServer import ThreadingMixIn
- from wsgiref.simple_server import WSGIServer, make_server
-
- class ThreadedWSGIServer(ThreadingMixIn, WSGIServer):
- daemon_threads = True
-
- def run(host, port, app, **kwargs):
- def wrapper(environ, start_response):
- try:
- return app(environ, start_response)
- except Exception as e:
- return show_error(start_response, '500 Internal Server Error',
- uri=environ.get('PATH_INFO'), error=e)
+ run = make_builtins_server()
+ run(Parameters.host, Parameters.port, handler, use_reloader=True,
+ use_debugger=True)
- server = make_server(host, port, wrapper, ThreadedWSGIServer)
- print ' * Running on http://%s:%i/' % server.server_address
- server.serve_forever()
- run(address, port, handler, use_reloader=True, use_debugger=True)
+if __name__ == '__main__':
+ main()
« no previous file with comments | « no previous file | tests/test_page_outputs.py » ('j') | tests/test_page_outputs.py » ('J')

Powered by Google App Engine
This is Rietveld