Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Side by Side Diff: sitescripts/stats/bin/logprocessor.py

Issue 5182947690807296: Centralize stats processing, have the stats server pull in logs (Closed)
Patch Set: Created Dec. 20, 2013, 1:07 p.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sitescripts/stats/bin/datamerger.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding: utf-8 1 # coding: utf-8
2 2
3 # This file is part of the Adblock Plus web scripts, 3 # This file is part of the Adblock Plus web scripts,
4 # Copyright (C) 2006-2013 Eyeo GmbH 4 # Copyright (C) 2006-2013 Eyeo GmbH
5 # 5 #
6 # Adblock Plus is free software: you can redistribute it and/or modify 6 # Adblock Plus is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 3 as 7 # it under the terms of the GNU General Public License version 3 as
8 # published by the Free Software Foundation. 8 # published by the Free Software Foundation.
9 # 9 #
10 # Adblock Plus is distributed in the hope that it will be useful, 10 # Adblock Plus is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details. 13 # GNU General Public License for more details.
14 # 14 #
15 # You should have received a copy of the GNU General Public License 15 # You should have received a copy of the GNU General Public License
16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. 16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
17 17
18 import os, sys, codecs, re, math, urllib, urlparse, socket, json 18 import argparse
19 import codecs
20 from collections import OrderedDict
21 from datetime import datetime, timedelta
22 import errno
23 import gzip
24 import json
25 import math
26 import os
27 import re
19 import pygeoip 28 import pygeoip
20 from collections import OrderedDict 29 import socket
30 from StringIO import StringIO
31 import subprocess
32 import sys
33 import traceback
34 import urllib
35 import urlparse
36
21 import sitescripts.stats.common as common 37 import sitescripts.stats.common as common
22 from sitescripts.utils import get_config, setupStderr 38 from sitescripts.utils import get_config, setupStderr
23 from datetime import datetime, timedelta
24 39
# Compiled access-log regexp; created lazily on first use (see parse_record()).
log_regexp = None
# Gecko application cache; presumably populated lazily by code outside this
# diff chunk — TODO(review): confirm against the full file.
gecko_apps = None
def open_stats_file(path):
  """Open a log file for reading, transparently handling remote locations.

  Supported path formats: ssh://user@host[:port]/file (fetched via the local
  ssh client), http:// or https:// URLs, and plain local file paths.  Files
  ending in .gz are decompressed on the fly.  Returns a file-like object;
  raises IOError if the path matches none of the supported formats.
  """
  match = re.search(r"^ssh://(\w+)@([^/:]+)(?::(\d+))?/([^/]+)", path)
  if match:
    user, host, port, filename = match.groups()
    command = ["ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k", "-l", user, host, filename]
    if port:
      # The ssh client takes the port as -p; -P is scp's flag and is
      # rejected by current OpenSSH ssh clients.
      command[1:1] = ["-p", port]

    # Not using StringIO here would be better but gzip module needs seeking
    result = StringIO(subprocess.check_output(command))
  elif path.startswith("http://") or path.startswith("https://"):
    result = StringIO(urllib.urlopen(path).read())
  elif os.path.exists(path):
    result = open(path, "rb")
  else:
    raise IOError("Path '%s' not recognized" % path)

  if path.endswith(".gz"):
    # GzipFile requires a seekable file object, hence the StringIO buffering
    # for the remote branches above.
    result = gzip.GzipFile(fileobj=result)
  return result
63
def get_stats_files():
  """Yield [mirror_name, server_type, log_file] triples from configuration.

  Reads every "mirror_*" option from the [stats] config section; the option
  value must contain a server type and a log file location separated by
  whitespace.  Malformed entries are reported on stderr and skipped.
  """
  config = get_config()

  prefix = "mirror_"
  for option in config.options("stats"):
    if not option.startswith(prefix):
      continue
    if not config.has_option("stats", option):
      print >>sys.stderr, "Option '%s' not found in the configuration" % option
      continue
    value = config.get("stats", option)
    if " " not in value:
      print >>sys.stderr, "Option '%s' has invalid value: '%s'" % (option, value)
      continue
    # First whitespace run splits server type from the log file location.
    yield [option[len(prefix):]] + value.split(None, 1)
78
29 def cache_lru(func): 79 def cache_lru(func):
30 """ 80 """
31 Decorator that memoizes the return values of a single-parameter function in 81 Decorator that memoizes the return values of a single-parameter function in
32 case it is called again with the same parameter. The 1024 most recent 82 case it is called again with the same parameter. The 1024 most recent
33 results are saved. 83 results are saved.
34 """ 84 """
35 85
36 results = OrderedDict() 86 results = OrderedDict()
37 results.entries_left = 1024 87 results.entries_left = 1024
38 88
(...skipping 259 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 348
299 # Only leave the major and minor release number for application 349 # Only leave the major and minor release number for application
300 applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion) 350 applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion)
301 351
302 return version, application, applicationVersion 352 return version, application, applicationVersion
303 353
def parse_update_flag(query):
  """Classify a request as an "update" or an "install" from its query string."""
  if query == "update":
    return "update"
  return "install"
306 356
307 def parse_record(line, ignored, geo, geov6): 357 def parse_record(line, ignored, geo, geov6):
308 global log_regexp, mirror_name 358 global log_regexp
309 if log_regexp == None: 359 if log_regexp == None:
310 log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" " ([^"]*)")?') 360 log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" " ([^"]*)")?')
311 if mirror_name == None:
312 mirror_name = re.sub(r"\..*", "", socket.gethostname())
313 361
314 match = re.search(log_regexp, line) 362 match = re.search(log_regexp, line)
315 if not match: 363 if not match:
316 return None 364 return None
317 365
318 status = int(match.group(6)) 366 status = int(match.group(6))
319 if status != 200: 367 if status != 200:
320 return None 368 return None
321 369
322 info = { 370 info = {
323 "mirror": mirror_name,
324 "size": int(match.group(7)), 371 "size": int(match.group(7)),
325 } 372 }
326 373
327 info["ip"], info["country"] = process_ip(match.group(1), geo, geov6) 374 info["ip"], info["country"] = process_ip(match.group(1), geo, geov6)
328 info["time"], info["month"], info["day"], info["weekday"], info["hour"] = pars e_time(match.group(2), int(match.group(3)), int(match.group(4))) 375 info["time"], info["month"], info["day"], info["weekday"], info["hour"] = pars e_time(match.group(2), int(match.group(3)), int(match.group(4)))
329 info["file"], info["query"] = parse_path(match.group(5)) 376 info["file"], info["query"] = parse_path(match.group(5))
330 info["ua"], info["uaversion"] = parse_ua(match.group(8)) 377 info["ua"], info["uaversion"] = parse_ua(match.group(8))
331 info["fullua"] = "%s %s" % (info["ua"], info["uaversion"]) 378 info["fullua"] = "%s %s" % (info["ua"], info["uaversion"])
332 info["clientid"] = match.group(9) 379 info["clientid"] = match.group(9)
333 380
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
375 continue 422 continue
376 423
377 value = info[field] 424 value = info[field]
378 if field not in section: 425 if field not in section:
379 section[field] = {} 426 section[field] = {}
380 if value not in section[field]: 427 if value not in section[field]:
381 section[field][value] = {} 428 section[field][value] = {}
382 429
383 add_record(info, section[field][value], ignore_fields + (field,)) 430 add_record(info, section[field][value], ignore_fields + (field,))
384 431
def parse_fileobj(mirror_name, fileobj, geo, geov6, ignored):
  """Parse all log lines from fileobj into a nested stats structure.

  Returns a dict keyed by month, then by requested file, with per-field
  counters below (see add_record()).  Each record is tagged with
  mirror_name; unparseable or non-200 lines are skipped and the names of
  ignored files are accumulated into the `ignored` set by parse_record().
  """
  data = {}
  for line in fileobj:
    info = parse_record(line, ignored, geo, geov6)
    # PEP 8: compare against the None singleton with `is`, not `==`.
    if info is None:
      continue

    info["mirror"] = mirror_name
    # Create the month/file nesting on demand.
    section = data.setdefault(info["month"], {}).setdefault(info["file"], {})
    add_record(info, section)
  return data
450
def merge_objects(object1, object2):
  """Recursively merge object2 into object1 in place.

  Nested dicts are merged recursively; leaf values (counters) are summed.
  Keys only present in object2 are copied over unchanged.
  """
  for key, value in object2.items():
    if key in object1:
      if isinstance(value, dict):
        merge_objects(object1[key], value)
      else:
        # Summing any non-dict leaf also covers numeric types the previous
        # isinstance(value, int) test missed (e.g. Python 2 long after int
        # overflow), which would have been misrouted into a recursive merge.
        object1[key] += value
    else:
      object1[key] = value
460
def save_stats(server_type, data):
  """Merge parsed stats into the on-disk JSON database for server_type.

  For each month/file in `data`, loads the existing JSON file (if any) from
  the configured data directory, merges the new counters into it via
  merge_objects() and writes the result back, creating directories as needed.
  """
  base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.filename_encode(server_type))
  for month, month_data in data.iteritems():
    for name, file_data in month_data.iteritems():
      path = os.path.join(base_dir, common.filename_encode(month), common.filename_encode(name + ".json"))
      if os.path.exists(path):
        with codecs.open(path, "rb", encoding="utf-8") as fileobj:
          existing = json.load(fileobj)
      else:
        existing = {}

      merge_objects(existing, file_data)

      # `dir_path` instead of `dir`, which shadows the builtin.
      dir_path = os.path.dirname(path)
      try:
        os.makedirs(dir_path)
      except OSError as e:
        # The directory may already exist from a previous run; anything
        # else is a real error.
        if e.errno != errno.EEXIST:
          raise

      with codecs.open(path, "wb", encoding="utf-8") as fileobj:
        json.dump(existing, fileobj, indent=2, sort_keys=True)
483
484 def parse_file(mirror_name, server_type, log_file, geo, geov6, verbose):
485 ignored = set()
486 fileobj = open_stats_file(log_file)
487 try:
488 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
489 finally:
490 fileobj.close()
491 save_stats(server_type, data)
402 492
403 if verbose: 493 if verbose:
404 print "Ignored files" 494 print "Ignored files for %s" % log_file
405 print "=============" 495 print "============================================================"
406 print "\n".join(sorted(ignored)) 496 print "\n".join(sorted(ignored))
407 return data
408 497
if __name__ == "__main__":
  setupStderr()

  parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database")
  parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed")
  parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
  parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
  parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
  args = parser.parse_args()

  # GeoIP databases are loaded once and shared by all files processed below.
  geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE)
  geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE)

  if args.mirror_name and args.server_type and args.log_file:
    # Explicit single-file invocation from the command line.
    parse_file(args.mirror_name, args.server_type, args.log_file, geo, geov6, args.verbose)
  else:
    # Batch mode: process every mirror log listed in the configuration.
    for mirror_name, server_type, log_file in get_stats_files():
      try:
        parse_file(mirror_name, server_type, log_file, geo, geov6, args.verbose)
      except Exception:
        # Keep going on per-file failures, but don't swallow
        # KeyboardInterrupt/SystemExit like the previous bare `except:` did.
        print >>sys.stderr, "Unable to process log file '%s'" % log_file
        traceback.print_exc()
OLDNEW
« no previous file with comments | « sitescripts/stats/bin/datamerger.py ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld