Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Side by Side Diff: sitescripts/stats/bin/logprocessor.py

Issue 6231865702744064: Issue 2239 - Make logprocessor close files and wait for subprocesses (Closed)
Patch Set: Created March 31, 2015, 10:06 a.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding: utf-8 1 # coding: utf-8
2 2
3 # This file is part of the Adblock Plus web scripts, 3 # This file is part of the Adblock Plus web scripts,
4 # Copyright (C) 2006-2015 Eyeo GmbH 4 # Copyright (C) 2006-2015 Eyeo GmbH
5 # 5 #
6 # Adblock Plus is free software: you can redistribute it and/or modify 6 # Adblock Plus is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 3 as 7 # it under the terms of the GNU General Public License version 3 as
8 # published by the Free Software Foundation. 8 # published by the Free Software Foundation.
9 # 9 #
10 # Adblock Plus is distributed in the hope that it will be useful, 10 # Adblock Plus is distributed in the hope that it will be useful,
(...skipping 24 matching lines...) Expand all
35 import traceback 35 import traceback
36 import urllib 36 import urllib
37 import urlparse 37 import urlparse
38 38
39 import sitescripts.stats.common as common 39 import sitescripts.stats.common as common
40 from sitescripts.utils import get_config, setupStderr 40 from sitescripts.utils import get_config, setupStderr
41 41
42 log_regexp = None 42 log_regexp = None
43 gecko_apps = None 43 gecko_apps = None
44 44
45 def open_stats_file(path): 45 class StatsFile:
46 parseresult = urlparse.urlparse(path) 46 def __init__(self, path):
47 if parseresult.scheme == "ssh" and parseresult.username and parseresult.hostna me and parseresult.path: 47 self._inner_file = None
48 command = [ 48 self._processes = []
49 "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
50 "-l", parseresult.username,
51 parseresult.hostname,
52 parseresult.path.lstrip("/")
53 ]
54 if parseresult.port:
55 command[1:1] = ["-P", str(parseresult.port)]
56 result = subprocess.Popen(command, stdout=subprocess.PIPE).stdout
57 elif parseresult.scheme in ("http", "https"):
58 result = urllib.urlopen(path)
59 elif os.path.exists(path):
60 result = open(path, "rb")
61 else:
62 raise IOError("Path '%s' not recognized" % path)
63 49
64 if path.endswith(".gz"): 50 parseresult = urlparse.urlparse(path)
65 # Built-in gzip module doesn't support streaming (fixed in Python 3.2) 51 if parseresult.scheme == "ssh" and parseresult.username and parseresult.host name and parseresult.path:
66 result = subprocess.Popen(["gzip", "-cd"], stdin=result, stdout=subprocess.P IPE).stdout 52 command = [
67 return result 53 "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
54 "-l", parseresult.username,
55 parseresult.hostname,
56 parseresult.path.lstrip("/")
57 ]
58 if parseresult.port:
59 command[1:1] = ["-P", str(parseresult.port)]
60 ssh_process = subprocess.Popen(command, stdout=subprocess.PIPE)
61 self._processes.push(ssh_process)
62 self._file = ssh_process.stdout
63 elif parseresult.scheme in ("http", "https"):
64 self._file = urllib.urlopen(path)
65 elif os.path.exists(path):
66 self._file = open(path, "rb")
67 else:
68 raise IOError("Path '%s' not recognized" % path)
69
70 self._files.append(result)
71
72 if path.endswith(".gz"):
73 # Built-in gzip module doesn't support streaming (fixed in Python 3.2)
74 gzip_process = subprocess.Popen(["gzip", "-cd"], stdin=self._file, stdout= subprocess.PIPE)
75 self._processes.append(gzip_process)
76 self._file, self._inner_file = gzip_process.stdout, self._file
77
78 def __getattr__(self, name):
79 return getattr(self._file, name)
80
81 def close(self):
82 self._file.close()
83 if self._inner_file:
84 self._inner_file.close()
85 for process in self._processes:
86 process.wait()
68 87
69 def get_stats_files(): 88 def get_stats_files():
70 config = get_config() 89 config = get_config()
71 90
72 prefix = "mirror_" 91 prefix = "mirror_"
73 options = filter(lambda o: o.startswith(prefix), config.options("stats")) 92 options = filter(lambda o: o.startswith(prefix), config.options("stats"))
74 for option in options: 93 for option in options:
75 if config.has_option("stats", option): 94 if config.has_option("stats", option):
76 value = config.get("stats", option) 95 value = config.get("stats", option)
77 if " " in value: 96 if " " in value:
(...skipping 423 matching lines...) Expand 10 before | Expand all | Expand 10 after
501 520
502 with codecs.open(path, "wb", encoding="utf-8") as fileobj: 521 with codecs.open(path, "wb", encoding="utf-8") as fileobj:
503 json.dump(existing, fileobj, indent=2, sort_keys=True) 522 json.dump(existing, fileobj, indent=2, sort_keys=True)
504 523
505 def parse_source(factor, lock, (mirror_name, server_type, log_file)): 524 def parse_source(factor, lock, (mirror_name, server_type, log_file)):
506 try: 525 try:
507 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CA CHE) 526 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CA CHE)
508 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMOR Y_CACHE) 527 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMOR Y_CACHE)
509 528
510 ignored = set() 529 ignored = set()
511 fileobj = open_stats_file(log_file) 530 fileobj = StatsFile(log_file)
512 try: 531 try:
513 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) 532 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
514 finally: 533 finally:
515 fileobj.close() 534 fileobj.close()
516 535
517 lock.acquire() 536 lock.acquire()
518 try: 537 try:
519 save_stats(server_type, data, factor) 538 save_stats(server_type, data, factor)
520 finally: 539 finally:
521 lock.release() 540 lock.release()
(...skipping 25 matching lines...) Expand all
547 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to") 566 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
548 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription") 567 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
549 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL") 568 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
550 args = parser.parse_args() 569 args = parser.parse_args()
551 570
552 if args.mirror_name and args.server_type and args.log_file: 571 if args.mirror_name and args.server_type and args.log_file:
553 sources = [(args.mirror_name, args.server_type, args.log_file)] 572 sources = [(args.mirror_name, args.server_type, args.log_file)]
554 else: 573 else:
555 sources = get_stats_files() 574 sources = get_stats_files()
556 parse_sources(sources, args.factor, args.verbose) 575 parse_sources(sources, args.factor, args.verbose)
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld