Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Side by Side Diff: sitescripts/stats/bin/logprocessor.py

Issue 5182947690807296: Centralize stats processing, have the stats server pull in logs (Closed)
Patch Set: Fixed exception thrown on invalid IPs Created Dec. 20, 2013, 2:46 p.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sitescripts/stats/bin/datamerger.py ('k') | sitescripts/stats/bin/pagegenerator.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding: utf-8 1 # coding: utf-8
2 2
3 # This file is part of the Adblock Plus web scripts, 3 # This file is part of the Adblock Plus web scripts,
4 # Copyright (C) 2006-2013 Eyeo GmbH 4 # Copyright (C) 2006-2013 Eyeo GmbH
5 # 5 #
6 # Adblock Plus is free software: you can redistribute it and/or modify 6 # Adblock Plus is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 3 as 7 # it under the terms of the GNU General Public License version 3 as
8 # published by the Free Software Foundation. 8 # published by the Free Software Foundation.
9 # 9 #
10 # Adblock Plus is distributed in the hope that it will be useful, 10 # Adblock Plus is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details. 13 # GNU General Public License for more details.
14 # 14 #
15 # You should have received a copy of the GNU General Public License 15 # You should have received a copy of the GNU General Public License
16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. 16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
17 17
18 import os, sys, codecs, re, math, urllib, urlparse, socket, json 18 import argparse
19 import codecs
20 from collections import OrderedDict
21 from datetime import datetime, timedelta
22 import errno
23 import gzip
24 import json
25 import math
26 import os
27 import re
19 import pygeoip 28 import pygeoip
20 from collections import OrderedDict 29 import socket
30 from StringIO import StringIO
31 import subprocess
32 import sys
33 import traceback
34 import urllib
35 import urlparse
36
21 import sitescripts.stats.common as common 37 import sitescripts.stats.common as common
22 from sitescripts.utils import get_config, setupStderr 38 from sitescripts.utils import get_config, setupStderr
23 from datetime import datetime, timedelta
24 39
25 log_regexp = None 40 log_regexp = None
26 mirror_name = None
27 gecko_apps = None 41 gecko_apps = None
28 42
43 def open_stats_file(path):
44 match = re.search(r"^ssh://(\w+)@([^/:]+)(?::(\d+))?/([^/]+)", path)
Sebastian Noack 2013/12/20 15:48:37 You can use urlparse (from the built-in module "urlparse")
45 if match:
46 user, host, port, filename = match.groups()
47 command = ["ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k", "-l", user, host, filename]
48 if port:
49 command[1:1] = ["-P", port]
50
51 # Not using StringIO here would be better but gzip module needs seeking
52 result = StringIO(subprocess.check_output(command))
53 elif path.startswith("http://") or path.startswith("https://"):
54 result = StringIO(urllib.urlopen(path).read())
55 elif os.path.exists(path):
56 result = open(path, "rb")
57 else:
58 raise IOError("Path '%s' not recognized" % path)
59
60 if path.endswith(".gz"):
61 result = gzip.GzipFile(fileobj=result)
62 return result
63
64 def get_stats_files():
65 config = get_config()
66
67 prefix = "mirror_"
68 options = filter(lambda o: o.startswith(prefix), config.options("stats"))
69 for option in options:
70 if config.has_option("stats", option):
71 value = config.get("stats", option)
72 if " " in value:
73 yield [option[len(prefix):]] + value.split(None, 1)
74 else:
75 print >>sys.stderr, "Option '%s' has invalid value: '%s'" % (option, value)
76 else:
77 print >>sys.stderr, "Option '%s' not found in the configuration" % option
78
29 def cache_lru(func): 79 def cache_lru(func):
30 """ 80 """
31 Decorator that memoizes the return values of a single-parameter function in 81 Decorator that memoizes the return values of a single-parameter function in
32 case it is called again with the same parameter. The 1024 most recent 82 case it is called again with the same parameter. The 1024 most recent
33 results are saved. 83 results are saved.
34 """ 84 """
35 85
36 results = OrderedDict() 86 results = OrderedDict()
37 results.entries_left = 1024 87 results.entries_left = 1024
38 88
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
150 if ua == "Adblock Plus": 200 if ua == "Adblock Plus":
151 return "ABP", "" 201 return "ABP", ""
152 202
153 return "Other", "" 203 return "Other", ""
154 204
155 def process_ip(ip, geo, geov6): 205 def process_ip(ip, geo, geov6):
156 match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip) 206 match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip)
157 if match: 207 if match:
158 ip = match.group(1) 208 ip = match.group(1)
159 209
160 if ":" in ip: 210 try:
161 country = geov6.country_code_by_addr(ip) 211 if ":" in ip:
162 else: 212 country = geov6.country_code_by_addr(ip)
163 country = geo.country_code_by_addr(ip) 213 else:
214 country = geo.country_code_by_addr(ip)
215 except:
216 traceback.print_exc()
217 country = ""
218
164 if country in (None, "", "--"): 219 if country in (None, "", "--"):
165 country = "unknown" 220 country = "unknown"
166 country = country.lower() 221 country = country.lower()
167 222
168 return ip, country 223 return ip, country
169 224
170 @cache_last 225 @cache_last
171 def parse_time(timestr, tz_hours, tz_minutes): 226 def parse_time(timestr, tz_hours, tz_minutes):
172 result = datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S") 227 result = datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S")
173 result -= timedelta(hours = tz_hours, minutes = math.copysign(tz_minutes, tz_hours)) 228 result -= timedelta(hours = tz_hours, minutes = math.copysign(tz_minutes, tz_hours))
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 353
299 # Only leave the major and minor release number for application 354 # Only leave the major and minor release number for application
300 applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion) 355 applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion)
301 356
302 return version, application, applicationVersion 357 return version, application, applicationVersion
303 358
304 def parse_update_flag(query): 359 def parse_update_flag(query):
305 return "update" if query == "update" else "install" 360 return "update" if query == "update" else "install"
306 361
307 def parse_record(line, ignored, geo, geov6): 362 def parse_record(line, ignored, geo, geov6):
308 global log_regexp, mirror_name 363 global log_regexp
309 if log_regexp == None: 364 if log_regexp == None:
310 log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" "([^"]*)")?') 365 log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" "([^"]*)")?')
311 if mirror_name == None:
312 mirror_name = re.sub(r"\..*", "", socket.gethostname())
313 366
314 match = re.search(log_regexp, line) 367 match = re.search(log_regexp, line)
315 if not match: 368 if not match:
316 return None 369 return None
317 370
318 status = int(match.group(6)) 371 status = int(match.group(6))
319 if status != 200: 372 if status != 200:
320 return None 373 return None
321 374
322 info = { 375 info = {
323 "mirror": mirror_name,
324 "size": int(match.group(7)), 376 "size": int(match.group(7)),
325 } 377 }
326 378
327 info["ip"], info["country"] = process_ip(match.group(1), geo, geov6) 379 info["ip"], info["country"] = process_ip(match.group(1), geo, geov6)
328 info["time"], info["month"], info["day"], info["weekday"], info["hour"] = parse_time(match.group(2), int(match.group(3)), int(match.group(4))) 380 info["time"], info["month"], info["day"], info["weekday"], info["hour"] = parse_time(match.group(2), int(match.group(3)), int(match.group(4)))
329 info["file"], info["query"] = parse_path(match.group(5)) 381 info["file"], info["query"] = parse_path(match.group(5))
330 info["ua"], info["uaversion"] = parse_ua(match.group(8)) 382 info["ua"], info["uaversion"] = parse_ua(match.group(8))
331 info["fullua"] = "%s %s" % (info["ua"], info["uaversion"]) 383 info["fullua"] = "%s %s" % (info["ua"], info["uaversion"])
332 info["clientid"] = match.group(9) 384 info["clientid"] = match.group(9)
333 385
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
375 continue 427 continue
376 428
377 value = info[field] 429 value = info[field]
378 if field not in section: 430 if field not in section:
379 section[field] = {} 431 section[field] = {}
380 if value not in section[field]: 432 if value not in section[field]:
381 section[field][value] = {} 433 section[field][value] = {}
382 434
383 add_record(info, section[field][value], ignore_fields + (field,)) 435 add_record(info, section[field][value], ignore_fields + (field,))
384 436
385 def parse_stdin(geo, geov6, verbose): 437 def parse_fileobj(mirror_name, fileobj, geo, geov6, ignored):
386 data = {} 438 data = {}
387 ignored = set() 439 for line in fileobj:
388 for line in sys.stdin:
389 info = parse_record(line, ignored, geo, geov6) 440 info = parse_record(line, ignored, geo, geov6)
390 if info == None: 441 if info == None:
391 continue 442 continue
392 443
444 info["mirror"] = mirror_name
393 if info["month"] not in data: 445 if info["month"] not in data:
394 data[info["month"]] = {} 446 data[info["month"]] = {}
395 section = data[info["month"]] 447 section = data[info["month"]]
396 448
397 if info["file"] not in section: 449 if info["file"] not in section:
398 section[info["file"]] = {} 450 section[info["file"]] = {}
399 section = section[info["file"]] 451 section = section[info["file"]]
400 452
401 add_record(info, section) 453 add_record(info, section)
454 return data
455
456 def merge_objects(object1, object2):
457 for key, value in object2.iteritems():
458 if key in object1:
459 if isinstance(value, int):
460 object1[key] += value
461 else:
462 merge_objects(object1[key], object2[key])
463 else:
464 object1[key] = value
465
466 def save_stats(server_type, data):
467 base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.filename_encode(server_type))
468 for month, month_data in data.iteritems():
469 for name, file_data in month_data.iteritems():
470 path = os.path.join(base_dir, common.filename_encode(month), common.filename_encode(name + ".json"))
471 if os.path.exists(path):
472 with codecs.open(path, "rb", encoding="utf-8") as fileobj:
473 existing = json.load(fileobj)
474 else:
475 existing = {}
476
477 merge_objects(existing, file_data)
478
479 dir = os.path.dirname(path)
480 try:
481 os.makedirs(dir)
482 except OSError, e:
483 if e.errno != errno.EEXIST:
484 raise
485
486 with codecs.open(path, "wb", encoding="utf-8") as fileobj:
487 json.dump(existing, fileobj, indent=2, sort_keys=True)
488
489 def parse_file(mirror_name, server_type, log_file, geo, geov6, verbose):
490 ignored = set()
491 fileobj = open_stats_file(log_file)
492 try:
493 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
494 finally:
495 fileobj.close()
496 save_stats(server_type, data)
402 497
403 if verbose: 498 if verbose:
404 print "Ignored files" 499 print "Ignored files for %s" % log_file
405 print "=============" 500 print "============================================================"
406 print "\n".join(sorted(ignored)) 501 print "\n".join(sorted(ignored))
407 return data
408 502
409 if __name__ == "__main__": 503 if __name__ == "__main__":
410 setupStderr() 504 setupStderr()
411 505
412 verbose = (len(sys.argv) >= 2 and sys.argv[1] == "verbose") 506 parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database")
507 parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed")
508 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
509 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
510 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
511 args = parser.parse_args()
512
413 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE) 513 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE)
414 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE) 514 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE)
415 result = parse_stdin(geo, geov6, verbose)
416 515
417 with codecs.open(get_config().get("stats", "tempFile"), "wb", encoding="utf-8") as file: 516 if args.mirror_name and args.server_type and args.log_file:
418 json.dump(result, file, indent=2, sort_keys=True) 517 parse_file(args.mirror_name, args.server_type, args.log_file, geo, geov6, args.verbose)
518 else:
519 for mirror_name, server_type, log_file in get_stats_files():
520 try:
521 parse_file(mirror_name, server_type, log_file, geo, geov6, args.verbose)
522 except:
523 print >>sys.stderr, "Unable to process log file '%s'" % log_file
524 traceback.print_exc()
525
OLDNEW
« no previous file with comments | « sitescripts/stats/bin/datamerger.py ('k') | sitescripts/stats/bin/pagegenerator.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld