| Index: sitescripts/stats/bin/logprocessor.py |
| =================================================================== |
| --- a/sitescripts/stats/bin/logprocessor.py |
| +++ b/sitescripts/stats/bin/logprocessor.py |
| @@ -15,16 +15,17 @@ |
| # You should have received a copy of the GNU General Public License |
| # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. |
| import argparse |
| import codecs |
| from collections import OrderedDict |
| from datetime import datetime, timedelta |
| import errno |
| +import functools |
| import gzip |
| import json |
| import math |
| import multiprocessing |
| import numbers |
| import os |
| import re |
| import pygeoip |
| @@ -501,42 +502,46 @@ def save_stats(server_type, data, factor |
| os.makedirs(dir) |
| except OSError, e: |
| if e.errno != errno.EEXIST: |
| raise |
| with codecs.open(path, "wb", encoding="utf-8") as fileobj: |
| json.dump(existing, fileobj, indent=2, sort_keys=True) |
| -def parse_source((mirror_name, server_type, log_file)): |
| +def parse_source(factor, lock, (mirror_name, server_type, log_file)): |
| try: |
| geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACHE) |
| geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_CACHE) |
| ignored = set() |
| fileobj = open_stats_file(log_file) |
| try: |
| data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored) |
| finally: |
| fileobj.close() |
| - return server_type, log_file, data, ignored |
| + |
| + lock.acquire() |
| + try: |
| + save_stats(server_type, data, factor) |
| + finally: |
| + lock.release() |
| + return log_file, ignored |
| except: |
| print >>sys.stderr, "Unable to process log file '%s'" % log_file |
| traceback.print_exc() |
| return None, None, None, None |
| def parse_sources(sources, factor=1, verbose=False): |
| pool = multiprocessing.Pool() |
| + lock = multiprocessing.Manager().Lock() |
| + callback = functools.partial(parse_source, factor, lock) |
| try: |
| - for server_type, log_file, data, ignored in pool.imap(parse_source, sources, chunksize=1): |
| - if server_type == None: |
| - continue |
| - |
| - save_stats(server_type, data, factor) |
| - if verbose: |
| + for log_file, ignored in pool.imap_unordered(callback, sources, chunksize=1): |
| + if verbose and ignored: |
| print "Ignored files for %s" % log_file |
| print "============================================================" |
| print "\n".join(sorted(ignored)) |
| finally: |
| pool.close() |
| if __name__ == "__main__": |
| setupStderr() |