Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Delta Between Two Patch Sets: sitescripts/stats/bin/logprocessor.py

Issue 5182947690807296: Centralize stats processing, have the stats server pull in logs (Closed)
Left Patch Set: Created Dec. 20, 2013, 1:07 p.m.
Right Patch Set: Fixed comment and processing of non-Unicode keys Created Dec. 26, 2013, 2:09 p.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « sitescripts/stats/bin/datamerger.py ('k') | sitescripts/stats/bin/pagegenerator.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 # coding: utf-8 1 # coding: utf-8
2 2
3 # This file is part of the Adblock Plus web scripts, 3 # This file is part of the Adblock Plus web scripts,
4 # Copyright (C) 2006-2013 Eyeo GmbH 4 # Copyright (C) 2006-2013 Eyeo GmbH
5 # 5 #
6 # Adblock Plus is free software: you can redistribute it and/or modify 6 # Adblock Plus is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 3 as 7 # it under the terms of the GNU General Public License version 3 as
8 # published by the Free Software Foundation. 8 # published by the Free Software Foundation.
9 # 9 #
10 # Adblock Plus is distributed in the hope that it will be useful, 10 # Adblock Plus is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details. 13 # GNU General Public License for more details.
14 # 14 #
15 # You should have received a copy of the GNU General Public License 15 # You should have received a copy of the GNU General Public License
16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. 16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
17 17
18 import argparse 18 import argparse
19 import codecs 19 import codecs
20 from collections import OrderedDict 20 from collections import OrderedDict
21 from datetime import datetime, timedelta 21 from datetime import datetime, timedelta
22 import errno 22 import errno
23 import gzip 23 import gzip
24 import json 24 import json
25 import math 25 import math
26 import multiprocessing
27 import numbers
26 import os 28 import os
27 import re 29 import re
28 import pygeoip 30 import pygeoip
29 import socket 31 import socket
30 from StringIO import StringIO
31 import subprocess 32 import subprocess
32 import sys 33 import sys
33 import traceback 34 import traceback
34 import urllib 35 import urllib
35 import urlparse 36 import urlparse
36 37
37 import sitescripts.stats.common as common 38 import sitescripts.stats.common as common
38 from sitescripts.utils import get_config, setupStderr 39 from sitescripts.utils import get_config, setupStderr
39 40
40 log_regexp = None 41 log_regexp = None
41 gecko_apps = None 42 gecko_apps = None
42 43
43 def open_stats_file(path): 44 def open_stats_file(path):
44 match = re.search(r"^ssh://(\w+)@([^/:]+)(?::(\d+))?/([^/]+)", path) 45 parseresult = urlparse.urlparse(path)
45 if match: 46 if parseresult.scheme == "ssh" and parseresult.username and parseresult.hostna me and parseresult.path:
46 user, host, port, filename = match.groups() 47 command = [
47 command = ["ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k", "-l", user, host, filename] 48 "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
48 if port: 49 "-l", parseresult.username,
49 command[1:1] = ["-P", port] 50 parseresult.hostname,
50 51 parseresult.path.lstrip("/")
51 # Not using StringIO here would be better but gzip module needs seeking 52 ]
52 result = StringIO(subprocess.check_output(command)) 53 if parseresult.port:
53 elif path.startswith("http://") or path.startswith("https://"): 54 command[1:1] = ["-P", str(parseresult.port)]
54 result = StringIO(urllib.urlopen(path).read()) 55 result = subprocess.Popen(command, stdout=subprocess.PIPE).stdout
56 elif parseresult.scheme in ("http", "https"):
57 result = urllib.urlopen(path)
55 elif os.path.exists(path): 58 elif os.path.exists(path):
56 result = open(path, "rb") 59 result = open(path, "rb")
57 else: 60 else:
58 raise IOError("Path '%s' not recognized" % path) 61 raise IOError("Path '%s' not recognized" % path)
59 62
60 if path.endswith(".gz"): 63 if path.endswith(".gz"):
61 result = gzip.GzipFile(fileobj=result) 64 # Built-in gzip module doesn't support streaming (fixed in Python 3.2)
65 result = subprocess.Popen(["gzip", "-cd"], stdin=result, stdout=subprocess.P IPE).stdout
62 return result 66 return result
63 67
64 def get_stats_files(): 68 def get_stats_files():
65 config = get_config() 69 config = get_config()
66 70
67 prefix = "mirror_" 71 prefix = "mirror_"
68 options = filter(lambda o: o.startswith(prefix), config.options("stats")) 72 options = filter(lambda o: o.startswith(prefix), config.options("stats"))
69 for option in options: 73 for option in options:
70 if config.has_option("stats", option): 74 if config.has_option("stats", option):
71 value = config.get("stats", option) 75 value = config.get("stats", option)
(...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after
200 if ua == "Adblock Plus": 204 if ua == "Adblock Plus":
201 return "ABP", "" 205 return "ABP", ""
202 206
203 return "Other", "" 207 return "Other", ""
204 208
205 def process_ip(ip, geo, geov6): 209 def process_ip(ip, geo, geov6):
206 match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip) 210 match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip)
207 if match: 211 if match:
208 ip = match.group(1) 212 ip = match.group(1)
209 213
210 if ":" in ip: 214 try:
211 country = geov6.country_code_by_addr(ip) 215 if ":" in ip:
212 else: 216 country = geov6.country_code_by_addr(ip)
213 country = geo.country_code_by_addr(ip) 217 else:
218 country = geo.country_code_by_addr(ip)
219 except:
220 traceback.print_exc()
221 country = ""
222
214 if country in (None, "", "--"): 223 if country in (None, "", "--"):
215 country = "unknown" 224 country = "unknown"
216 country = country.lower() 225 country = country.lower()
217 226
218 return ip, country 227 return ip, country
219 228
220 @cache_last 229 @cache_last
221 def parse_time(timestr, tz_hours, tz_minutes): 230 def parse_time(timestr, tz_hours, tz_minutes):
222 result = datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S") 231 result = datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S")
223 result -= timedelta(hours = tz_hours, minutes = math.copysign(tz_minutes, tz_h ours)) 232 result -= timedelta(hours = tz_hours, minutes = math.copysign(tz_minutes, tz_h ours))
(...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after
441 data[info["month"]] = {} 450 data[info["month"]] = {}
442 section = data[info["month"]] 451 section = data[info["month"]]
443 452
444 if info["file"] not in section: 453 if info["file"] not in section:
445 section[info["file"]] = {} 454 section[info["file"]] = {}
446 section = section[info["file"]] 455 section = section[info["file"]]
447 456
448 add_record(info, section) 457 add_record(info, section)
449 return data 458 return data
450 459
451 def merge_objects(object1, object2): 460 def merge_objects(object1, object2, factor=1):
452 for key, value in object2.iteritems(): 461 for key, value in object2.iteritems():
453 if key in object1: 462 try:
454 if isinstance(value, int): 463 key = unicode(key)
455 object1[key] += value 464 except UnicodeDecodeError:
456 else: 465 key = unicode(key, encoding="latin-1")
457 merge_objects(object1[key], object2[key]) 466 if isinstance(value, numbers.Number):
458 else: 467 object1[key] = object1.get(key, 0) + factor * value
459 object1[key] = value 468 else:
460 469 merge_objects(object1.setdefault(key, {}), value, factor)
461 def save_stats(server_type, data): 470
471 def save_stats(server_type, data, factor=1):
462 base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.fil ename_encode(server_type)) 472 base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.fil ename_encode(server_type))
463 for month, month_data in data.iteritems(): 473 for month, month_data in data.iteritems():
464 for name, file_data in month_data.iteritems(): 474 for name, file_data in month_data.iteritems():
465 path = os.path.join(base_dir, common.filename_encode(month), common.filena me_encode(name + ".json")) 475 path = os.path.join(base_dir, common.filename_encode(month), common.filena me_encode(name + ".json"))
466 if os.path.exists(path): 476 if os.path.exists(path):
467 with codecs.open(path, "rb", encoding="utf-8") as fileobj: 477 with codecs.open(path, "rb", encoding="utf-8") as fileobj:
468 existing = json.load(fileobj) 478 existing = json.load(fileobj)
469 else: 479 else:
470 existing = {} 480 existing = {}
471 481
472 merge_objects(existing, file_data) 482 merge_objects(existing, file_data, factor)
473 483
474 dir = os.path.dirname(path) 484 dir = os.path.dirname(path)
475 try: 485 try:
476 os.makedirs(dir) 486 os.makedirs(dir)
477 except OSError, e: 487 except OSError, e:
478 if e.errno != errno.EEXIST: 488 if e.errno != errno.EEXIST:
479 raise 489 raise
480 490
481 with codecs.open(path, "wb", encoding="utf-8") as fileobj: 491 with codecs.open(path, "wb", encoding="utf-8") as fileobj:
482 json.dump(existing, fileobj, indent=2, sort_keys=True) 492 json.dump(existing, fileobj, indent=2, sort_keys=True)
483 493
484 def parse_file(mirror_name, server_type, log_file, geo, geov6, verbose): 494 def parse_source((mirror_name, server_type, log_file)):
485 ignored = set()
486 fileobj = open_stats_file(log_file)
487 try: 495 try:
488 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) 496 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CA CHE)
497 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMOR Y_CACHE)
498
499 ignored = set()
500 fileobj = open_stats_file(log_file)
501 try:
502 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
503 finally:
504 fileobj.close()
505 return server_type, log_file, data, ignored
506 except:
507 print >>sys.stderr, "Unable to process log file '%s'" % log_file
508 traceback.print_exc()
509 return None, None, None, None
510
511 def parse_sources(sources, factor=1, verbose=False):
512 pool = multiprocessing.Pool()
513 try:
514 for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1):
515 if server_type == None:
516 continue
517
518 save_stats(server_type, data, factor)
519 if verbose:
520 print "Ignored files for %s" % log_file
521 print "============================================================"
522 print "\n".join(sorted(ignored))
489 finally: 523 finally:
490 fileobj.close() 524 pool.close()
491 save_stats(server_type, data)
492
493 if verbose:
494 print "Ignored files for %s" % log_file
495 print "============================================================"
496 print "\n".join(sorted(ignored))
497 525
498 if __name__ == "__main__": 526 if __name__ == "__main__":
499 setupStderr() 527 setupStderr()
500 528
501 parser = argparse.ArgumentParser(description="Processes log files and merges t hem into the stats database") 529 parser = argparse.ArgumentParser(description="Processes log files and merges t hem into the stats database")
502 parser.add_argument("--verbose", dest="verbose", action="store_const", const=T rue, default=False, help="Verbose mode, ignored requests will be listed") 530 parser.add_argument("--verbose", dest="verbose", action="store_const", const=T rue, default=False, help="Verbose mode, ignored requests will be listed")
531 parser.add_argument("--revert", dest="factor", action="store_const", const=-1, default=1, help="Remove log data from the database")
503 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to") 532 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
504 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription") 533 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
505 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL") 534 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
506 args = parser.parse_args() 535 args = parser.parse_args()
507 536
508 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACH E)
509 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_ CACHE)
510
511 if args.mirror_name and args.server_type and args.log_file: 537 if args.mirror_name and args.server_type and args.log_file:
512 parse_file(args.mirror_name, args.server_type, args.log_file, geo, geov6, ar gs.verbose) 538 sources = [(args.mirror_name, args.server_type, args.log_file)]
513 else: 539 else:
514 for mirror_name, server_type, log_file in get_stats_files(): 540 sources = get_stats_files()
515 try: 541 parse_sources(sources, args.factor, args.verbose)
516 parse_file(mirror_name, server_type, log_file, geo, geov6, args.verbose)
517 except:
518 print >>sys.stderr, "Unable to process log file '%s'" % log_file
519 traceback.print_exc()
520
LEFTRIGHT

Powered by Google App Engine
This is Rietveld