LEFT | RIGHT |
1 # coding: utf-8 | 1 # coding: utf-8 |
2 | 2 |
3 # This file is part of the Adblock Plus web scripts, | 3 # This file is part of the Adblock Plus web scripts, |
4 # Copyright (C) 2006-2013 Eyeo GmbH | 4 # Copyright (C) 2006-2013 Eyeo GmbH |
5 # | 5 # |
6 # Adblock Plus is free software: you can redistribute it and/or modify | 6 # Adblock Plus is free software: you can redistribute it and/or modify |
7 # it under the terms of the GNU General Public License version 3 as | 7 # it under the terms of the GNU General Public License version 3 as |
8 # published by the Free Software Foundation. | 8 # published by the Free Software Foundation. |
9 # | 9 # |
10 # Adblock Plus is distributed in the hope that it will be useful, | 10 # Adblock Plus is distributed in the hope that it will be useful, |
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of | 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 # GNU General Public License for more details. | 13 # GNU General Public License for more details. |
14 # | 14 # |
15 # You should have received a copy of the GNU General Public License | 15 # You should have received a copy of the GNU General Public License |
16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. | 16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. |
17 | 17 |
18 import argparse | 18 import argparse |
19 import codecs | 19 import codecs |
20 from collections import OrderedDict | 20 from collections import OrderedDict |
21 from datetime import datetime, timedelta | 21 from datetime import datetime, timedelta |
22 import errno | 22 import errno |
23 import gzip | 23 import gzip |
24 import json | 24 import json |
25 import math | 25 import math |
| 26 import multiprocessing |
| 27 import numbers |
26 import os | 28 import os |
27 import re | 29 import re |
28 import pygeoip | 30 import pygeoip |
29 import socket | 31 import socket |
30 from StringIO import StringIO | |
31 import subprocess | 32 import subprocess |
32 import sys | 33 import sys |
33 import traceback | 34 import traceback |
34 import urllib | 35 import urllib |
35 import urlparse | 36 import urlparse |
36 | 37 |
37 import sitescripts.stats.common as common | 38 import sitescripts.stats.common as common |
38 from sitescripts.utils import get_config, setupStderr | 39 from sitescripts.utils import get_config, setupStderr |
39 | 40 |
40 log_regexp = None | 41 log_regexp = None |
41 gecko_apps = None | 42 gecko_apps = None |
42 | 43 |
def open_stats_file(path):
  """
    Opens a log file for streamed reading and returns a file-like object.

    Supported values for path:
      * ssh://user@host[:port]/command - runs ssh and returns its standard
        output, the URL path (without leading slashes) is passed to ssh as
        the remote command
      * http:// and https:// URLs - returns the urllib response object
      * anything else - treated as a local file path that has to exist

    If path ends with ".gz" the data is piped through "gzip -cd" so that it
    is decompressed while streaming. The caller is responsible for closing
    the returned object. Raises IOError if the path isn't recognized.
  """
  parseresult = urlparse.urlparse(path)
  is_ssh = (parseresult.scheme == "ssh" and parseresult.username and
            parseresult.hostname and parseresult.path)
  if is_ssh:
    command = [
      "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
      "-l", parseresult.username,
      parseresult.hostname,
      parseresult.path.lstrip("/")
    ]
    if parseresult.port:
      # ssh expects the port via lowercase -p (uppercase -P is the scp flag)
      command[1:1] = ["-p", str(parseresult.port)]
    result = subprocess.Popen(command, stdout=subprocess.PIPE).stdout
  elif parseresult.scheme in ("http", "https"):
    result = urllib.urlopen(path)
  elif os.path.exists(path):
    result = open(path, "rb")
  else:
    raise IOError("Path '%s' not recognized" % path)

  if path.endswith(".gz"):
    # Built-in gzip module doesn't support streaming (fixed in Python 3.2)
    # NOTE(review): the child process reads from the file descriptor of
    # result directly, presumably bypassing any decoding done by the Python
    # object (e.g. for https responses) - confirm for remote .gz files
    result = subprocess.Popen(["gzip", "-cd"], stdin=result, stdout=subprocess.PIPE).stdout
  return result
63 | 67 |
64 def get_stats_files(): | 68 def get_stats_files(): |
65 config = get_config() | 69 config = get_config() |
66 | 70 |
67 prefix = "mirror_" | 71 prefix = "mirror_" |
68 options = filter(lambda o: o.startswith(prefix), config.options("stats")) | 72 options = filter(lambda o: o.startswith(prefix), config.options("stats")) |
69 for option in options: | 73 for option in options: |
70 if config.has_option("stats", option): | 74 if config.has_option("stats", option): |
71 value = config.get("stats", option) | 75 value = config.get("stats", option) |
(...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
200 if ua == "Adblock Plus": | 204 if ua == "Adblock Plus": |
201 return "ABP", "" | 205 return "ABP", "" |
202 | 206 |
203 return "Other", "" | 207 return "Other", "" |
204 | 208 |
def process_ip(ip, geo, geov6):
  """
    Normalizes an IP address and looks up its country.

    IPv4-mapped IPv6 addresses (::ffff:a.b.c.d) are reduced to the plain
    IPv4 address. Addresses containing a colon are looked up via geov6, all
    others via geo; both objects have to provide country_code_by_addr().

    Returns a tuple (ip, country) where country is a lower-case country code
    or "unknown" if the lookup failed or produced no usable result.
  """
  match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip)
  if match:
    ip = match.group(1)

  database = geov6 if ":" in ip else geo
  try:
    country = database.country_code_by_addr(ip)
  except Exception:
    # A failed lookup shouldn't abort log processing, report and go on.
    # Only Exception is caught so that KeyboardInterrupt and SystemExit
    # still propagate.
    traceback.print_exc()
    country = ""

  if country in (None, "", "--"):
    country = "unknown"
  country = country.lower()

  return ip, country
219 | 228 |
220 @cache_last | 229 @cache_last |
221 def parse_time(timestr, tz_hours, tz_minutes): | 230 def parse_time(timestr, tz_hours, tz_minutes): |
222 result = datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S") | 231 result = datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S") |
223 result -= timedelta(hours = tz_hours, minutes = math.copysign(tz_minutes, tz_h
ours)) | 232 result -= timedelta(hours = tz_hours, minutes = math.copysign(tz_minutes, tz_h
ours)) |
(...skipping 217 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
441 data[info["month"]] = {} | 450 data[info["month"]] = {} |
442 section = data[info["month"]] | 451 section = data[info["month"]] |
443 | 452 |
444 if info["file"] not in section: | 453 if info["file"] not in section: |
445 section[info["file"]] = {} | 454 section[info["file"]] = {} |
446 section = section[info["file"]] | 455 section = section[info["file"]] |
447 | 456 |
448 add_record(info, section) | 457 add_record(info, section) |
449 return data | 458 return data |
450 | 459 |
451 def merge_objects(object1, object2): | 460 def merge_objects(object1, object2, factor=1): |
452 for key, value in object2.iteritems(): | 461 for key, value in object2.iteritems(): |
453 if key in object1: | 462 try: |
454 if isinstance(value, int): | 463 key = unicode(key) |
455 object1[key] += value | 464 except UnicodeDecodeError: |
456 else: | 465 key = unicode(key, encoding="latin-1") |
457 merge_objects(object1[key], object2[key]) | 466 if isinstance(value, numbers.Number): |
458 else: | 467 object1[key] = object1.get(key, 0) + factor * value |
459 object1[key] = value | 468 else: |
460 | 469 merge_objects(object1.setdefault(key, {}), value, factor) |
def save_stats(server_type, data, factor=1):
  """
    Merges parsed statistics into the JSON files of the stats database.

    data maps month to file name to the statistics of that file. Each entry
    is merged (see merge_objects, factor is passed through) into the file
    <dataDirectory>/<server_type>/<month>/<name>.json which is created along
    with its directory if it doesn't exist yet.
  """
  data_dir = get_config().get("stats", "dataDirectory")
  base_dir = os.path.join(data_dir, common.filename_encode(server_type))
  for month, month_data in data.iteritems():
    month_dir = common.filename_encode(month)
    for name, file_data in month_data.iteritems():
      path = os.path.join(base_dir, month_dir, common.filename_encode(name + ".json"))

      existing = {}
      if os.path.exists(path):
        with codecs.open(path, "rb", encoding="utf-8") as fileobj:
          existing = json.load(fileobj)

      merge_objects(existing, file_data, factor)

      # The directory might exist already, that's not an error
      try:
        os.makedirs(os.path.dirname(path))
      except OSError as error:
        if error.errno != errno.EEXIST:
          raise

      with codecs.open(path, "wb", encoding="utf-8") as fileobj:
        json.dump(existing, fileobj, indent=2, sort_keys=True)
483 | 493 |
484 def parse_file(mirror_name, server_type, log_file, geo, geov6, verbose): | 494 def parse_source((mirror_name, server_type, log_file)): |
485 ignored = set() | |
486 fileobj = open_stats_file(log_file) | |
487 try: | 495 try: |
488 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) | 496 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CA
CHE) |
| 497 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMOR
Y_CACHE) |
| 498 |
| 499 ignored = set() |
| 500 fileobj = open_stats_file(log_file) |
| 501 try: |
| 502 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) |
| 503 finally: |
| 504 fileobj.close() |
| 505 return server_type, log_file, data, ignored |
| 506 except: |
| 507 print >>sys.stderr, "Unable to process log file '%s'" % log_file |
| 508 traceback.print_exc() |
| 509 return None, None, None, None |
| 510 |
| 511 def parse_sources(sources, factor=1, verbose=False): |
| 512 pool = multiprocessing.Pool() |
| 513 try: |
| 514 for server_type, log_file, data, ignored in pool.imap(parse_source, sources,
chunksize=1): |
| 515 if server_type == None: |
| 516 continue |
| 517 |
| 518 save_stats(server_type, data, factor) |
| 519 if verbose: |
| 520 print "Ignored files for %s" % log_file |
| 521 print "============================================================" |
| 522 print "\n".join(sorted(ignored)) |
489 finally: | 523 finally: |
490 fileobj.close() | 524 pool.close() |
491 save_stats(server_type, data) | |
492 | |
493 if verbose: | |
494 print "Ignored files for %s" % log_file | |
495 print "============================================================" | |
496 print "\n".join(sorted(ignored)) | |
497 | 525 |
if __name__ == "__main__":
  setupStderr()

  parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database")
  parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed")
  parser.add_argument("--revert", dest="factor", action="store_const", const=-1, default=1, help="Remove log data from the database")
  parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
  parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
  parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
  args = parser.parse_args()

  source = (args.mirror_name, args.server_type, args.log_file)
  if all(source):
    # A single log file was specified explicitly
    sources = [source]
  elif any(source):
    # Don't silently fall back to processing all configured log files if
    # only some of the parameters are given, particularly with --revert
    parser.error("mirror_name, server_type and log_file have to be specified together")
  else:
    # No log file given, process everything listed in the configuration
    sources = get_stats_files()
  parse_sources(sources, args.factor, args.verbose)
LEFT | RIGHT |