用Data-pipeline模式将nginx日志存储到elasticsearch中(1)

栏目: 服务器 · Nginx · 发布时间: 6年前

内容简介:做运维的小伙伴应该都知道nginx日志的重要性,一般出现访问问题,我们可能第一时间要去看日志去分析问题,但除了协助我们排查问题外,如果对nginx日志进一步分析可以得到更有用的数据,例如可以监控某站点的http状态码、PV,UV情况,request_time和response_time等,如果辅助其它工具进一步分析可以预防一些安全问题,比如同一个IP的访问某页面超出了限制,我们可以设置策略发现后可以自动拒绝有危险的IP访问,这也是我们经常说的防刷功能,今天要跟大家分享的是如何将nginx日志存储到es中,

做运维的小伙伴应该都知道nginx日志的重要性,一般出现访问问题,我们可能第一时间要去看日志去分析问题,但除了协助我们排查问题外,如果对nginx日志进一步分析可以得到更有用的数据,例如可以监控某站点的http状态码、PV,UV情况,request_time和response_time等,如果辅助其它 工具 进一步分析可以预防一些安全问题,比如同一个IP的访问某页面超出了限制,我们可以设置策略发现后可以自动拒绝有危险的IP访问,这也是我们经常说的防刷功能,今天要跟大家分享的是如何将nginx日志存储到es中,存储的目的当时是为了更好的分析它,大家都知道,如果要存储到es中,数据必须是json格式的,但如果当前日志不是json格式的就需要进行一个转换,要把每行数据转成json格式的这里有几个问题要明确:

首先,access.log是不断增长的,这就需要有个程序能实时读取新的日志进行转换,这是第一个要解决的问题。

其次,对转换完的数据我们能不能直接存储到es中,答案是不建议,因为如果转一次存一次,在程序中我们势必要判断每次存储是否成功,在高并发的网站中日志的产生量是非常巨大的,这样一来会影响整体的效率,那怎么办?在这种情况下,最好的解决办法就是用消息队列的方式去解决,这里我们选择kafka,这个后续会说。

根据上面的内容大家可以看到其实在分析nginx日志的过程就是一个数据流的处理,这也是我们标题所说的data-pipline,pipline故名意思就是管道的意思,加一个data就是一个数据管道,我们就是要建立一个这样的管道将数据源源不断的产生出来,就像水管的水一样。

因为要解决的二个问题,今天我们先解决第一个,先实现access.log实时读取和转换成json格式,先看我们的access.log的文件内容:

111.20.21.22 - - [19/Dec/2016:12:00:30 +0800] "GET / HTTP/1.1" 502 602 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36"

划重点了,请看代码:

import time
import datetime
import json
import socket
 
 
def parse_log_line(line):
    strptime = datetime.datetime.strptime
    hostname = socket.gethostname()
    time = line.split(' ')[3][1::]
    entry = {}
    entry['datetime'] = strptime(
        time, "%d/%b/%Y:%H:%M:%S").strftime("%Y-%m-%d %H:%M")
    entry['source'] = "{}".format(hostname)
    entry['type'] = "www_access"
    entry['log'] = "'{}'".format(line.rstrip())
    return entry
 
 
def show_entry(entry):
    temp = ",".join([
        entry['datetime'],
        entry['source'],
        entry['type'],
        entry['log']
    ])
    log_entry = {'log': entry}
    temp = json.dumps(log_entry)
    print("{}".format(temp))
    return temp
 
 
def follow(syslog_file):
    syslog_file.seek(0, 2)
    
    while True:
        line = syslog_file.readline()
        if not line:
            time.sleep(0.1)
            continue
        else:
            entry = parse_log_line(line)
            if not entry:
                continue
            json_entry = show_entry(entry)
            print(json_entry)
            
 
 
f = open("access.log", "rt")
follow(f)

运行结果:

{
	"log": {
		"datetime": "2016-12-19 12:00",
		"source": "iZ258ml0cx5Z",
		"type": "www_access",
		"log": "'111.20.21.22 - - [19/Dec/2016:12:00:30 +0800] \"GET / HTTP/1.1\" 502 602 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\"'"
	}
}

脚本整体逻辑是先将行日志转换成一个字典(parse_log_line()函数),然后将字典转成json格式(show_entry()函数),最后follow函数是开始从尾部读取日志文件注意seek(0,2)参数是2从文件底部读取。

写到这我们第一个问题就算是解决了,接下来的工作就是如果把数据传给kafka了,因为kafka本身就是一块比较大的内容限于篇幅我们下次继续,多谢各位观看,如果觉得还不错,还请帮转发。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Advanced Web Metrics with Google Analytics, 2nd Edition

Advanced Web Metrics with Google Analytics, 2nd Edition

Brian Clifton / Sybex / 2010-3-15 / USD 39.99

Valuable tips and tricks for using the latest version of Google Analytics Packed with insider tips and tricks, this how-to guide is fully revised to cover the latest version of Google Analytics and sh......一起来看看 《Advanced Web Metrics with Google Analytics, 2nd Edition》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器