Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Delta Between Two Patch Sets: sitescripts/stats/bin/logprocessor.py

Issue 5182947690807296: Centralize stats processing, have the stats server pull in logs (Closed)
Left Patch Set: Fixed various issues Created Dec. 22, 2013, 4 p.m.
Right Patch Set: Fixed comment and processing of non-Unicode keys Created Dec. 26, 2013, 2:09 p.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « sitescripts/stats/bin/datamerger.py ('k') | sitescripts/stats/bin/pagegenerator.py » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 # coding: utf-8 1 # coding: utf-8
2 2
3 # This file is part of the Adblock Plus web scripts, 3 # This file is part of the Adblock Plus web scripts,
4 # Copyright (C) 2006-2013 Eyeo GmbH 4 # Copyright (C) 2006-2013 Eyeo GmbH
5 # 5 #
6 # Adblock Plus is free software: you can redistribute it and/or modify 6 # Adblock Plus is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 3 as 7 # it under the terms of the GNU General Public License version 3 as
8 # published by the Free Software Foundation. 8 # published by the Free Software Foundation.
9 # 9 #
10 # Adblock Plus is distributed in the hope that it will be useful, 10 # Adblock Plus is distributed in the hope that it will be useful,
(...skipping 439 matching lines...) Expand 10 before | Expand all | Expand 10 after
450 data[info["month"]] = {} 450 data[info["month"]] = {}
451 section = data[info["month"]] 451 section = data[info["month"]]
452 452
453 if info["file"] not in section: 453 if info["file"] not in section:
454 section[info["file"]] = {} 454 section[info["file"]] = {}
455 section = section[info["file"]] 455 section = section[info["file"]]
456 456
457 add_record(info, section) 457 add_record(info, section)
458 return data 458 return data
459 459
460 def merge_objects(object1, object2): 460 def merge_objects(object1, object2, factor=1):
461 for key, value in object2.iteritems(): 461 for key, value in object2.iteritems():
462 if key in object1: 462 try:
463 if isinstance(value, numbers.Number): 463 key = unicode(key)
464 object1[key] += value 464 except UnicodeDecodeError:
465 else: 465 key = unicode(key, encoding="latin-1")
466 merge_objects(object1[key], object2[key]) 466 if isinstance(value, numbers.Number):
467 else: 467 object1[key] = object1.get(key, 0) + factor * value
468 object1[key] = value 468 else:
469 469 merge_objects(object1.setdefault(key, {}), value, factor)
470 def save_stats(server_type, data): 470
471 def save_stats(server_type, data, factor=1):
471 base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.filename_encode(server_type)) 472 base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.filename_encode(server_type))
472 for month, month_data in data.iteritems(): 473 for month, month_data in data.iteritems():
473 for name, file_data in month_data.iteritems(): 474 for name, file_data in month_data.iteritems():
474 path = os.path.join(base_dir, common.filename_encode(month), common.filename_encode(name + ".json")) 475 path = os.path.join(base_dir, common.filename_encode(month), common.filename_encode(name + ".json"))
475 if os.path.exists(path): 476 if os.path.exists(path):
476 with codecs.open(path, "rb", encoding="utf-8") as fileobj: 477 with codecs.open(path, "rb", encoding="utf-8") as fileobj:
477 existing = json.load(fileobj) 478 existing = json.load(fileobj)
478 else: 479 else:
479 existing = {} 480 existing = {}
480 481
481 merge_objects(existing, file_data) 482 merge_objects(existing, file_data, factor)
482 483
483 dir = os.path.dirname(path) 484 dir = os.path.dirname(path)
484 try: 485 try:
485 os.makedirs(dir) 486 os.makedirs(dir)
486 except OSError, e: 487 except OSError, e:
487 if e.errno != errno.EEXIST: 488 if e.errno != errno.EEXIST:
488 raise 489 raise
489 490
490 with codecs.open(path, "wb", encoding="utf-8") as fileobj: 491 with codecs.open(path, "wb", encoding="utf-8") as fileobj:
491 json.dump(existing, fileobj, indent=2, sort_keys=True) 492 json.dump(existing, fileobj, indent=2, sort_keys=True)
492 493
493 def parse_source((mirror_name, server_type, log_file)): 494 def parse_source((mirror_name, server_type, log_file)):
494 try: 495 try:
495 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE) 496 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE)
496 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE) 497 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE)
497 498
498 ignored = set() 499 ignored = set()
499 fileobj = open_stats_file(log_file) 500 fileobj = open_stats_file(log_file)
500 try: 501 try:
501 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) 502 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
502 finally: 503 finally:
503 fileobj.close() 504 fileobj.close()
504 return server_type, log_file, data, ignored 505 return server_type, log_file, data, ignored
505 except: 506 except:
506 print >>sys.stderr, "Unable to process log file '%s'" % log_file 507 print >>sys.stderr, "Unable to process log file '%s'" % log_file
507 traceback.print_exc() 508 traceback.print_exc()
508 return None, None, None, None 509 return None, None, None, None
509 510
510 def parse_sources(sources, verbose): 511 def parse_sources(sources, factor=1, verbose=False):
511 pool = multiprocessing.Pool() 512 pool = multiprocessing.Pool()
512 for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1): 513 try:
513 if server_type == None: 514 for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1):
514 continue 515 if server_type == None:
515 516 continue
516 save_stats(server_type, data) 517
517 if verbose: 518 save_stats(server_type, data, factor)
518 print "Ignored files for %s" % log_file 519 if verbose:
519 print "============================================================" 520 print "Ignored files for %s" % log_file
520 print "\n".join(sorted(ignored)) 521 print "============================================================"
522 print "\n".join(sorted(ignored))
523 finally:
524 pool.close()
521 525
522 if __name__ == "__main__": 526 if __name__ == "__main__":
523 setupStderr() 527 setupStderr()
524 528
525 parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database") 529 parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database")
526 parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed") 530 parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed")
531 parser.add_argument("--revert", dest="factor", action="store_const", const=-1, default=1, help="Remove log data from the database")
527 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to") 532 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
528 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription") 533 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
529 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL") 534 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
530 args = parser.parse_args() 535 args = parser.parse_args()
531 536
532 if args.mirror_name and args.server_type and args.log_file: 537 if args.mirror_name and args.server_type and args.log_file:
533 sources = [(args.mirror_name, args.server_type, args.log_file)] 538 sources = [(args.mirror_name, args.server_type, args.log_file)]
534 else: 539 else:
535 sources = get_stats_files() 540 sources = get_stats_files()
536 parse_sources(sources, args.verbose) 541 parse_sources(sources, args.factor, args.verbose)
LEFTRIGHT

Powered by Google App Engine
This is Rietveld