Index: sitescripts/stats/bin/logprocessor.py |
=================================================================== |
--- a/sitescripts/stats/bin/logprocessor.py |
+++ b/sitescripts/stats/bin/logprocessor.py |
@@ -15,16 +15,17 @@ |
# You should have received a copy of the GNU General Public License |
# along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. |
import argparse |
import codecs |
from collections import OrderedDict |
from datetime import datetime, timedelta |
import errno |
+import functools |
import gzip |
import json |
import math |
import multiprocessing |
import numbers |
import os |
import re |
import pygeoip |
@@ -501,42 +502,46 @@ def save_stats(server_type, data, factor |
os.makedirs(dir) |
except OSError, e: |
if e.errno != errno.EEXIST: |
raise |
with codecs.open(path, "wb", encoding="utf-8") as fileobj: |
json.dump(existing, fileobj, indent=2, sort_keys=True) |
-def parse_source((mirror_name, server_type, log_file)): |
+def parse_source(factor, lock, (mirror_name, server_type, log_file)): |
try: |
geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE) |
geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE) |
ignored = set() |
fileobj = open_stats_file(log_file) |
try: |
data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) |
finally: |
fileobj.close() |
- return server_type, log_file, data, ignored |
+ |
+ lock.acquire() |
+ try: |
+ save_stats(server_type, data, factor) |
+ finally: |
+ lock.release() |
+ return log_file, ignored |
except: |
print >>sys.stderr, "Unable to process log file '%s'" % log_file |
traceback.print_exc() |
return None, None, None, None |
def parse_sources(sources, factor=1, verbose=False): |
pool = multiprocessing.Pool() |
+ lock = multiprocessing.Manager().Lock() |
+ callback = functools.partial(parse_source, factor, lock) |
try: |
- for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1): |
- if server_type == None: |
- continue |
- |
- save_stats(server_type, data, factor) |
- if verbose: |
+ for log_file, ignored in pool.imap_unordered(callback, sources, chunksize=1): |
+ if verbose and ignored: |
print "Ignored files for %s" % log_file |
print "============================================================" |
print "\n".join(sorted(ignored)) |
finally: |
pool.close() |
if __name__ == "__main__": |
setupStderr() |