使用python多进程处理来自Xray core的日志,并分析ip次数以及去重
本次使用知识为进程数据隔离的解决方法(multiprocessing.Manager())和字符串处理方法(str.split())
进程池处理并使用Manager解决数据共享
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import multiprocessing
import time
import os
#处理文件的函数
def process_log(file_path,scan_dict):
#统计ip和访问次数
ip_set = set()
total_enter_into = 0
with open(file_path,'r',encoding='utf-8') as f:
for line in f.readlines():
ip = line.split(' ')[3].split(':')[0]
total_enter_into += 1
#判断ip是否统计过
if ip not in ip_set:
ip_set.add(ip)
else:
continue
#将统计数据作为字典放入字典中
scan_dict[file_path] = {'total_enter_into':total_enter_into,'ip_process':len(ip_set)}
return scan_dict
def run():
#开始处理进程
pool = ProcessPoolExecutor(max_workers=10)
#处理进程共享字典
with multiprocessing.Manager() as manager:
scan_dict = manager.dict()
for file_path in os.listdir('日志扫描'):
file_path_full = os.path.join('日志扫描',file_path)
pool.submit(process_log(file_path_full,scan_dict))
pool.shutdown(True)
#展示统计数据
for k,v in scan_dict.items():
print(k,v)
if __name__ == '__main__':
run()
进程池处理并使用闭包和函数回调解决进程数据传输问题
#处理文件的函数
def process_log(file_path,scan_dict):
#统计ip和访问次数
ip_set = set()
total_enter_into = 0
with open(file_path,'r',encoding='utf-8') as f:
for line in f.readlines():
ip = line.split(' ')[3].split(':')[0]
total_enter_into += 1
#判断ip是否统计过
if ip not in ip_set:
ip_set.add(ip)
else:
continue
#将统计数据作为字典放入字典中
scan_dict[file_path] = {'total_enter_into':total_enter_into,'ip_process':len(ip_set)}
time.sleep(0.1)
return scan_dict
#使用闭包解决进程数据不共享
def outer(info,file_path):
def done(res,*args,**kwargs):
info[file_path] = res.result()
return done
def run():
#开始处理进程
pool = ProcessPoolExecutor(max_workers=10)
#处理进程共享字典
# with multiprocessing.Manager() as manager:
# # scan_dict = manager.dict()
scan_dict = dict()
for file_path in os.listdir('日志扫描'):
file_path_full = os.path.join('日志扫描',file_path)
fur = pool.submit(process_log,file_path_full,scan_dict)
fur.add_done_callback(outer(scan_dict,file_path))
pool.shutdown(True)
#展示统计数据
for k,v in scan_dict.items():
print(k,v)
if __name__ == '__main__':
run()