Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Side by Side Diff: sitescripts/stats/bin/logprocessor.py

Issue 5182947690807296: Centralize stats processing, have the stats server pull in logs (Closed)
Patch Set: Closing pool, added --revert option Created Dec. 23, 2013, 9:41 p.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sitescripts/stats/bin/datamerger.py ('k') | sitescripts/stats/bin/pagegenerator.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding: utf-8 1 # coding: utf-8
2 2
3 # This file is part of the Adblock Plus web scripts, 3 # This file is part of the Adblock Plus web scripts,
4 # Copyright (C) 2006-2013 Eyeo GmbH 4 # Copyright (C) 2006-2013 Eyeo GmbH
5 # 5 #
6 # Adblock Plus is free software: you can redistribute it and/or modify 6 # Adblock Plus is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 3 as 7 # it under the terms of the GNU General Public License version 3 as
8 # published by the Free Software Foundation. 8 # published by the Free Software Foundation.
9 # 9 #
10 # Adblock Plus is distributed in the hope that it will be useful, 10 # Adblock Plus is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details. 13 # GNU General Public License for more details.
14 # 14 #
15 # You should have received a copy of the GNU General Public License 15 # You should have received a copy of the GNU General Public License
16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. 16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
17 17
18 import os, sys, codecs, re, math, urllib, urlparse, socket, json 18 import argparse
19 import codecs
20 from collections import OrderedDict
21 from datetime import datetime, timedelta
22 import errno
23 import gzip
24 import json
25 import math
26 import multiprocessing
27 import numbers
28 import os
29 import re
19 import pygeoip 30 import pygeoip
20 from collections import OrderedDict 31 import socket
32 import subprocess
33 import sys
34 import traceback
35 import urllib
36 import urlparse
37
21 import sitescripts.stats.common as common 38 import sitescripts.stats.common as common
22 from sitescripts.utils import get_config, setupStderr 39 from sitescripts.utils import get_config, setupStderr
23 from datetime import datetime, timedelta
24 40
25 log_regexp = None 41 log_regexp = None
26 mirror_name = None
27 gecko_apps = None 42 gecko_apps = None
28 43
def open_stats_file(path):
  """Open a log file for reading, transparently handling remote locations.

  Supported paths:
    * ssh://user@host[:port]/path - fetched by spawning an ssh process
    * http:// and https:// URLs
    * local file paths
  Files ending in .gz are decompressed on the fly. Returns a readable
  file-like object; raises IOError if the path cannot be interpreted.
  """
  parseresult = urlparse.urlparse(path)
  if parseresult.scheme == "ssh" and parseresult.username and parseresult.hostname and parseresult.path:
    command = [
      "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
      "-l", parseresult.username,
      parseresult.hostname,
      parseresult.path.lstrip("/")
    ]
    if parseresult.port:
      # Bug fix: ssh takes the port via lower-case -p; capital -P is an scp
      # option and would make ssh abort with "unknown option".
      command[1:1] = ["-p", str(parseresult.port)]
    result = subprocess.Popen(command, stdout=subprocess.PIPE).stdout
  elif parseresult.scheme in ("http", "https"):
    result = urllib.urlopen(path)
  elif os.path.exists(path):
    result = open(path, "rb")
  else:
    raise IOError("Path '%s' not recognized" % path)

  if path.endswith(".gz"):
    # Built-in gzip module doesn't support streaming (fixed in Python 3.2)
    result = subprocess.Popen(["gzip", "-cd"], stdin=result, stdout=subprocess.PIPE).stdout
  return result
67
def get_stats_files():
  """Yield [mirror_name, server_type, log_file] triples from configuration.

  Log sources are configured in the [stats] section as options named
  "mirror_<name>" whose value is "<server_type> <path>". Malformed or
  missing entries are reported on stderr and skipped.
  """
  config = get_config()

  prefix = "mirror_"
  for option in config.options("stats"):
    if not option.startswith(prefix):
      continue
    if config.has_option("stats", option):
      value = config.get("stats", option)
      if " " in value:
        # First word is the server type, the rest is the log file path.
        yield [option[len(prefix):]] + value.split(None, 1)
      else:
        print >>sys.stderr, "Option '%s' has invalid value: '%s'" % (option, value)
    else:
      print >>sys.stderr, "Option '%s' not found in the configuration" % option
82
29 def cache_lru(func): 83 def cache_lru(func):
30 """ 84 """
31 Decorator that memoizes the return values of a single-parameter function in 85 Decorator that memoizes the return values of a single-parameter function in
32 case it is called again with the same parameter. The 1024 most recent 86 case it is called again with the same parameter. The 1024 most recent
33 results are saved. 87 results are saved.
34 """ 88 """
35 89
36 results = OrderedDict() 90 results = OrderedDict()
37 results.entries_left = 1024 91 results.entries_left = 1024
38 92
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
150 if ua == "Adblock Plus": 204 if ua == "Adblock Plus":
151 return "ABP", "" 205 return "ABP", ""
152 206
153 return "Other", "" 207 return "Other", ""
154 208
def process_ip(ip, geo, geov6):
  """Resolve a client IP address to an (ip, country_code) pair.

  IPv4-mapped IPv6 addresses (::ffff:x.x.x.x) are unwrapped to plain IPv4
  first. IPv6 addresses are looked up in geov6, IPv4 in geo. Lookup
  failures and unknown countries map to "unknown"; country codes are
  returned lower-cased.
  """
  match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip)
  if match:
    ip = match.group(1)

  try:
    if ":" in ip:
      country = geov6.country_code_by_addr(ip)
    else:
      country = geo.country_code_by_addr(ip)
  except Exception:
    # Bug fix: this was a bare except, which would also swallow
    # KeyboardInterrupt/SystemExit. A failed lookup is logged and treated
    # as an unknown country.
    traceback.print_exc()
    country = ""

  if country in (None, "", "--"):
    country = "unknown"
  country = country.lower()

  return ip, country
169 228
170 @cache_last 229 @cache_last
171 def parse_time(timestr, tz_hours, tz_minutes): 230 def parse_time(timestr, tz_hours, tz_minutes):
172 result = datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S") 231 result = datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S")
173 result -= timedelta(hours = tz_hours, minutes = math.copysign(tz_minutes, tz_h ours)) 232 result -= timedelta(hours = tz_hours, minutes = math.copysign(tz_minutes, tz_h ours))
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 357
299 # Only leave the major and minor release number for application 358 # Only leave the major and minor release number for application
300 applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion) 359 applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion)
301 360
302 return version, application, applicationVersion 361 return version, application, applicationVersion
303 362
def parse_update_flag(query):
  """Classify a request as an update check or a fresh install.

  A query string of exactly "update" marks an update check; any other
  query (including empty) counts as an install.
  """
  if query == "update":
    return "update"
  return "install"
306 365
307 def parse_record(line, ignored, geo, geov6): 366 def parse_record(line, ignored, geo, geov6):
308 global log_regexp, mirror_name 367 global log_regexp
309 if log_regexp == None: 368 if log_regexp == None:
310 log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" " ([^"]*)")?') 369 log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" " ([^"]*)")?')
311 if mirror_name == None:
312 mirror_name = re.sub(r"\..*", "", socket.gethostname())
313 370
314 match = re.search(log_regexp, line) 371 match = re.search(log_regexp, line)
315 if not match: 372 if not match:
316 return None 373 return None
317 374
318 status = int(match.group(6)) 375 status = int(match.group(6))
319 if status != 200: 376 if status != 200:
320 return None 377 return None
321 378
322 info = { 379 info = {
323 "mirror": mirror_name,
324 "size": int(match.group(7)), 380 "size": int(match.group(7)),
325 } 381 }
326 382
327 info["ip"], info["country"] = process_ip(match.group(1), geo, geov6) 383 info["ip"], info["country"] = process_ip(match.group(1), geo, geov6)
328 info["time"], info["month"], info["day"], info["weekday"], info["hour"] = pars e_time(match.group(2), int(match.group(3)), int(match.group(4))) 384 info["time"], info["month"], info["day"], info["weekday"], info["hour"] = pars e_time(match.group(2), int(match.group(3)), int(match.group(4)))
329 info["file"], info["query"] = parse_path(match.group(5)) 385 info["file"], info["query"] = parse_path(match.group(5))
330 info["ua"], info["uaversion"] = parse_ua(match.group(8)) 386 info["ua"], info["uaversion"] = parse_ua(match.group(8))
331 info["fullua"] = "%s %s" % (info["ua"], info["uaversion"]) 387 info["fullua"] = "%s %s" % (info["ua"], info["uaversion"])
332 info["clientid"] = match.group(9) 388 info["clientid"] = match.group(9)
333 389
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
375 continue 431 continue
376 432
377 value = info[field] 433 value = info[field]
378 if field not in section: 434 if field not in section:
379 section[field] = {} 435 section[field] = {}
380 if value not in section[field]: 436 if value not in section[field]:
381 section[field][value] = {} 437 section[field][value] = {}
382 438
383 add_record(info, section[field][value], ignore_fields + (field,)) 439 add_record(info, section[field][value], ignore_fields + (field,))
384 440
def parse_fileobj(mirror_name, fileobj, geo, geov6, ignored):
  """Parse all log lines of one file object into nested stats data.

  Each parseable record is tagged with the given mirror_name and grouped
  first by month, then by requested file; add_record() accumulates the
  per-record counters inside each group. Unparseable or irrelevant lines
  are skipped (parse_record adds ignored file names to the `ignored` set).
  Returns the nested data dictionary.
  """
  data = {}
  for line in fileobj:
    info = parse_record(line, ignored, geo, geov6)
    # Idiom fix: compare against None with `is`, not `==`.
    if info is None:
      continue

    info["mirror"] = mirror_name
    # setdefault replaces the manual "if key not in dict" dance for the
    # month and file grouping levels.
    section = data.setdefault(info["month"], {}).setdefault(info["file"], {})
    add_record(info, section)
  return data
402 459
403 if verbose: 460 def merge_objects(object1, object2, factor=1):
404 print "Ignored files" 461 for key, value in object2.iteritems():
405 print "=============" 462 key = unicode(key)
406 print "\n".join(sorted(ignored)) 463 if isinstance(value, numbers.Number):
407 return data 464 object1[key] = object1.get(key, 0) + factor * value
465 else:
466 merge_objects(object1.setdefault(key, {}), value, factor)
467
def save_stats(server_type, data, factor=1):
  """Merge parsed stats data into the on-disk JSON database.

  For every month/file combination in `data`, the existing JSON file under
  <dataDirectory>/<server_type>/<month>/<file>.json is loaded (or started
  empty), the new data is merged in with the given factor (-1 reverts
  previously imported data) and the result is written back.
  """
  base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.filename_encode(server_type))
  for month, month_data in data.iteritems():
    for name, file_data in month_data.iteritems():
      path = os.path.join(base_dir, common.filename_encode(month), common.filename_encode(name + ".json"))
      if os.path.exists(path):
        with codecs.open(path, "rb", encoding="utf-8") as fileobj:
          existing = json.load(fileobj)
      else:
        existing = {}

      merge_objects(existing, file_data, factor)

      # Renamed from `dir`, which shadowed the builtin of the same name.
      dir_path = os.path.dirname(path)
      try:
        os.makedirs(dir_path)
      except OSError as e:
        # The directory may already exist; any other error is real.
        # (`except OSError as e` instead of the py2-only `except OSError, e`.)
        if e.errno != errno.EEXIST:
          raise

      with codecs.open(path, "wb", encoding="utf-8") as fileobj:
        json.dump(existing, fileobj, indent=2, sort_keys=True)
490
491 def parse_source((mirror_name, server_type, log_file)):
492 try:
493 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CA CHE)
494 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMOR Y_CACHE)
495
496 ignored = set()
497 fileobj = open_stats_file(log_file)
498 try:
499 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
500 finally:
501 fileobj.close()
502 return server_type, log_file, data, ignored
503 except:
504 print >>sys.stderr, "Unable to process log file '%s'" % log_file
505 traceback.print_exc()
506 return None, None, None, None
507
508 def parse_sources(sources, factor=1, verbose=False):
509 pool = multiprocessing.Pool()
510 for server_type, log_file, data, ignored in pool.imap(parse_source, sources, c hunksize=1):
511 if server_type == None:
512 continue
513
514 save_stats(server_type, data, factor)
515 if verbose:
516 print "Ignored files for %s" % log_file
517 print "============================================================"
518 print "\n".join(sorted(ignored))
519 pool.close()
Sebastian Noack 2013/12/24 10:08:42 You should put the code above into a try-finally b
408 520
if __name__ == "__main__":
  setupStderr()

  parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database")
  parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed")
  parser.add_argument("--revert", dest="factor", action="store_const", const=-1, default=1, help="Remove log data from the database")
  parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
  parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
  parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
  args = parser.parse_args()

  if args.mirror_name and args.server_type and args.log_file:
    # All three positionals given: process just this one source.
    sources = [(args.mirror_name, args.server_type, args.log_file)]
  elif args.mirror_name or args.server_type or args.log_file:
    # Robustness fix: a partial specification used to be silently ignored
    # and *all* configured mirrors were processed instead.
    parser.error("mirror_name, server_type and log_file must be given together")
  else:
    # No explicit source: process every mirror from the configuration.
    sources = get_stats_files()
  parse_sources(sources, args.factor, args.verbose)
OLDNEW
« no previous file with comments | « sitescripts/stats/bin/datamerger.py ('k') | sitescripts/stats/bin/pagegenerator.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld