Index: sitescripts/stats/bin/logprocessor.py |
=================================================================== |
--- a/sitescripts/stats/bin/logprocessor.py |
+++ b/sitescripts/stats/bin/logprocessor.py |
@@ -10,27 +10,81 @@ |
# Adblock Plus is distributed in the hope that it will be useful, |
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
# GNU General Public License for more details. |
# |
# You should have received a copy of the GNU General Public License |
# along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. |
-import os, sys, codecs, re, math, urllib, urlparse, socket, json |
+import argparse |
+import codecs |
+from collections import OrderedDict |
+from datetime import datetime, timedelta |
+import errno |
+import gzip |
+import json |
+import math |
+import multiprocessing |
+import numbers |
+import os |
+import re |
import pygeoip |
-from collections import OrderedDict |
+import socket |
+import subprocess |
+import sys |
+import traceback |
+import urllib |
+import urlparse |
+ |
import sitescripts.stats.common as common |
from sitescripts.utils import get_config, setupStderr |
-from datetime import datetime, timedelta |
log_regexp = None |
-mirror_name = None |
gecko_apps = None |
+def open_stats_file(path): |
+ parseresult = urlparse.urlparse(path) |
+ if parseresult.scheme == "ssh" and parseresult.username and parseresult.hostname and parseresult.path: |
+ command = [ |
+ "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k", |
+ "-l", parseresult.username, |
+ parseresult.hostname, |
+ parseresult.path.lstrip("/") |
+ ] |
+ if parseresult.port: |
+      command[1:1] = ["-p", str(parseresult.port)] |
+ result = subprocess.Popen(command, stdout=subprocess.PIPE).stdout |
+ elif parseresult.scheme in ("http", "https"): |
+ result = urllib.urlopen(path) |
+ elif os.path.exists(path): |
+ result = open(path, "rb") |
+ else: |
+ raise IOError("Path '%s' not recognized" % path) |
+ |
+ if path.endswith(".gz"): |
+ # Built-in gzip module doesn't support streaming (fixed in Python 3.2) |
+ result = subprocess.Popen(["gzip", "-cd"], stdin=result, stdout=subprocess.PIPE).stdout |
+ return result |
+ |
+def get_stats_files(): |
+ config = get_config() |
+ |
+ prefix = "mirror_" |
+ options = filter(lambda o: o.startswith(prefix), config.options("stats")) |
+ for option in options: |
+ if config.has_option("stats", option): |
+ value = config.get("stats", option) |
+ if " " in value: |
+ yield [option[len(prefix):]] + value.split(None, 1) |
+ else: |
+ print >>sys.stderr, "Option '%s' has invalid value: '%s'" % (option, value) |
+ else: |
+ print >>sys.stderr, "Option '%s' not found in the configuration" % option |
+ |
def cache_lru(func): |
""" |
Decorator that memoizes the return values of a single-parameter function in |
case it is called again with the same parameter. The 1024 most recent |
results are saved. |
""" |
results = OrderedDict() |
@@ -152,20 +206,25 @@ def parse_ua(ua): |
return "Other", "" |
def process_ip(ip, geo, geov6): |
match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip) |
if match: |
ip = match.group(1) |
- if ":" in ip: |
- country = geov6.country_code_by_addr(ip) |
- else: |
- country = geo.country_code_by_addr(ip) |
+ try: |
+ if ":" in ip: |
+ country = geov6.country_code_by_addr(ip) |
+ else: |
+ country = geo.country_code_by_addr(ip) |
+  except Exception: |
+ traceback.print_exc() |
+ country = "" |
+ |
if country in (None, "", "--"): |
country = "unknown" |
country = country.lower() |
return ip, country |
@cache_last |
def parse_time(timestr, tz_hours, tz_minutes): |
@@ -300,32 +359,29 @@ def parse_chrome_query(query): |
applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion) |
return version, application, applicationVersion |
def parse_update_flag(query): |
return "update" if query == "update" else "install" |
def parse_record(line, ignored, geo, geov6): |
- global log_regexp, mirror_name |
+ global log_regexp |
if log_regexp == None: |
log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" "([^"]*)")?') |
- if mirror_name == None: |
- mirror_name = re.sub(r"\..*", "", socket.gethostname()) |
match = re.search(log_regexp, line) |
if not match: |
return None |
status = int(match.group(6)) |
if status != 200: |
return None |
info = { |
- "mirror": mirror_name, |
"size": int(match.group(7)), |
} |
info["ip"], info["country"] = process_ip(match.group(1), geo, geov6) |
info["time"], info["month"], info["day"], info["weekday"], info["hour"] = parse_time(match.group(2), int(match.group(3)), int(match.group(4))) |
info["file"], info["query"] = parse_path(match.group(5)) |
info["ua"], info["uaversion"] = parse_ua(match.group(8)) |
info["fullua"] = "%s %s" % (info["ua"], info["uaversion"]) |
@@ -377,42 +433,109 @@ def add_record(info, section, ignore_fie |
value = info[field] |
if field not in section: |
section[field] = {} |
if value not in section[field]: |
section[field][value] = {} |
add_record(info, section[field][value], ignore_fields + (field,)) |
-def parse_stdin(geo, geov6, verbose): |
+def parse_fileobj(mirror_name, fileobj, geo, geov6, ignored): |
data = {} |
- ignored = set() |
- for line in sys.stdin: |
+ for line in fileobj: |
info = parse_record(line, ignored, geo, geov6) |
if info == None: |
continue |
+ info["mirror"] = mirror_name |
if info["month"] not in data: |
data[info["month"]] = {} |
section = data[info["month"]] |
if info["file"] not in section: |
section[info["file"]] = {} |
section = section[info["file"]] |
add_record(info, section) |
+ return data |
- if verbose: |
- print "Ignored files" |
- print "=============" |
- print "\n".join(sorted(ignored)) |
- return data |
+def merge_objects(object1, object2, factor=1): |
+ for key, value in object2.iteritems(): |
+ try: |
+ key = unicode(key) |
+ except UnicodeDecodeError: |
+ key = unicode(key, encoding="latin-1") |
+ if isinstance(value, numbers.Number): |
+ object1[key] = object1.get(key, 0) + factor * value |
+ else: |
+ merge_objects(object1.setdefault(key, {}), value, factor) |
+ |
+def save_stats(server_type, data, factor=1): |
+ base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.filename_encode(server_type)) |
+ for month, month_data in data.iteritems(): |
+ for name, file_data in month_data.iteritems(): |
+ path = os.path.join(base_dir, common.filename_encode(month), common.filename_encode(name + ".json")) |
+ if os.path.exists(path): |
+ with codecs.open(path, "rb", encoding="utf-8") as fileobj: |
+ existing = json.load(fileobj) |
+ else: |
+ existing = {} |
+ |
+ merge_objects(existing, file_data, factor) |
+ |
+ dir = os.path.dirname(path) |
+ try: |
+ os.makedirs(dir) |
+ except OSError, e: |
+ if e.errno != errno.EEXIST: |
+ raise |
+ |
+ with codecs.open(path, "wb", encoding="utf-8") as fileobj: |
+ json.dump(existing, fileobj, indent=2, sort_keys=True) |
+ |
+def parse_source((mirror_name, server_type, log_file)): |
+ try: |
+ geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE) |
+ geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE) |
+ |
+ ignored = set() |
+ fileobj = open_stats_file(log_file) |
+ try: |
+ data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) |
+ finally: |
+ fileobj.close() |
+ return server_type, log_file, data, ignored |
+  except Exception: |
+ print >>sys.stderr, "Unable to process log file '%s'" % log_file |
+ traceback.print_exc() |
+ return None, None, None, None |
+ |
+def parse_sources(sources, factor=1, verbose=False): |
+ pool = multiprocessing.Pool() |
+ try: |
+ for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1): |
+ if server_type == None: |
+ continue |
+ |
+ save_stats(server_type, data, factor) |
+ if verbose: |
+ print "Ignored files for %s" % log_file |
+ print "============================================================" |
+ print "\n".join(sorted(ignored)) |
+ finally: |
+ pool.close() |
if __name__ == "__main__": |
setupStderr() |
- verbose = (len(sys.argv) >= 2 and sys.argv[1] == "verbose") |
- geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE) |
- geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE) |
- result = parse_stdin(geo, geov6, verbose) |
+ parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database") |
+ parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed") |
+ parser.add_argument("--revert", dest="factor", action="store_const", const=-1, default=1, help="Remove log data from the database") |
+ parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to") |
+ parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription") |
+ parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL") |
+ args = parser.parse_args() |
- with codecs.open(get_config().get("stats", "tempFile"), "wb", encoding="utf-8") as file: |
- json.dump(result, file, indent=2, sort_keys=True) |
+ if args.mirror_name and args.server_type and args.log_file: |
+ sources = [(args.mirror_name, args.server_type, args.log_file)] |
+ else: |
+ sources = get_stats_files() |
+ parse_sources(sources, args.factor, args.verbose) |