日志分析项目

日志分析

123456

# 日志分析完整代码

import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path

# 数据源
PATTERN = '''(?P<remote>[\d\.]{7,})\s-\s-\s-[(?P<datetime>[^\[\]]+)]\s\
"(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s\
(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''

regex = re.compile(PATTERN)  # 编译
from user_agents import parse

ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr , '%d/%b/%Y:%H:%M:%S %z') ,
    'status': int ,
    'size': int ,
    'useragent': lambda ua: parse(ua)
}


def extract(line: str) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {name: ops.get(name , lambda x: x) for name , data in matcher.groupdict().items()}


# 装载文件
def openfile(path: str):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue  # TODO 解析失败则抛弃或记录日志


def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
                elif p.is_file():
                    yield from openfile(str(p))


# 数据处理
def source(second=1):
    """生成数据"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))) ,
            'value': random.randint(1 , 100)
        }
        time.sleep(second)


# 滑动窗口函数
def window(src: Queue , handler , width: int , interval: int):
    """
    窗口函数

    :param src: 数据源,缓存队列,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒
    :param interval: 处理时间间隔,秒
    """

    start = datetime.datetime.strptime('20170101 000000 +0800' , '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800' , '%Y%m%d %H%M%S %z')
    buffer = []  # 窗口中的待计算数据
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 从数据源获取数据
        data = src.get()
        if data:
            buffer.append(data)  # 存入临时缓冲等待计算
            current = data['datetime']

        # 每隔interval计算buffer中的数据一次
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{}'.format(ret))
            start = current

            # 清除超出width的数据
            buffer = [x for x in buffer if x['datetime'] > current - delta]


# 随机数平均数测试函数
def handler(iterable):
    return sum(map(lambda x: x['value'] , iterable)) / len(iterable)


# 测试函数
def donothing_handler(iterable):
    return iterable


# 状态码占比
def status_handler(iterable):
    # 时间窗口内的一批数据
    status = {}
    for item in iterable:
        key = item['status']
        status[key] = status.get(key , 0) + 1
    # total = sum(status.values())
    total = len(iterable)
    return {k: status[k] / total for k , v in status.items()}


allbrowsers = {}


# 浏览器分析
def browser_handler(iterable):
    browsers = {}
    for item in iterable:
        ua = item['useragent']

        key = (ua.browser.family , ua.browser.version_string)
        browsers[key] = browsers.get(key , 0) + 1
        allbrowsers[key] = allbrowsers.get(key , 0) + 1

    print(sorted(allbrowsers.items() , key=lambda x: x[1] , reverse=True)[:10])
    return browsers


# 分发器
def dispatcher(src):
    # 分发器中记录handler, 同时保存各自的队列
    handlers = []
    queues = []

    def reg(handler , width: int , interval: int):
        """
        注册窗口处理函数

        :param handler: 注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        """
        q = Queue()
        queues.append(q)

        h = threading.Thread(target=window , args=(q , handler , width , interval))
        handlers.append(h)

    def run():
        for t in handlers:
            t.start()  # 启动线程处理数据

        for item in src:  # 数据源取到的数据分发到所有队列中
            for q in queues:
                q.put(item)

    return reg , run


if __name__ == "__main__":
    import sys

    # path = sys.argv[1]
    path = 'test.log'

    reg , run = dispatcher(load(path))
    reg(status_handler , 10 , 5)  # 注册
    reg(browser_handler , 5 , 5)
    run()  # 运行

本文来自投稿,不代表Linux运维部落立场,如若转载,请注明出处:http://www.178linux.com/97752

(2)
上一篇 2018-05-07 21:01
下一篇 2018-05-07 21:31

相关推荐

  • Python文件操作

    计算机体系架构 运算器:完成各种算术运算、逻辑运算、出具传输等数据加工处理 控制器:控制程序的执行 CPU = 运算器 + 控制器 存储器:用于记忆程序的数据,内存 输入设备:将数据或程序输入到计算机中 输出设备:将数据或程序的处理结果展示给用户 文件IO常用操作 open 打开 read 读取 write 写入 close 关闭 readline 行读取 …

    Python笔记 2018-05-02
  • Python 部分知识点总结(三)

    此篇博客只是记录第五周未掌握或不熟悉的知识点,用来加深印象。

    2018-04-08
  • 树 非线性结构,每个元素可有多个前驱和后继 树是n(n>=0)个元素的集合,n=0时,称为空树,树只有一个特殊的没有前驱的元素,称为树的根root,树中除了根结点外,其余元素只能有一个前驱,可以有零个和多个后继,子树也有自己的根 结点:树中的数据元素 结点的度degree:结点拥有的子树的数目称为度,记作d(v)。树的度是树内各结点的度最大值 叶子结点…

    2018-04-16
  • 文件操作

    文件操作 冯诺依曼体系架构 CPU由运算器和控制器组成 运算器,完成各种算数的运算,逻辑运算,数据传输等数据加工处理 控制器,控制程序的执行 存储器,用于记忆程序的数据,列如内存 输入设备,将数据或者程序输入到计算机中列如键盘 鼠标 输出设备,将数据或者程序的处理结果展示给用户,列如显示器,打印机等等   一般说的IO操作,指的是文件的IO,如果是指网络的I…

    Python笔记 2018-05-02
  • ss

    sas

    Python笔记 2018-05-10
  • 函数执行过程和递归函数练习题

    函数执行过程和递归函数练习题

    2018-04-16