Index: sitescripts/stats/bin/logprocessor.py
===================================================================
--- a/sitescripts/stats/bin/logprocessor.py
+++ b/sitescripts/stats/bin/logprocessor.py
@@ -10,27 +10,81 @@
 # Adblock Plus is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
-import os, sys, codecs, re, math, urllib, urlparse, socket, json
+import argparse
+import codecs
+from collections import OrderedDict
+from datetime import datetime, timedelta
+import errno
+import gzip
+import json
+import math
+import multiprocessing
+import numbers
+import os
+import re
 import pygeoip
-from collections import OrderedDict
+import socket
+import subprocess
+import sys
+import traceback
+import urllib
+import urlparse
+
 import sitescripts.stats.common as common
 from sitescripts.utils import get_config, setupStderr
-from datetime import datetime, timedelta
 
 log_regexp = None
-mirror_name = None
 gecko_apps = None
+
+def open_stats_file(path):
+  parseresult = urlparse.urlparse(path)
+  if parseresult.scheme == "ssh" and parseresult.username and parseresult.hostname and parseresult.path:
+    command = [
+      "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
+      "-l", parseresult.username,
+      parseresult.hostname,
+      parseresult.path.lstrip("/")
+    ]
+    if parseresult.port:
+      command[1:1] = ["-p", str(parseresult.port)]
+    result = subprocess.Popen(command, stdout=subprocess.PIPE).stdout
+  elif parseresult.scheme in ("http", "https"):
+    result = urllib.urlopen(path)
+  elif os.path.exists(path):
+    result = open(path, "rb")
+  else:
+    raise IOError("Path '%s' not recognized" % path)
+
+  if path.endswith(".gz"):
+    # Built-in gzip module doesn't support streaming (fixed in Python 3.2)
+    result = subprocess.Popen(["gzip", "-cd"], stdin=result, stdout=subprocess.PIPE).stdout
+  return result
+
+def get_stats_files():
+  config = get_config()
+
+  prefix = "mirror_"
+  options = filter(lambda o: o.startswith(prefix), config.options("stats"))
+  for option in options:
+    if config.has_option("stats", option):
+      value = config.get("stats", option)
+      if " " in value:
+        yield [option[len(prefix):]] + value.split(None, 1)
+      else:
+        print >>sys.stderr, "Option '%s' has invalid value: '%s'" % (option, value)
+    else:
+      print >>sys.stderr, "Option '%s' not found in the configuration" % option
+
 def cache_lru(func):
   """
     Decorator that memoizes the return values of a single-parameter function in
     case it is called again with the same parameter. The 1024 most recent
     results are saved.
   """
   results = OrderedDict()
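
A usage sketch for cache_lru, not part of this patch (the function name fib is hypothetical): per the docstring, it wraps any one-argument function and serves the 1024 most recent results from the cache.

  @cache_lru
  def fib(n):
    # each distinct n is computed once; repeated calls hit the LRU cache
    return n if n < 2 else fib(n - 1) + fib(n - 2)
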
@@ -152,20 +206,25 @@ def parse_ua(ua):
   return "Other", ""
 
 def process_ip(ip, geo, geov6):
   match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip)
   if match:
     ip = match.group(1)
-  if ":" in ip:
-    country = geov6.country_code_by_addr(ip)
-  else:
-    country = geo.country_code_by_addr(ip)
+  try:
+    if ":" in ip:
+      country = geov6.country_code_by_addr(ip)
+    else:
+      country = geo.country_code_by_addr(ip)
+  except:
+    traceback.print_exc()
+    country = ""
+
   if country in (None, "", "--"):
     country = "unknown"
   country = country.lower()
   return ip, country
 
 @cache_last
 def parse_time(timestr, tz_hours, tz_minutes):
@@ -300,32 +359,29 @@ def parse_chrome_query(query):
   applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion)
   return version, application, applicationVersion
 
 def parse_update_flag(query):
   return "update" if query == "update" else "install"
 
 def parse_record(line, ignored, geo, geov6):
-  global log_regexp, mirror_name
+  global log_regexp
   if log_regexp == None:
     log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" "([^"]*)")?')
-  if mirror_name == None:
-    mirror_name = re.sub(r"\..*", "", socket.gethostname())
 
   match = re.search(log_regexp, line)
   if not match:
     return None
 
   status = int(match.group(6))
   if status != 200:
     return None
 
   info = {
-    "mirror": mirror_name,
     "size": int(match.group(7)),
   }
 
   info["ip"], info["country"] = process_ip(match.group(1), geo, geov6)
   info["time"], info["month"], info["day"], info["weekday"], info["hour"] = parse_time(match.group(2), int(match.group(3)), int(match.group(4)))
   info["file"], info["query"] = parse_path(match.group(5))
   info["ua"], info["uaversion"] = parse_ua(match.group(8))
   info["fullua"] = "%s %s" % (info["ua"], info["uaversion"])
@@ -377,42 +433,105 @@ def add_record(info, section, ignore_fields
       value = info[field]
       if field not in section:
         section[field] = {}
       if value not in section[field]:
         section[field][value] = {}
 
       add_record(info, section[field][value], ignore_fields + (field,))
 
-def parse_stdin(geo, geov6, verbose):
+def parse_fileobj(mirror_name, fileobj, geo, geov6, ignored):
   data = {}
-  ignored = set()
-  for line in sys.stdin:
+  for line in fileobj:
     info = parse_record(line, ignored, geo, geov6)
     if info == None:
       continue
 
+    info["mirror"] = mirror_name
     if info["month"] not in data:
       data[info["month"]] = {}
     section = data[info["month"]]
 
     if info["file"] not in section:
       section[info["file"]] = {}
     section = section[info["file"]]
 
     add_record(info, section)
+  return data
 
-  if verbose:
-    print "Ignored files"
-    print "============="
-    print "\n".join(sorted(ignored))
-  return data
+def merge_objects(object1, object2):
+  for key, value in object2.iteritems():
+    key = unicode(key)
+    if key in object1:
+      if isinstance(value, numbers.Number):
+        object1[key] += value
+      else:
+        merge_objects(object1[key], value)
+    else:
+      object1[key] = value
+
+def save_stats(server_type, data):
+  base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.filename_encode(server_type))
+  for month, month_data in data.iteritems():
+    for name, file_data in month_data.iteritems():
+      path = os.path.join(base_dir, common.filename_encode(month), common.filename_encode(name + ".json"))
+      if os.path.exists(path):
+        with codecs.open(path, "rb", encoding="utf-8") as fileobj:
+          existing = json.load(fileobj)
+      else:
+        existing = {}
+
+      merge_objects(existing, file_data)
+
+      dir = os.path.dirname(path)
+      try:
+        os.makedirs(dir)
+      except OSError, e:
+        if e.errno != errno.EEXIST:
+          raise
+
+      with codecs.open(path, "wb", encoding="utf-8") as fileobj:
+        json.dump(existing, fileobj, indent=2, sort_keys=True)
+
+def parse_source((mirror_name, server_type, log_file)):
+  try:
+    geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE)
+    geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE)
+
+    ignored = set()
+    fileobj = open_stats_file(log_file)
+    try:
+      data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
+    finally:
+      fileobj.close()
+    return server_type, log_file, data, ignored
+  except:
+    print >>sys.stderr, "Unable to process log file '%s'" % log_file
+    traceback.print_exc()
+    return None, None, None, None
+
+def parse_sources(sources, verbose):
+  pool = multiprocessing.Pool()

Sebastian Noack, 2013/12/23 10:01:41:
You should call pool.close() when you are done.

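A sketch of that suggestion, not part of the patch as posted: it repeats the parse_sources loop from this diff and closes the pool once the imap results have been drained (the pool.join() is an extra assumption, added so the workers are reaped before returning).

  def parse_sources(sources, verbose):
    pool = multiprocessing.Pool()
    try:
      for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1):
        if server_type == None:
          continue
        save_stats(server_type, data)
        if verbose:
          print "Ignored files for %s" % log_file
          print "============================================================"
          print "\n".join(sorted(ignored))
    finally:
      pool.close()  # no more tasks will be submitted
      pool.join()   # wait for worker processes to exit
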
+  for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1):
+    if server_type == None:
+      continue
+
+    save_stats(server_type, data)
+    if verbose:
+      print "Ignored files for %s" % log_file
+      print "============================================================"
+      print "\n".join(sorted(ignored))
 
 if __name__ == "__main__":
   setupStderr()
 
-  verbose = (len(sys.argv) >= 2 and sys.argv[1] == "verbose")
-  geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE)
-  geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE)
-  result = parse_stdin(geo, geov6, verbose)
+  parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database")
+  parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed")
+  parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
+  parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
+  parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
+  args = parser.parse_args()
 
-  with codecs.open(get_config().get("stats", "tempFile"), "wb", encoding="utf-8") as file:
-    json.dump(result, file, indent=2, sort_keys=True)
+  if args.mirror_name and args.server_type and args.log_file:
+    sources = [(args.mirror_name, args.server_type, args.log_file)]
+  else:
+    sources = get_stats_files()
+  parse_sources(sources, args.verbose)