Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Side by Side Diff: sitescripts/stats/bin/logprocessor.py

Issue 5912761519308800: Stats processing: don`t send data back to the main process, save it directly in the worker process (Closed)
Patch Set: Created Jan. 31, 2014, 12:46 p.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding: utf-8 1 # coding: utf-8
2 2
3 # This file is part of the Adblock Plus web scripts, 3 # This file is part of the Adblock Plus web scripts,
4 # Copyright (C) 2006-2013 Eyeo GmbH 4 # Copyright (C) 2006-2013 Eyeo GmbH
5 # 5 #
6 # Adblock Plus is free software: you can redistribute it and/or modify 6 # Adblock Plus is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 3 as 7 # it under the terms of the GNU General Public License version 3 as
8 # published by the Free Software Foundation. 8 # published by the Free Software Foundation.
9 # 9 #
10 # Adblock Plus is distributed in the hope that it will be useful, 10 # Adblock Plus is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details. 13 # GNU General Public License for more details.
14 # 14 #
15 # You should have received a copy of the GNU General Public License 15 # You should have received a copy of the GNU General Public License
16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. 16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
17 17
18 import argparse 18 import argparse
19 import codecs 19 import codecs
20 from collections import OrderedDict 20 from collections import OrderedDict
21 from datetime import datetime, timedelta 21 from datetime import datetime, timedelta
22 import errno 22 import errno
23 import functools
23 import gzip 24 import gzip
24 import json 25 import json
25 import math 26 import math
26 import multiprocessing 27 import multiprocessing
27 import numbers 28 import numbers
28 import os 29 import os
29 import re 30 import re
30 import pygeoip 31 import pygeoip
31 import socket 32 import socket
32 import subprocess 33 import subprocess
(...skipping 466 matching lines...) Expand 10 before | Expand all | Expand 10 after
499 dir = os.path.dirname(path) 500 dir = os.path.dirname(path)
500 try: 501 try:
501 os.makedirs(dir) 502 os.makedirs(dir)
502 except OSError, e: 503 except OSError, e:
503 if e.errno != errno.EEXIST: 504 if e.errno != errno.EEXIST:
504 raise 505 raise
505 506
506 with codecs.open(path, "wb", encoding="utf-8") as fileobj: 507 with codecs.open(path, "wb", encoding="utf-8") as fileobj:
507 json.dump(existing, fileobj, indent=2, sort_keys=True) 508 json.dump(existing, fileobj, indent=2, sort_keys=True)
508 509
509 def parse_source((mirror_name, server_type, log_file)): 510 def parse_source(factor, lock, (mirror_name, server_type, log_file)):
510 try: 511 try:
511 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CA CHE) 512 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CA CHE)
512 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMOR Y_CACHE) 513 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMOR Y_CACHE)
513 514
514 ignored = set() 515 ignored = set()
515 fileobj = open_stats_file(log_file) 516 fileobj = open_stats_file(log_file)
516 try: 517 try:
517 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) 518 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
518 finally: 519 finally:
519 fileobj.close() 520 fileobj.close()
520 return server_type, log_file, data, ignored 521
522 lock.acquire()
523 try:
524 save_stats(server_type, data, factor)
525 finally:
526 lock.release()
527 return log_file, ignored
521 except: 528 except:
522 print >>sys.stderr, "Unable to process log file '%s'" % log_file 529 print >>sys.stderr, "Unable to process log file '%s'" % log_file
523 traceback.print_exc() 530 traceback.print_exc()
524 return None, None, None, None 531 return None, None, None, None
525 532
526 def parse_sources(sources, factor=1, verbose=False): 533 def parse_sources(sources, factor=1, verbose=False):
527 pool = multiprocessing.Pool() 534 pool = multiprocessing.Pool()
535 lock = multiprocessing.Manager().Lock()
536 callback = functools.partial(parse_source, factor, lock)
528 try: 537 try:
529 for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1): 538 for log_file, ignored in pool.imap_unordered(callback, sources, chunksize=1) :
530 if server_type == None: 539 if verbose and ignored:
531 continue
532
533 save_stats(server_type, data, factor)
534 if verbose:
535 print "Ignored files for %s" % log_file 540 print "Ignored files for %s" % log_file
536 print "============================================================" 541 print "============================================================"
537 print "\n".join(sorted(ignored)) 542 print "\n".join(sorted(ignored))
538 finally: 543 finally:
539 pool.close() 544 pool.close()
540 545
541 if __name__ == "__main__": 546 if __name__ == "__main__":
542 setupStderr() 547 setupStderr()
543 548
544 parser = argparse.ArgumentParser(description="Processes log files and merges t hem into the stats database") 549 parser = argparse.ArgumentParser(description="Processes log files and merges t hem into the stats database")
545 parser.add_argument("--verbose", dest="verbose", action="store_const", const=T rue, default=False, help="Verbose mode, ignored requests will be listed") 550 parser.add_argument("--verbose", dest="verbose", action="store_const", const=T rue, default=False, help="Verbose mode, ignored requests will be listed")
546 parser.add_argument("--revert", dest="factor", action="store_const", const=-1, default=1, help="Remove log data from the database") 551 parser.add_argument("--revert", dest="factor", action="store_const", const=-1, default=1, help="Remove log data from the database")
547 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to") 552 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
548 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription") 553 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
549 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL") 554 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
550 args = parser.parse_args() 555 args = parser.parse_args()
551 556
552 if args.mirror_name and args.server_type and args.log_file: 557 if args.mirror_name and args.server_type and args.log_file:
553 sources = [(args.mirror_name, args.server_type, args.log_file)] 558 sources = [(args.mirror_name, args.server_type, args.log_file)]
554 else: 559 else:
555 sources = get_stats_files() 560 sources = get_stats_files()
556 parse_sources(sources, args.factor, args.verbose) 561 parse_sources(sources, args.factor, args.verbose)
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld