Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Delta Between Two Patch Sets: sitescripts/stats/bin/logprocessor.py

Issue 5182947690807296: Centralize stats processing, have the stats server pull in logs (Closed)
Left Patch Set: Fixed exception thrown on invalid IPs Created Dec. 20, 2013, 2:46 p.m.
Right Patch Set: Fixed comment and processing of non-Unicode keys Created Dec. 26, 2013, 2:09 p.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « sitescripts/stats/bin/datamerger.py ('k') | sitescripts/stats/bin/pagegenerator.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show/Hide Comments ('s')
LEFTRIGHT
1 # coding: utf-8 1 # coding: utf-8
2 2
3 # This file is part of the Adblock Plus web scripts, 3 # This file is part of the Adblock Plus web scripts,
4 # Copyright (C) 2006-2013 Eyeo GmbH 4 # Copyright (C) 2006-2013 Eyeo GmbH
5 # 5 #
6 # Adblock Plus is free software: you can redistribute it and/or modify 6 # Adblock Plus is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 3 as 7 # it under the terms of the GNU General Public License version 3 as
8 # published by the Free Software Foundation. 8 # published by the Free Software Foundation.
9 # 9 #
10 # Adblock Plus is distributed in the hope that it will be useful, 10 # Adblock Plus is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details. 13 # GNU General Public License for more details.
14 # 14 #
15 # You should have received a copy of the GNU General Public License 15 # You should have received a copy of the GNU General Public License
16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. 16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
17 17
18 import argparse 18 import argparse
19 import codecs 19 import codecs
20 from collections import OrderedDict 20 from collections import OrderedDict
21 from datetime import datetime, timedelta 21 from datetime import datetime, timedelta
22 import errno 22 import errno
23 import gzip 23 import gzip
24 import json 24 import json
25 import math 25 import math
26 import multiprocessing
27 import numbers
26 import os 28 import os
27 import re 29 import re
28 import pygeoip 30 import pygeoip
29 import socket 31 import socket
30 from StringIO import StringIO
31 import subprocess 32 import subprocess
32 import sys 33 import sys
33 import traceback 34 import traceback
34 import urllib 35 import urllib
35 import urlparse 36 import urlparse
36 37
37 import sitescripts.stats.common as common 38 import sitescripts.stats.common as common
38 from sitescripts.utils import get_config, setupStderr 39 from sitescripts.utils import get_config, setupStderr
39 40
40 log_regexp = None 41 log_regexp = None
41 gecko_apps = None 42 gecko_apps = None
42 43
43 def open_stats_file(path): 44 def open_stats_file(path):
44 match = re.search(r"^ssh://(\w+)@([^/:]+)(?::(\d+))?/([^/]+)", path) 45 parseresult = urlparse.urlparse(path)
Sebastian Noack 2013/12/20 15:48:37 You can use urlparse (from the built-in module "urlparse")
45 if match: 46 if parseresult.scheme == "ssh" and parseresult.username and parseresult.hostname and parseresult.path:
46 user, host, port, filename = match.groups() 47 command = [
47 command = ["ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k", "-l", user, host, filename] 48 "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
48 if port: 49 "-l", parseresult.username,
49 command[1:1] = ["-P", port] 50 parseresult.hostname,
50 51 parseresult.path.lstrip("/")
51 # Not using StringIO here would be better but gzip module needs seeking 52 ]
52 result = StringIO(subprocess.check_output(command)) 53 if parseresult.port:
53 elif path.startswith("http://") or path.startswith("https://"): 54 command[1:1] = ["-P", str(parseresult.port)]
54 result = StringIO(urllib.urlopen(path).read()) 55 result = subprocess.Popen(command, stdout=subprocess.PIPE).stdout
56 elif parseresult.scheme in ("http", "https"):
57 result = urllib.urlopen(path)
55 elif os.path.exists(path): 58 elif os.path.exists(path):
56 result = open(path, "rb") 59 result = open(path, "rb")
57 else: 60 else:
58 raise IOError("Path '%s' not recognized" % path) 61 raise IOError("Path '%s' not recognized" % path)
59 62
60 if path.endswith(".gz"): 63 if path.endswith(".gz"):
61 result = gzip.GzipFile(fileobj=result) 64 # Built-in gzip module doesn't support streaming (fixed in Python 3.2)
65 result = subprocess.Popen(["gzip", "-cd"], stdin=result, stdout=subprocess.PIPE).stdout
62 return result 66 return result
63 67
64 def get_stats_files(): 68 def get_stats_files():
65 config = get_config() 69 config = get_config()
66 70
67 prefix = "mirror_" 71 prefix = "mirror_"
68 options = filter(lambda o: o.startswith(prefix), config.options("stats")) 72 options = filter(lambda o: o.startswith(prefix), config.options("stats"))
69 for option in options: 73 for option in options:
70 if config.has_option("stats", option): 74 if config.has_option("stats", option):
71 value = config.get("stats", option) 75 value = config.get("stats", option)
(...skipping 374 matching lines...) Expand 10 before | Expand all | Expand 10 after
446 data[info["month"]] = {} 450 data[info["month"]] = {}
447 section = data[info["month"]] 451 section = data[info["month"]]
448 452
449 if info["file"] not in section: 453 if info["file"] not in section:
450 section[info["file"]] = {} 454 section[info["file"]] = {}
451 section = section[info["file"]] 455 section = section[info["file"]]
452 456
453 add_record(info, section) 457 add_record(info, section)
454 return data 458 return data
455 459
456 def merge_objects(object1, object2): 460 def merge_objects(object1, object2, factor=1):
457 for key, value in object2.iteritems(): 461 for key, value in object2.iteritems():
458 if key in object1: 462 try:
459 if isinstance(value, int): 463 key = unicode(key)
460 object1[key] += value 464 except UnicodeDecodeError:
461 else: 465 key = unicode(key, encoding="latin-1")
462 merge_objects(object1[key], object2[key]) 466 if isinstance(value, numbers.Number):
463 else: 467 object1[key] = object1.get(key, 0) + factor * value
464 object1[key] = value 468 else:
465 469 merge_objects(object1.setdefault(key, {}), value, factor)
466 def save_stats(server_type, data): 470
471 def save_stats(server_type, data, factor=1):
467 base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.filename_encode(server_type)) 472 base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.filename_encode(server_type))
468 for month, month_data in data.iteritems(): 473 for month, month_data in data.iteritems():
469 for name, file_data in month_data.iteritems(): 474 for name, file_data in month_data.iteritems():
470 path = os.path.join(base_dir, common.filename_encode(month), common.filename_encode(name + ".json")) 475 path = os.path.join(base_dir, common.filename_encode(month), common.filename_encode(name + ".json"))
471 if os.path.exists(path): 476 if os.path.exists(path):
472 with codecs.open(path, "rb", encoding="utf-8") as fileobj: 477 with codecs.open(path, "rb", encoding="utf-8") as fileobj:
473 existing = json.load(fileobj) 478 existing = json.load(fileobj)
474 else: 479 else:
475 existing = {} 480 existing = {}
476 481
477 merge_objects(existing, file_data) 482 merge_objects(existing, file_data, factor)
478 483
479 dir = os.path.dirname(path) 484 dir = os.path.dirname(path)
480 try: 485 try:
481 os.makedirs(dir) 486 os.makedirs(dir)
482 except OSError, e: 487 except OSError, e:
483 if e.errno != errno.EEXIST: 488 if e.errno != errno.EEXIST:
484 raise 489 raise
485 490
486 with codecs.open(path, "wb", encoding="utf-8") as fileobj: 491 with codecs.open(path, "wb", encoding="utf-8") as fileobj:
487 json.dump(existing, fileobj, indent=2, sort_keys=True) 492 json.dump(existing, fileobj, indent=2, sort_keys=True)
488 493
489 def parse_file(mirror_name, server_type, log_file, geo, geov6, verbose): 494 def parse_source((mirror_name, server_type, log_file)):
490 ignored = set()
491 fileobj = open_stats_file(log_file)
492 try: 495 try:
493 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) 496 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE)
497 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE)
498
499 ignored = set()
500 fileobj = open_stats_file(log_file)
501 try:
502 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
503 finally:
504 fileobj.close()
505 return server_type, log_file, data, ignored
506 except:
507 print >>sys.stderr, "Unable to process log file '%s'" % log_file
508 traceback.print_exc()
509 return None, None, None, None
510
511 def parse_sources(sources, factor=1, verbose=False):
512 pool = multiprocessing.Pool()
513 try:
514 for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1):
515 if server_type == None:
516 continue
517
518 save_stats(server_type, data, factor)
519 if verbose:
520 print "Ignored files for %s" % log_file
521 print "============================================================"
522 print "\n".join(sorted(ignored))
494 finally: 523 finally:
495 fileobj.close() 524 pool.close()
496 save_stats(server_type, data)
497
498 if verbose:
499 print "Ignored files for %s" % log_file
500 print "============================================================"
501 print "\n".join(sorted(ignored))
502 525
503 if __name__ == "__main__": 526 if __name__ == "__main__":
504 setupStderr() 527 setupStderr()
505 528
506 parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database") 529 parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database")
507 parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed") 530 parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed")
531 parser.add_argument("--revert", dest="factor", action="store_const", const=-1, default=1, help="Remove log data from the database")
508 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to") 532 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
509 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription") 533 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
510 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL") 534 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
511 args = parser.parse_args() 535 args = parser.parse_args()
512 536
513 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE)
514 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE)
515
516 if args.mirror_name and args.server_type and args.log_file: 537 if args.mirror_name and args.server_type and args.log_file:
517 parse_file(args.mirror_name, args.server_type, args.log_file, geo, geov6, args.verbose) 538 sources = [(args.mirror_name, args.server_type, args.log_file)]
518 else: 539 else:
519 for mirror_name, server_type, log_file in get_stats_files(): 540 sources = get_stats_files()
520 try: 541 parse_sources(sources, args.factor, args.verbose)
521 parse_file(mirror_name, server_type, log_file, geo, geov6, args.verbose)
522 except:
523 print >>sys.stderr, "Unable to process log file '%s'" % log_file
524 traceback.print_exc()
525
LEFTRIGHT

Powered by Google App Engine
This is Rietveld