Rietveld Code Review Tool

Unified Diff: sitescripts/stats/bin/logprocessor.py

Issue 5182947690807296: Centralize stats processing, have the stats server pull in logs (Closed)
Patch Set: Fixed comment and processing of non-Unicode keys (created Dec. 26, 2013, 2:09 p.m.)
Index: sitescripts/stats/bin/logprocessor.py
===================================================================
--- a/sitescripts/stats/bin/logprocessor.py
+++ b/sitescripts/stats/bin/logprocessor.py
@@ -10,27 +10,81 @@
 # Adblock Plus is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
 
-import os, sys, codecs, re, math, urllib, urlparse, socket, json
+import argparse
+import codecs
+from collections import OrderedDict
+from datetime import datetime, timedelta
+import errno
+import gzip
+import json
+import math
+import multiprocessing
+import numbers
+import os
+import re
 import pygeoip
-from collections import OrderedDict
+import socket
+import subprocess
+import sys
+import traceback
+import urllib
+import urlparse
+
 import sitescripts.stats.common as common
 from sitescripts.utils import get_config, setupStderr
-from datetime import datetime, timedelta
 
 log_regexp = None
-mirror_name = None
 gecko_apps = None
 
+def open_stats_file(path):
+  parseresult = urlparse.urlparse(path)
+  if parseresult.scheme == "ssh" and parseresult.username and parseresult.hostname and parseresult.path:
+    command = [
+      "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
+      "-l", parseresult.username,
+      parseresult.hostname,
+      parseresult.path.lstrip("/")
+    ]
+    if parseresult.port:
+      command[1:1] = ["-p", str(parseresult.port)]
+    result = subprocess.Popen(command, stdout=subprocess.PIPE).stdout
+  elif parseresult.scheme in ("http", "https"):
+    result = urllib.urlopen(path)
+  elif os.path.exists(path):
+    result = open(path, "rb")
+  else:
+    raise IOError("Path '%s' not recognized" % path)
+
+  if path.endswith(".gz"):
+    # Built-in gzip module doesn't support streaming (fixed in Python 3.2)
+    result = subprocess.Popen(["gzip", "-cd"], stdin=result, stdout=subprocess.PIPE).stdout
+  return result
+
+def get_stats_files():
+  config = get_config()
+
+  prefix = "mirror_"
+  options = filter(lambda o: o.startswith(prefix), config.options("stats"))
+  for option in options:
+    if config.has_option("stats", option):
+      value = config.get("stats", option)
+      if " " in value:
+        yield [option[len(prefix):]] + value.split(None, 1)
+      else:
+        print >>sys.stderr, "Option '%s' has invalid value: '%s'" % (option, value)
+    else:
+      print >>sys.stderr, "Option '%s' not found in the configuration" % option
+
 def cache_lru(func):
   """
   Decorator that memoizes the return values of a single-parameter function in
   case it is called again with the same parameter. The 1024 most recent
   results are saved.
   """
   results = OrderedDict()
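
The new get_stats_files() turns each mirror_<name> option in the [stats] section into a [name, server_type, log_file] triple, so each value has to contain the server type and the log location separated by whitespace. A hypothetical configuration illustrating the expected shape (host names and paths are invented for this example):

    [stats]
    dataDirectory = /opt/stats/data
    geoip_db = /opt/stats/GeoIP.dat
    geoipv6_db = /opt/stats/GeoIPv6.dat
    mirror_ams1 = download ssh://stats@ams1.example.com/var/log/nginx/access_log_downloads.1.gz
    mirror_sfo1 = update https://sfo1.example.com/logs/access_log_update.1.gz

The part of the option name after the mirror_ prefix ("ams1", "sfo1") becomes the mirror name attached to every record from that source.
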
@@ -152,20 +206,25 @@ def parse_ua(ua):
   return "Other", ""
 
 def process_ip(ip, geo, geov6):
   match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip)
   if match:
     ip = match.group(1)
-  if ":" in ip:
-    country = geov6.country_code_by_addr(ip)
-  else:
-    country = geo.country_code_by_addr(ip)
+  try:
+    if ":" in ip:
+      country = geov6.country_code_by_addr(ip)
+    else:
+      country = geo.country_code_by_addr(ip)
+  except:
+    traceback.print_exc()
+    country = ""
+
   if country in (None, "", "--"):
     country = "unknown"
   country = country.lower()
   return ip, country
 
 @cache_last
 def parse_time(timestr, tz_hours, tz_minutes):
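
For context on the process_ip() hunk above: clients connecting via IPv4-mapped IPv6 sockets show up in the logs with a ::ffff: prefix, and without unwrapping them the lookup would be routed to the wrong GeoIP database. A minimal sketch of just that normalization step (addresses taken from the documentation ranges, pygeoip lookups omitted since they need the database files):

    import re

    def normalize(ip):
      # Same pattern as in process_ip(): unwrap IPv4-mapped IPv6 addresses
      match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip)
      return match.group(1) if match else ip

    print normalize("::ffff:192.0.2.15")  # -> 192.0.2.15, IPv4 database
    print normalize("2001:db8::1")        # -> unchanged, IPv6 database
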
@@ -300,32 +359,29 @@ def parse_chrome_query(query):
   applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion)
   return version, application, applicationVersion
 
 def parse_update_flag(query):
   return "update" if query == "update" else "install"
 
 def parse_record(line, ignored, geo, geov6):
-  global log_regexp, mirror_name
+  global log_regexp
   if log_regexp == None:
     log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" "([^"]*)")?')
-  if mirror_name == None:
-    mirror_name = re.sub(r"\..*", "", socket.gethostname())
 
   match = re.search(log_regexp, line)
   if not match:
     return None
 
   status = int(match.group(6))
   if status != 200:
     return None
 
   info = {
-    "mirror": mirror_name,
     "size": int(match.group(7)),
   }
 
   info["ip"], info["country"] = process_ip(match.group(1), geo, geov6)
   info["time"], info["month"], info["day"], info["weekday"], info["hour"] = parse_time(match.group(2), int(match.group(3)), int(match.group(4)))
   info["file"], info["query"] = parse_path(match.group(5))
   info["ua"], info["uaversion"] = parse_ua(match.group(8))
   info["fullua"] = "%s %s" % (info["ua"], info["uaversion"])
@@ -377,42 +433,109 @@ def add_record(info, section, ignore_fields=()):
     value = info[field]
     if field not in section:
       section[field] = {}
     if value not in section[field]:
       section[field][value] = {}
     add_record(info, section[field][value], ignore_fields + (field,))
 
-def parse_stdin(geo, geov6, verbose):
+def parse_fileobj(mirror_name, fileobj, geo, geov6, ignored):
   data = {}
-  ignored = set()
-  for line in sys.stdin:
+  for line in fileobj:
     info = parse_record(line, ignored, geo, geov6)
     if info == None:
       continue
+    info["mirror"] = mirror_name
 
     if info["month"] not in data:
       data[info["month"]] = {}
     section = data[info["month"]]
 
     if info["file"] not in section:
       section[info["file"]] = {}
     section = section[info["file"]]
 
     add_record(info, section)
+  return data
 
-  if verbose:
-    print "Ignored files"
-    print "============="
-    print "\n".join(sorted(ignored))
-  return data
+def merge_objects(object1, object2, factor=1):
+  for key, value in object2.iteritems():
+    try:
+      key = unicode(key)
+    except UnicodeDecodeError:
+      key = unicode(key, encoding="latin-1")
+    if isinstance(value, numbers.Number):
+      object1[key] = object1.get(key, 0) + factor * value
+    else:
+      merge_objects(object1.setdefault(key, {}), value, factor)
+
+def save_stats(server_type, data, factor=1):
+  base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.filename_encode(server_type))
+  for month, month_data in data.iteritems():
+    for name, file_data in month_data.iteritems():
+      path = os.path.join(base_dir, common.filename_encode(month), common.filename_encode(name + ".json"))
+      if os.path.exists(path):
+        with codecs.open(path, "rb", encoding="utf-8") as fileobj:
+          existing = json.load(fileobj)
+      else:
+        existing = {}
+
+      merge_objects(existing, file_data, factor)
+
+      dir = os.path.dirname(path)
+      try:
+        os.makedirs(dir)
+      except OSError, e:
+        if e.errno != errno.EEXIST:
+          raise
+
+      with codecs.open(path, "wb", encoding="utf-8") as fileobj:
+        json.dump(existing, fileobj, indent=2, sort_keys=True)
+
+def parse_source((mirror_name, server_type, log_file)):
+  try:
+    geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE)
+    geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE)
+
+    ignored = set()
+    fileobj = open_stats_file(log_file)
+    try:
+      data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
+    finally:
+      fileobj.close()
+    return server_type, log_file, data, ignored
+  except:
+    print >>sys.stderr, "Unable to process log file '%s'" % log_file
+    traceback.print_exc()
+    return None, None, None, None
+
+def parse_sources(sources, factor=1, verbose=False):
+  pool = multiprocessing.Pool()
+  try:
+    for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1):
+      if server_type == None:
+        continue
+
+      save_stats(server_type, data, factor)
+      if verbose:
+        print "Ignored files for %s" % log_file
+        print "============================================================"
+        print "\n".join(sorted(ignored))
+  finally:
+    pool.close()
 
 if __name__ == "__main__":
   setupStderr()
 
-  verbose = (len(sys.argv) >= 2 and sys.argv[1] == "verbose")
-  geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE)
-  geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE)
-  result = parse_stdin(geo, geov6, verbose)
+  parser = argparse.ArgumentParser(description="Processes log files and merges them into the stats database")
+  parser.add_argument("--verbose", dest="verbose", action="store_const", const=True, default=False, help="Verbose mode, ignored requests will be listed")
+  parser.add_argument("--revert", dest="factor", action="store_const", const=-1, default=1, help="Remove log data from the database")
+  parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
+  parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
+  parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
+  args = parser.parse_args()
 
-  with codecs.open(get_config().get("stats", "tempFile"), "wb", encoding="utf-8") as file:
-    json.dump(result, file, indent=2, sort_keys=True)
+  if args.mirror_name and args.server_type and args.log_file:
+    sources = [(args.mirror_name, args.server_type, args.log_file)]
+  else:
+    sources = get_stats_files()
+
+  parse_sources(sources, args.factor, args.verbose)
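
The reason the new --revert flag works is visible in merge_objects(): every numeric leaf is scaled by factor before being added, so replaying the same log with factor=-1 subtracts exactly what an earlier run added. A self-contained illustration of that behavior (sample numbers invented):

    existing = {"firefox": {"downloads": 10}}
    fresh = {"firefox": {"downloads": 3}, "chrome": {"downloads": 1}}

    merge_objects(existing, fresh)      # process a log file
    # existing: {"firefox": {"downloads": 13}, "chrome": {"downloads": 1}}

    merge_objects(existing, fresh, -1)  # revert the same log file
    # existing: {"firefox": {"downloads": 10}, "chrome": {"downloads": 0}}

Reverting leaves zeroed entries behind rather than deleting keys. On the command line, running the script with no positional arguments processes every configured mirror_* source, while passing all three processes a single file, e.g. (hypothetical values) logprocessor.py --revert ams1 download ssh://stats@ams1.example.com/var/log/nginx/access_log_downloads.1.gz.
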
