Rietveld Code Review Tool

Unified Diff: sitescripts/stats/bin/logprocessor.py

Issue 5912761519308800: Stats processing: don't send data back to the main process, save it directly in the worker process (Closed)
Patch Set: Created Jan. 31, 2014, 12:46 p.m.
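The patch moves the save_stats() call into the worker processes, so the parsed data no longer has to be pickled and sent back to the parent process. Because Pool.imap_unordered() passes exactly one argument to the worker function, the extra factor and lock parameters are bound up front with functools.partial. Below is a minimal, self-contained sketch of that binding pattern only; the names process_item and the sample inputs are illustrative and not part of logprocessor.py.

import functools
import multiprocessing

def process_item(factor, item):
  # "item" arrives from imap_unordered; "factor" was bound beforehand
  # via functools.partial, since the pool passes only one argument.
  return item * factor

if __name__ == "__main__":
  pool = multiprocessing.Pool()
  worker = functools.partial(process_item, 10)
  try:
    for result in pool.imap_unordered(worker, [1, 2, 3], chunksize=1):
      print(result)
  finally:
    pool.close()
    pool.join()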
Index: sitescripts/stats/bin/logprocessor.py
===================================================================
--- a/sitescripts/stats/bin/logprocessor.py
+++ b/sitescripts/stats/bin/logprocessor.py
@@ -15,16 +15,17 @@
 # You should have received a copy of the GNU General Public License
 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
 
 import argparse
 import codecs
 from collections import OrderedDict
 from datetime import datetime, timedelta
 import errno
+import functools
 import gzip
 import json
 import math
 import multiprocessing
 import numbers
 import os
 import re
 import pygeoip
@@ -501,42 +502,46 @@ def save_stats(server_type, data, factor
     os.makedirs(dir)
   except OSError, e:
     if e.errno != errno.EEXIST:
       raise
 
   with codecs.open(path, "wb", encoding="utf-8") as fileobj:
     json.dump(existing, fileobj, indent=2, sort_keys=True)
 
-def parse_source((mirror_name, server_type, log_file)):
+def parse_source(factor, lock, (mirror_name, server_type, log_file)):
   try:
     geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE)
     geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE)
 
     ignored = set()
     fileobj = open_stats_file(log_file)
     try:
       data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
     finally:
       fileobj.close()
-    return server_type, log_file, data, ignored
+
+    lock.acquire()
+    try:
+      save_stats(server_type, data, factor)
+    finally:
+      lock.release()
+    return log_file, ignored
   except:
     print >>sys.stderr, "Unable to process log file '%s'" % log_file
     traceback.print_exc()
     return None, None, None, None
 
 def parse_sources(sources, factor=1, verbose=False):
   pool = multiprocessing.Pool()
+  lock = multiprocessing.Manager().Lock()
+  callback = functools.partial(parse_source, factor, lock)
   try:
-    for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1):
-      if server_type == None:
-        continue
-
-      save_stats(server_type, data, factor)
-      if verbose:
+    for log_file, ignored in pool.imap_unordered(callback, sources, chunksize=1):
+      if verbose and ignored:
         print "Ignored files for %s" % log_file
         print "============================================================"
         print "\n".join(sorted(ignored))
   finally:
     pool.close()
 
 if __name__ == "__main__":
   setupStderr()
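Since several pool workers can now write statistics files concurrently, the patch serializes the save_stats() calls with a lock created by multiprocessing.Manager(); unlike a plain multiprocessing.Lock(), a manager-backed lock proxy can be pickled and handed to pool workers through functools.partial. The following is a self-contained sketch of that idea under the same assumptions; the names append_result and results.log are hypothetical and not part of the patch.

import functools
import multiprocessing

def append_result(lock, path, item):
  # The manager-backed lock proxy serializes access to the shared file,
  # so concurrent workers cannot interleave their writes.
  lock.acquire()
  try:
    with open(path, "a") as fileobj:
      fileobj.write("processed %d\n" % item)
  finally:
    lock.release()
  return item

if __name__ == "__main__":
  lock = multiprocessing.Manager().Lock()
  pool = multiprocessing.Pool()
  worker = functools.partial(append_result, lock, "results.log")
  try:
    for item in pool.imap_unordered(worker, range(5), chunksize=1):
      print(item)
  finally:
    pool.close()
    pool.join()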