使用 Python 多进程处理 Xray-core 的日志

· Special

使用 Python 多进程处理 Xray-core 的日志，统计每个日志文件的访问次数以及去重后的 IP 数量。
本文用到的知识点：解决进程间数据隔离的方法（multiprocessing.Manager()）以及字符串处理方法（str.split()）。

使用进程池处理，并通过 Manager 实现进程间数据共享

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import multiprocessing
import time
import os



def process_log(file_path, scan_dict):
    """Count parsed lines and distinct client IPs in one log file.

    Each line is split on whitespace and its 4th field is taken as the
    ``ip:port`` source address (assumed Xray access-log layout such as
    ``2023/05/06 10:20:30 from 1.2.3.4:54321 accepted ...`` -- confirm for
    other log types). Only the trailing ``:port`` is stripped, so bracketed
    IPv6 addresses like ``[2001:db8::1]:443`` keep their inner colons.
    Lines with fewer than four fields (blank lines, truncated writes) are
    skipped rather than raising ``IndexError``.

    Args:
        file_path: Path of the UTF-8 log file to read.
        scan_dict: Mapping that receives the result; a plain ``dict`` or a
            ``multiprocessing.Manager().dict()`` proxy.

    Returns:
        ``scan_dict`` after ``scan_dict[file_path]`` has been set to
        ``{'total_enter_into': <parsed lines>, 'ip_process': <distinct IPs>}``.
    """
    ip_set = set()
    total_enter_into = 0
    with open(file_path, 'r', encoding='utf-8') as f:
        # Iterate the file object lazily instead of readlines() so large logs
        # are not loaded into memory at once.
        for line in f:
            fields = line.split()
            if len(fields) < 4:
                continue
            # rsplit(':', 1) removes only the port; a set de-duplicates for free.
            ip_set.add(fields[3].rsplit(':', 1)[0])
            total_enter_into += 1

    scan_dict[file_path] = {'total_enter_into': total_enter_into, 'ip_process': len(ip_set)}
    return scan_dict


def run(log_dir='日志扫描', max_workers=10):
    """Scan every regular file in *log_dir* in a process pool and print stats.

    Workers write directly into a ``multiprocessing.Manager`` dict proxy,
    which is what lets results cross the process boundary in this version.

    Args:
        log_dir: Directory containing the log files.
        max_workers: Number of worker processes in the pool.

    Raises:
        Any exception raised by ``process_log`` in a worker is re-raised here
        via ``Future.result()`` instead of being silently kept in the future.
    """
    with multiprocessing.Manager() as manager:
        scan_dict = manager.dict()

        # The pool is closed inside the Manager block so every worker has
        # finished writing to the proxy before the manager process shuts down.
        with ProcessPoolExecutor(max_workers=max_workers) as pool:
            futures = []
            for file_name in os.listdir(log_dir):
                file_path_full = os.path.join(log_dir, file_name)
                if not os.path.isfile(file_path_full):
                    continue
                # Pass the function and its arguments separately; writing
                # process_log(...) here would run it serially in this process
                # and submit its return value instead of a callable.
                futures.append(pool.submit(process_log, file_path_full, scan_dict))

        # Surface worker failures; a Future otherwise holds its exception silently.
        for fut in futures:
            fut.result()

        for k, v in scan_dict.items():
            print(k, v)


if __name__ == '__main__':
    run()


使用进程池处理，并通过闭包和回调函数解决进程间数据传输问题

def process_log(file_path, scan_dict):
    """Count parsed lines and distinct client IPs in one log file.

    Each line is split on whitespace and its 4th field is taken as the
    ``ip:port`` source address (assumed Xray access-log layout such as
    ``2023/05/06 10:20:30 from 1.2.3.4:54321 accepted ...`` -- confirm for
    other log types). Only the trailing ``:port`` is stripped, so bracketed
    IPv6 addresses keep their inner colons. Lines with fewer than four
    fields (blank lines, truncated writes) are skipped rather than raising
    ``IndexError``.

    Args:
        file_path: Path of the UTF-8 log file to read.
        scan_dict: Dict that receives the result. When this runs in a pool
            worker it is the worker's own pickled copy, so the caller only
            sees the update through the returned value.

    Returns:
        ``scan_dict`` after ``scan_dict[file_path]`` has been set to
        ``{'total_enter_into': <parsed lines>, 'ip_process': <distinct IPs>}``.
    """
    ip_set = set()
    total_enter_into = 0
    with open(file_path, 'r', encoding='utf-8') as f:
        # Iterate the file object lazily instead of readlines().
        for line in f:
            fields = line.split()
            if len(fields) < 4:
                continue
            # rsplit(':', 1) removes only the port; a set de-duplicates for free.
            ip_set.add(fields[3].rsplit(':', 1)[0])
            total_enter_into += 1

    scan_dict[file_path] = {'total_enter_into': total_enter_into, 'ip_process': len(ip_set)}
    # Artificial delay kept from the original demo; it does no useful work.
    time.sleep(0.1)
    return scan_dict



def outer(info, file_path):
    """Build a ``Future`` done-callback that stores the task's result.

    ``add_done_callback`` invokes its callback with only the future, so the
    closure captures *info* and *file_path* to know where the result goes.

    Args:
        info: Dict in the calling process that collects results.
        file_path: Key under which the future's result is stored.

    Returns:
        A callable ``done(future, *args, **kwargs)`` that performs
        ``info[file_path] = future.result()``; extra arguments are ignored.
    """
    def done(future, *_args, **_kwargs):
        outcome = future.result()
        info[file_path] = outcome

    return done


def run(log_dir='日志扫描', max_workers=10):
    """Scan every regular file in *log_dir* in a process pool via callbacks.

    No shared-memory proxy is used. Each worker receives its own empty dict
    and returns it filled in. A done-callback built by ``outer`` then copies
    that returned dict into ``scan_dict`` in this (parent) process, keyed by
    the bare file name.

    Args:
        log_dir: Directory containing the log files.
        max_workers: Number of worker processes in the pool.
    """
    scan_dict = {}

    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        for file_name in os.listdir(log_dir):
            file_path_full = os.path.join(log_dir, file_name)
            if not os.path.isfile(file_path_full):
                continue
            # Give every task a fresh dict. Passing scan_dict itself would
            # pickle a dict that done-callbacks (run on another thread of this
            # process) may be mutating at the same time, and would copy earlier
            # files' results into later tasks' return values.
            fur = pool.submit(process_log, file_path_full, {})
            # NOTE: if process_log raises, res.result() raises inside the
            # callback; concurrent.futures logs it and that file is omitted.
            fur.add_done_callback(outer(scan_dict, file_name))
    # Leaving the with-block calls shutdown(wait=True), so all callbacks have run.

    for k, v in scan_dict.items():
        print(k, v)


if __name__ == '__main__':
    run()


python


评论

行为验证™ 安全组件加载中...