python学习总结

第一个项目日志分析。(存在不足)

目录

一、日志分析项目 1

1、概述 1

2、 分析的前提 1

1) 半结构化数据 1

2)结构化数据 1

3)结构化数据 1

3、 文本分析 1

4、 提取数据 2

1) 按照空格分隔 2

5、 类型转换 3

1)时间转换 3

2) 状态码和字节数 4

3) 请求信息的解析 4

4) 映射 4

6、 正则表达式提取: 6

7、 异常处理: 7

8、 异常处理: 8

1) 数据载入 8

9、 时间窗口分析: 9

1) 概念 9

2) 当width > interval 10

3) 当width =  interval 10

4) 当width < interval 11

5) 时序数据 11

6) 数据分析基本程序结构 11

7) 窗口函数实现 12

10、 分发: 14

1) 生产者消费者模型 14

2) queue模块–队列 14

11、 分发器实现: 16

12、 分发器实现代码: 17

13、 整合代码 19

14、 完成分析功能 22

15、 状态码分析 22

16、 日志文件的加载 23

17、 完整代码 23

18、 浏览器分析 27

19、 完整版代码(最终版) 30

一、日志分析项目

1、概述

生产中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载,健康情况,可以分析客服的分部情况、客户行为,还可以做出预测等。

 

一般采集流程

日志产出——》采集(logstash、flume、scribe)——》存储——》分析——》存储(数据库、NoSQL)——》可视化

 

开源实时日志分析ELK平台

Logstash收集日志,并存放到ElasticSearch集群中,Kibana则是从ES集群中查询数据生成图表,返回到浏览器的端。

2、分析的前提

1)半结构化数据

日志是半结构化数据,是有组织的,有格式的数据,可以分割成行和列,就可以当做表理解和处理,当然也可以分析里面的数据。

2)结构化数据

结构化数据就是数据库里面的数据,定义行列还可以定义数据的类型。Html。

3)结构化数据

非结构化数据,音频和视频等。

3、文本分析

日志是文本文件,需要依赖文件IO、字符串操作、正则表达式技术。

通过这些就可以将日志中的数据提取出来。

183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)”

 

里面的数据对后期的分析都是必须的。

4、提取数据

  • 按照空格分隔

 

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

for word in line.split():
print(word)

 

 

切割情况:

183.60.212.153

[19/Feb/2013:10:23:29

+0800]

“GET

/o2o/media.html?menu=3

HTTP/1.1”

200

16691

“-”

“Mozilla/5.0

(compatible;

EasouSpider;

+http://www.easou.com/search/spider.html)”

 

缺点:没有按照要求的格式分隔好,所需要的数据多都是按照空格分隔开了。所以,定义的时候不选用在文件中出现的字符就可以省下好多事。

改进:依旧按照空格分隔,但是遇到双引号、中括号特殊处理一下。

先按照空格切分,然后迭代一个个字符,如果发现是[ 或者” ,则就不判断是是否是空格,直到发现] 或者”结尾等,这个区间获取的就是时间等数据。

 

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
chars = set(”  \t”)

def makekey(line:str):
start = 0
skip =False
for i,c in enumerate(line):
if not skip and c in ‘”[‘ :   #遇到” 或者[
start = i + 1
skip = True
elif skip and c in ‘”]’:    #遇到 ” 或者]
skip = False
yield line[start:i]
start = i + 1
continue

if skip:
continue

if c in chars:
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1

else:
if start < len(line):
yield line[start:]

print(list(makekey(line)))

 

 

[‘183.60.212.153’, ‘-‘, ‘-‘, ’19/Feb/2013:10:23:29 +0800’, ‘GET /o2o/media.html?menu=3 HTTP/1.1’, ‘200’, ‘16691’, ‘-‘, ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’]

 

 

5、类型转换

文件中的数据是有类型的,例如时间、状态吗,对不同的文件进行不同的类型转换.自定义转换等。

1)时间转换

19/Feb/2013:10:23:29 +0800  对应的格式是

%d/%b/%Y:%H:%M:%S  %z

使用的函数应该是datetime类中的strptime方法。

import datetime
timestr = ’19/Feb/2013:10:23:29 +0800′
def conver_time(timestr):
return datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’)
print(conver_time(timestr))

 

转换结果:

2013-02-19 10:23:29+08:00

利用lanbda可以转换为一行的函数。

lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’)

 

2)状态码和字节数

都是int整型,使用int函数进行转换。

3)请求信息的解析

‘GET /o2o/media.html?menu=3 HTTP/1.1′

request = ‘GET /o2o/media.html?menu=3 HTTP/1.1’
def get_request(request:str):
return dict(zip([‘method’,’url’,’protocol’],request.split()))

 

lambda request:dict(zip([‘method’,’url’,’protocol’],request.split()))

 

利用zip函数组建字典,三项利用split()空格进行分割。

输出的结果:

{‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘protocol’: ‘HTTP/1.1′}

4)映射

对每一个字段进行命名,然后与值和类型转换的方法对应,解析每一行是必须要有顺序的。

import datetime
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
chars = set(”  \t”)

def makekey(line:str):
start = 0
skip =False
for i,c in enumerate(line):
if not skip and c in ‘”[‘ :   #遇到” 或者[
start = i + 1
skip = True
elif skip and c in ‘”]’:    #遇到 ” 或者]
skip = False
yield line[start:i]
start = i + 1
continue

if skip:
continue

if c in chars:
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1

else:
if start < len(line):
yield line[start:]

# print(list(makekey(line)))
names = (‘remote’,”,”,’dateime’,’request’,’status’,’size’,”,’useragent’)
ops = (None,None,None,
lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
int,int,None,None
)
def extract(line:str):
return dict(map(lambda item:(item[0],item[2](item[1])if item[2] is not None else item[1]),zip(names,makekey(line),ops)))
print(extract(line))

 

{‘remote’: ‘183.60.212.153’, ”: ‘-‘, ‘dateime’: datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), ‘request’: {‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘protocol’: ‘HTTP/1.1’}, ‘status’: 200, ‘size’: 16691, ‘useragent’: ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’}

names = (‘remote’,”,”,’dateime’,’request’,’status’,’size’,”,’useragent’)
ops = (None,None,None,
lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
int,int,None,None
)
def extract(line:str):
return dict(map(lambda item:(item[0],item[2](item[1])if item[2] is not None else item[1]),zip(names,makekey(line),ops)))
print(extract(line))

 

6、正则表达式提取:

1)构造一个正则表达式提取需要的字段,

pattern = ”'([\d.]{7,}) –  – \[([/\w +:]+)\] “(\w+) (\S+) ([\w/\d.]+)” (\d+)(\d+).+”(.+)” ”’

2)进一步改造pattern分组,ops和名词对象,不需要names了。

pattern = ”'(?P<remote>[\d.]{7,}) –  – \[(?P<datetime>[/\w +:]+)\] “(?P<method>\w+) (?P<url>\S+) (?P<procotol>[\w/\d.]+)” (?P<status>\d+)(?P<size>\d+).+”(?P<useragent>.+)” ”’

命名分组:

ops = (
‘dateime’:lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
‘request’:lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’:int,
‘size’:int
)

3)完整代码:

import datetime
import re

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

# mathcer = re.match(pattern,line)
# if mathcer:
#     print(mathcer.groupdict())
regex = re.compile(pattern)

def extract(line:str):
matcher = regex.match(line)
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

 

{‘remote’: ‘183.60.212.153’, ‘datetime’: datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), ‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘procotol’: ‘HTTP/1.1’, ‘status’: 200, ‘size’: 16691, ‘useragent’: ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’}

7、异常处理:

 

import datetime
import re

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

# mathcer = re.match(pattern,line)
# if mathcer:
#     print(mathcer.groupdict())
regex = re.compile(pattern)

def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

 

日志中出现一些不匹配的行,需要处理。

regex.match()可能匹配不上,所以增加一个判断,采用抛出异常等形式。或者返回一个特殊值得方式,告知调用者没有匹配。

8、异常处理:

1) 数据载入

对于本项目,数据就是日志的一行行记录,载入数据就是文件IO的读取,将或者数据的方法封装成函数。

import datetime
import re

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

regex = re.compile(pattern)

def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

def load(path):
“””装载日志文件”””
    with open(path)as f:
for line in f:
filds = extract(line)
if fields:
yield fields
else:
continue
 

9、时间窗口分析:

1) 概念

许多数据,例如日志,都是和时间相关的,都是按照时间顺序产生的。

产生的数据分析的时候,要按照时间求值。

 

Interval表示每一次求值的时间间隔。

Width时间窗口的宽度,指的是一次求值的时间窗口宽度。

2) 当width > interval

 

 

数据求值的时候会有重叠

 

3) 当width =  interval

 

4) 当width < interval

一般不采纳这种方案,会有数据缺失。

 

5) 时序数据

运维环境中,日志、监控等产生的数据都是与时间相关的数据,按照时间的先后产生并记录下来数据,所以一般按照时间对数据进行分析。

 

6) 数据分析基本程序结构

无限的生成随机函数,产生时间相关的数据,返回时间和随机数的字典。

每次取3个数据,求平均值。

import random
import datetime
import time

def source():
while True:
yield {‘value’:random.randint(1,100),’datetime’:datetime.datetime.now()}
time.sleep(1)
#获取数据
s = source()
items = [next(s)for _ in range(3)]

#处理函数
def handler(iterable):
return sum(map(lambda item:item[‘value’],iterable)) / len(iterable)

print(items)
print(“{:.2f}”.format(handler(items)))

 

[{‘value’: 87, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 29, 556430)}, {‘value’: 32, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 30, 557127)}, {‘value’: 2, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 31, 557792)}]

40.33

上面代码模拟一段时间内产生了数据,等一段固定时间取数据来计算平均值。

7) 窗口函数实现

将上面的获取数据的程序拓展为window函数,使用重叠的方案。

 

import random
import datetime
import time

def source(second=1):
while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
# #获取数据
# s = source()
# items = [next(s)for _ in range(3)]
# print(items)
# print(“{:.2f}”.format(handler(items)))

def window(iterator,handler,width:int,interval:int):
“””
     窗口函数
    :param iterator: 数据源,生成器,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒。
    :param interval: 处理时间间隔,秒
    :return:
    “””
    start = datetime.datetime.strptime(‘20170101 000000 +0800′,’%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800′,’%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待计算数据
delta = datetime.timedelta(seconds=width-interval)
while True:
#从数据源获取数据
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]

#每隔interval计算buffer中的数据一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width数据
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 处理函数

def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)

window(source(),handler,10,5)

 

 

时间计算:

 

 

10、分发:

1)生产者消费者模型

对于一个监控系统,需要处理很多数据,包括日志,对其中已有的数据采集、分析。被监控对象就是数据的生产者producer,数据的处理程序就是数据的消费者consumer。

生产者消费者传统模型。

 

 

 

 

 

 

 

 

传统的生产者消费者模式,生产者生产,消费者消费,这种模型存在问题,开发代码的耦合性太高,如果生成规模扩大,不易扩展,生产和消费的速度很难匹配等。

 

解决的办法就是——队列queue

作用——解耦、缓冲。

 

 

 

 

 

 

 

 

 

 

 

 

生产者往往会部署好几个程序,日志也会产生好多,而消费者也会有多个程序,去提取日志分析处理。

 

数据的生产是不稳定的,会造成短时间数据的”潮涌”,需要缓冲。

消费者的消费能力不一样,有快有慢,消费者可以自己决定消费缓冲区中的数据。

 

单机可以使用queue内建的模块构件进程内的队列,满足多个线程之间的消费需要。

大型系统可以使用第三方消息中间件:RabbitMQ  RocketMQ   Kafka.

2)queue模块–队列

queue模块提供了一个先进先出的队列Queue。

 

Queue.Queue(maxsize=0)

创建FIFO队列,返回Queue对象。

maxsixe小于等于0,队列长度没有限制。

 

Queue.get(block=True,timeout=None)

从队列中移除元素并返回这个元素。

Block为阻塞。Timeout为超时。

如果block为True,是阻塞,timeout为None就是一直阻塞。

如果block为True,是阻塞,timeout有值的话就会阻塞到一定秒数抛出异常。

Block为False,是非阻塞,timeout就被忽略,要么成功返回一个元素,严么抛出empty异常。

 

Queue.get_nowait()

等价于get(False),也就是说要么成功返回一个元素,要么抛出异常。

但是queue的这种阻塞效果,需要多线程的时候演示。

 

Queue.put(item,block=True,timeout=None)

把一个元素加入到队列中去。

Block=True,timeout=None,一直阻塞至有空位置防元素。

Block=True,timeout=5,阻塞5秒就抛出full异常。

Block=True,timeout实效,立即返回,能塞进去就塞,不能塞就返回抛出异常。

 

Queue.put_nowait(item)

等价于put(item,False),也就是能塞进去就塞,不能就抛出full异常。

 

#Queue测试。
from queue import Queue
import random

q = Queue()
q.put(random.randint(1,100))
q.put(random.randint(1,100))
print(q.get())
print(q.get())
#print(q.get())
print(q.get(timeout=3))

 

第一个print  : 68

第二个print:15

第三个print :阻塞

第四个print:超过timeout报错,empty。

 

 

 

 

11、分发器实现:

生产者(数据源)生产数据,缓冲到消息队列中。

 

数据处理流程:

数据加载-》 提取-》 分析(滑动窗口函数)

 

处理大量数据的时候,对于一个数据源来说,需要多个消费者处理,但是如果分配就是个问题了。

需要一个分发器(调度器),把数据分发给不同的消费者处理。

每一个消费者拿到数据后,有自己的处理函数,要有注册机制。

 

数据加载——》提取——》分发——》  分析函数1

| —–》 分析函数2

分析1 和分析2是不同的handler,不同的窗口宽度,间隔时间。

 

如何分发?

轮询策略。

一对多的副本发送,一个数据通过分发器,发送到n个消费者。

 

消息队列

在生产者和消费者之间使用消费队列,那么所有消费者公用一个消息队列,还是各自拥有一个队列呢?

共用一个队列也是可以的,但是需要解决争抢的问题,相对来说每个消费者自己拥有一个队列,何为容易。

 

如何注册;

在调度器内部记录有哪些消费者,每一个消费者拥有自己的队列。

 

线程。

由于一个数据会被多个不同的注册过的handler处理,最好的方式就是线程。

线程使用举例。

线程使用举例:

import threading
#定义线程
# target线程中运行的函数,args这个啊哈双女户运行时候需要的实参元组。
t = threading.Thread(target=window,args=(src,handler,width,interval))

#启动线程
t.start()

 

12、分发器实现代码:

import random
import datetime
import time
import threading
from queue import Queue

def source(second=1):
“””
    生成数据
    :param second:
    :return:
    “””
    while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
# #获取数据
# s = source()
# items = [next(s)for _ in range(3)]
# print(items)
# print(“{:.2f}”.format(handler(items)))

def window(iterator,handler,width:int,interval:int):
“””
     窗口函数
    :param iterator: 数据源,生成器,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒。
    :param interval: 处理时间间隔,秒
    :return:
    “””
    start = datetime.datetime.strptime(‘20170101 000000 +0800′,’%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800′,’%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待计算数据
delta = datetime.timedelta(seconds=width-interval)
while True:
#从数据源获取数据
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]

#每隔interval计算buffer中的数据一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width数据
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 处理函数

def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)

window(source(),handler,10,5)

def dispatcher(src):
#分发器中记录handler,同时保存各自的队列
handlers = []
queues = []

def reg(handler,width:int,interval:int):
“””
        注册窗口处理函数
        :param handler:注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        :return:
        “””
    q = Queue()
queues.append(q)

h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)

def run():
for t in handlers:
t.start()  #启动线程处理数据

for item in src:   #将数据源提取到的数据分发到所有队列中。
for q in queues:
q.put(item)
return reg,run
reg,run = dispatcher(source())

reg(handler,10,5)  #注册
run()   #运行

13、整合代码

import random
import datetime
import time
import threading
from queue import Queue

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

regex = re.compile(pattern)  #编译

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
         ‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

def load(path):
“””装载日志文件”””
    with open(path)as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
#数据处理
def source(second=1):
“””
    生成数据
    :param second:
    :return:
    “””
    while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)

#滑动窗口函数
def window(iterator,handler,width:int,interval:int):
“””
     窗口函数
    :param iterator: 数据源,生成器,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒。
    :param interval: 处理时间间隔,秒
    :return:
    “””

    start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待计算数据
    delta = datetime.timedelta(seconds=width-interval)

while True:
#从数据源获取数据
        data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]

#每隔interval计算buffer中的数据一次。
        if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width数据
            buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 处理函数

#随机数平均数测试函数
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)

def donothing_handler(iterable):
return iterable

def dispatcher(src):
#分发器中记录handler,同时保存各自的队列
    handlers = []
queues = []

def reg(handler,width:int,interval:int):
“””
        注册窗口处理函数
        :param handler:注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        :return:
        “””
    q = Queue()
queues.append(q)

h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)

def run():
for t in handlers:
t.start()  #启动线程处理数据

        for item in src:   #将数据源提取到的数据分发到所有队列中。
            for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
    import sys
path = ‘test.log’

    reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #运行

reg(handler,10,5)  #注册
run()   #运行

 14、完成分析功能

分析日志很重要,通过海量数据分析就能够知道是否遭受了攻击,是否被爬取及爬取高峰期,是否有盗链等。

 

百度(baidu)爬虫名称(baiduspider)

谷歌(goole)爬虫名称(Googlebot)

15、状态码分析

状态码分析:

304  服务器收到客户端提交的请求参数,发现资源未变化,要求浏览器使用静态资源的缓存。

404 服务器找不到请求的资源。

304占比大,说明静态缓存效果明显,404占比大,说明网站出现了问题。或者尝试嗅探资源。

如果400、500占比突然开始增大,网站一定出问题了。

 

def status_hanler(iterable):
#时间窗口内的一批函数
    status = {}
for item in iterable:
key = item[‘status’]
status[key] = status.get(key,0)+1

total = len(iterable)
return {k:status[k]/total for k,v in status.items()}

16、日志文件的加载

目前实现的代码中,只能接受一个路径,修改为一批路径。

 

可以约定一下路径下文件的存放方式:

如果送来的是一批路径,就迭代其中的路径。

如果路径是一个普通文件,就按照行读取文件。

如果路径是一个目录,就遍历路径下所有普通文件,每一个文件按照行处理,不递归处理子目录:

 

from pathlib import Path

def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
                
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
        if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
        elif p.is_file():
yield from openfile(str(p))

 

17、完整代码

import random
import datetime
import time
import threading
from queue import Queue

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

regex = re.compile(pattern)  #编译

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
         ‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue

def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
        if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
        elif p.is_file():
yield from openfile(str(p))

#数据处理
def source(second=1):
“””
    生成数据
    :param second:
    :return:
    “””
    while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)

#滑动窗口函数
def window(iterator,handler,width:int,interval:int):
“””
     窗口函数
    :param iterator: 数据源,生成器,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒。
    :param interval: 处理时间间隔,秒
    :return:
    “””

    start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待计算数据
    delta = datetime.timedelta(seconds=width-interval)

while True:
#从数据源获取数据
        data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]

#每隔interval计算buffer中的数据一次。
        if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width数据
            buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 处理函数

#随机数平均数测试函数
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)

def donothing_handler(iterable):
return iterable

def dispatcher(src):
#分发器中记录handler,同时保存各自的队列
    handlers = []
queues = []

def reg(handler,width:int,interval:int):
“””
        注册窗口处理函数
        :param handler:注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        :return:
        “””
    q = Queue()
queues.append(q)

h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)

def run():
for t in handlers:
t.start()  #启动线程处理数据

        for item in src:   #将数据源提取到的数据分发到所有队列中。
            for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
    import sys
path = ‘test.log’

    reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #运行

reg(handler,10,5)  #注册
run()   #运行

 

 

可以指定文件或目录,对日志进行数据分析。

分析函数可以动态注册

数据可以分发给不同的分析处理程序处理。

 

 

18、浏览器分析

1)Useragent

这里指的是,软件按照一定的格式想远端的服务器提供一个标识自己的字符串。

在HTTP协议中,使用user-agent字段传送这个字符串。

2)信息提取

Pyyaml uaparser user-agent模块

 

安装 pip install Pyyaml uaparser user-agent

使用

from user_agents import parse

useragents = [
“Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)”\
“Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)”\
“Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0”
]

for uastring in useragents:
ua = parse(uastring)
print(ua.brower,ua.brower.family,ua.brower.version,ua.brower.version_string)

#运行结构

 

 

数据分析代码:

from user_agents import parse

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda useragent: parse(useragent)

}

增加浏览器分析函数:

 

from user_agents import parse

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda useragent: parse(useragent)

}

from user_agents import parse

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda ua: parse(ua)
}

#浏览器分析
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]

key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
return browers

 

统计所有浏览器:

 

allbrowers = {}
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]

key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
allbrowers[key] = allbrowers.get(key,0)+1

print(sorted(allbrowers.items(),key =lambda x:x[1],reversed=True))[:10]
return browers

19、完整版代码(最终版)

import random
import datetime
import time
import threading
from queue import Queue

line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’

regex = re.compile(pattern)  #编译

ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
         ‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’

def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))

def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue

def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
        if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
        elif p.is_file():
yield from openfile(str(p))

#数据处理
def source(second=1):
“””
    生成数据
    :param second:
    :return:
    “””
    while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)

#滑动窗口函数
def window(iterator,handler,width:int,interval:int):
“””
     窗口函数
    :param iterator: 数据源,生成器,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒。
    :param interval: 处理时间间隔,秒
    :return:
    “””

    start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待计算数据
    delta = datetime.timedelta(seconds=width-interval)

while True:
#从数据源获取数据
        data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]

#每隔interval计算buffer中的数据一次。
        if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width数据
            buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 处理函数

#随机数平均数测试函数
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)

def donothing_handler(iterable):
return iterable

def status_hanler(iterable):
#时间窗口内的一批函数
    status = {}
for item in iterable:
key = item[‘status’]
status[key] = status.get(key,0)+1

total = len(iterable)
return {k:status[k]/total for k,v in status.items()}

#浏览器分析
allbrowers = {}
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]

key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
allbrowers[key] = allbrowers.get(key,0)+1

print(sorted(allbrowers.items(),key =lambda x:x[1],reversed=True))[:10]
return browers

def dispatcher(src):
#分发器中记录handler,同时保存各自的队列
    handlers = []
queues = []

def reg(handler,width:int,interval:int):
“””
        注册窗口处理函数
        :param handler:注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        :return:
        “””
    q = Queue()
queues.append(q)

h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)

def run():
for t in handlers:
t.start()  #启动线程处理数据

        for item in src:   #将数据源提取到的数据分发到所有队列中。
            for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
    import sys
path = ‘test.log’

    reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #运行

reg(handler,10,5)  #注册
run()   #运行

 

 

本文来自投稿,不代表Linux运维部落立场,如若转载,请注明出处:http://www.178linux.com/97632

发表评论

登录后才能评论

联系我们

400-080-6560

在线咨询:点击这里给我发消息

邮件:1823388528@qq.com

工作时间:周一至周五,9:30-18:30,节假日同时也值班