Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Side by Side Diff: sitescripts/stats/bin/logprocessor.py

Issue 6231865702744064: Issue 2239 - Make logprocessor close files and wait for subprocesses (Closed)
Patch Set: Removed usage of non-existing attribute Created April 1, 2015, 10:29 a.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding: utf-8 1 # coding: utf-8
2 2
3 # This file is part of the Adblock Plus web scripts, 3 # This file is part of the Adblock Plus web scripts,
4 # Copyright (C) 2006-2015 Eyeo GmbH 4 # Copyright (C) 2006-2015 Eyeo GmbH
5 # 5 #
6 # Adblock Plus is free software: you can redistribute it and/or modify 6 # Adblock Plus is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 3 as 7 # it under the terms of the GNU General Public License version 3 as
8 # published by the Free Software Foundation. 8 # published by the Free Software Foundation.
9 # 9 #
10 # Adblock Plus is distributed in the hope that it will be useful, 10 # Adblock Plus is distributed in the hope that it will be useful,
(...skipping 24 matching lines...) Expand all
35 import traceback 35 import traceback
36 import urllib 36 import urllib
37 import urlparse 37 import urlparse
38 38
39 import sitescripts.stats.common as common 39 import sitescripts.stats.common as common
40 from sitescripts.utils import get_config, setupStderr 40 from sitescripts.utils import get_config, setupStderr
41 41
42 log_regexp = None 42 log_regexp = None
43 gecko_apps = None 43 gecko_apps = None
44 44
45 def open_stats_file(path): 45 class StatsFile:
46 parseresult = urlparse.urlparse(path) 46 def __init__(self, path):
47 if parseresult.scheme == "ssh" and parseresult.username and parseresult.hostna me and parseresult.path: 47 self._inner_file = None
48 command = [ 48 self._processes = []
49 "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
50 "-l", parseresult.username,
51 parseresult.hostname,
52 parseresult.path.lstrip("/")
53 ]
54 if parseresult.port:
55 command[1:1] = ["-P", str(parseresult.port)]
56 result = subprocess.Popen(command, stdout=subprocess.PIPE).stdout
57 elif parseresult.scheme in ("http", "https"):
58 result = urllib.urlopen(path)
59 elif os.path.exists(path):
60 result = open(path, "rb")
61 else:
62 raise IOError("Path '%s' not recognized" % path)
63 49
64 if path.endswith(".gz"): 50 parseresult = urlparse.urlparse(path)
65 # Built-in gzip module doesn't support streaming (fixed in Python 3.2) 51 if parseresult.scheme == "ssh" and parseresult.username and parseresult.host name and parseresult.path:
66 result = subprocess.Popen(["gzip", "-cd"], stdin=result, stdout=subprocess.P IPE).stdout 52 command = [
67 return result 53 "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
54 "-l", parseresult.username,
55 parseresult.hostname,
56 parseresult.path.lstrip("/")
57 ]
58 if parseresult.port:
59 command[1:1] = ["-P", str(parseresult.port)]
60 ssh_process = subprocess.Popen(command, stdout=subprocess.PIPE)
61 self._processes.push(ssh_process)
62 self._file = ssh_process.stdout
63 elif parseresult.scheme in ("http", "https"):
64 self._file = urllib.urlopen(path)
65 elif os.path.exists(path):
66 self._file = open(path, "rb")
67 else:
68 raise IOError("Path '%s' not recognized" % path)
69
70 if path.endswith(".gz"):
71 # Built-in gzip module doesn't support streaming (fixed in Python 3.2)
72 gzip_process = subprocess.Popen(["gzip", "-cd"], stdin=self._file, stdout= subprocess.PIPE)
73 self._processes.append(gzip_process)
74 self._file, self._inner_file = gzip_process.stdout, self._file
75
76 def __getattr__(self, name):
77 return getattr(self._file, name)
78
79 def close(self):
80 self._file.close()
81 if self._inner_file:
82 self._inner_file.close()
83 for process in self._processes:
84 process.wait()
68 85
69 def get_stats_files(): 86 def get_stats_files():
70 config = get_config() 87 config = get_config()
71 88
72 prefix = "mirror_" 89 prefix = "mirror_"
73 options = filter(lambda o: o.startswith(prefix), config.options("stats")) 90 options = filter(lambda o: o.startswith(prefix), config.options("stats"))
74 for option in options: 91 for option in options:
75 if config.has_option("stats", option): 92 if config.has_option("stats", option):
76 value = config.get("stats", option) 93 value = config.get("stats", option)
77 if " " in value: 94 if " " in value:
(...skipping 423 matching lines...) Expand 10 before | Expand all | Expand 10 after
501 518
502 with codecs.open(path, "wb", encoding="utf-8") as fileobj: 519 with codecs.open(path, "wb", encoding="utf-8") as fileobj:
503 json.dump(existing, fileobj, indent=2, sort_keys=True) 520 json.dump(existing, fileobj, indent=2, sort_keys=True)
504 521
505 def parse_source(factor, lock, (mirror_name, server_type, log_file)): 522 def parse_source(factor, lock, (mirror_name, server_type, log_file)):
506 try: 523 try:
507 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CA CHE) 524 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CA CHE)
508 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMOR Y_CACHE) 525 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMOR Y_CACHE)
509 526
510 ignored = set() 527 ignored = set()
511 fileobj = open_stats_file(log_file) 528 fileobj = StatsFile(log_file)
512 try: 529 try:
513 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) 530 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
514 finally: 531 finally:
515 fileobj.close() 532 fileobj.close()
516 533
517 lock.acquire() 534 lock.acquire()
518 try: 535 try:
519 save_stats(server_type, data, factor) 536 save_stats(server_type, data, factor)
520 finally: 537 finally:
521 lock.release() 538 lock.release()
(...skipping 25 matching lines...) Expand all
547 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to") 564 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
548 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription") 565 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
549 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL") 566 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
550 args = parser.parse_args() 567 args = parser.parse_args()
551 568
552 if args.mirror_name and args.server_type and args.log_file: 569 if args.mirror_name and args.server_type and args.log_file:
553 sources = [(args.mirror_name, args.server_type, args.log_file)] 570 sources = [(args.mirror_name, args.server_type, args.log_file)]
554 else: 571 else:
555 sources = get_stats_files() 572 sources = get_stats_files()
556 parse_sources(sources, args.factor, args.verbose) 573 parse_sources(sources, args.factor, args.verbose)
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld