用Data-pipeline模式将nginx日志存储到elasticsearch中(续)

栏目: 后端 · 发布时间: 5年前

内容简介:接上篇的内容,这篇我们要解决第二个问题,就是如何将我们转换完成的json数据发送到es中去,上篇提到了要存储到es中,我们要使用kafka来做消息队列,实现发布和订阅消息流模式,因为涉及kafka内容,所以我们先说一下Kafka的一些基本知识,然后再看代码不然一是一头雾水。kafka介绍:1、什么是kafka?

接上篇的内容,这篇我们要解决第二个问题,就是如何将我们转换完成的json数据发送到es中去,上篇提到了要存储到es中,我们要使用kafka来做消息队列,实现发布和订阅消息流模式,因为涉及kafka内容,所以我们先说一下Kafka的一些基本知识,然后再看代码不然一是一头雾水。

kafka介绍:

1、什么是kafka?

Kafka是最初由Linkedin公司开发,是一个分布式、支
持分区的(partition)、多副本的(replica),基于
zookeeper协调的分布式消息系统,它的最大的特性就是可
以实时的处理大量数据以满足各种需求场景:比如基于
hadoop的批处理系统、低延迟的实时系统、storm/Spark
流式处理引擎,web/nginx日志、访问日志,消息服务等
等,用scala语言编写,Linkedin于2010年贡献给了Apache
基金会并成为顶级开源 项目。

2、kafka特性

- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,
它的延迟最低只有几毫秒,每个topic可以分多个partition,
consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持
数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n1个节点失败)
- 高并发:支持数千个客户端同时读写

3、kafka解决了什么问题?

kafka能在系统或应用程序之间构建可靠的用于传输实时数据的管道,
是一种高吞吐量的分布式发布订阅消息系统,
将不同的数据通过不同的topic实现发布订阅,
生产者生成的数据发布到对应的topic中,订阅这个topic的消费者
都可以消费这个topic中的数据。

4、kafka的一些概念和名词

1)Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线
性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。
2)Producer
负责发布消息到Kafka broke
3)Consumer
消息消费者,向Kafka broker读取消息的客户端,consumer从broker拉取(pull)数据并进行处理。
4)Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保
存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
5)Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition.
6)Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
7)Topic & Partition 关系
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使
得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个
Partition的所有消息和索引文件

除了上面的一些内容外,我也谈一下我自己的一些理解,为什么kafka变得如此流行,很大程度上因为的特性决定的,

比如数据的聚合,怎么理解,就是我们可以把产生数据的系统就做生产者,有多少生产者无所谓,只要建立对应的topic,把数据发送到这个topic中就可以,如果有不同部门向要相同的数据,就去订阅这个topic即可,这样最大的好处是使我们系统结构清晰,同时能减少我们很多重复的工作。

另外一个重要的特性就是高并发性,并支持分布式部署,在一个繁忙的系统中,产生的日志或者其它数据是非常庞大的,要对这些数据进行处理,必须要一个高吞吐量、低延迟的处理系统,Kafka正好满足这个需求,每个topic可以建立一个或多个分区,每个分区你可以简单理解为一个公路上的多个车道,每个车就是数据,因为车道多所以它可以加速数据的传输。

经过上面的介绍,相信大家都对Kafka有了一个基本的了解,那接下来回到我们上面的问题,看如何在我们这个例子中使用:

1、安装kafka,这个因为比较简单就不在费篇幅写了。

2、安装完毕后建立一个topic, 命令:

bin/kafka-topics.sh –create –zookeeper 127.0.0.1:2181 –replication-factor 1 –partitions 1 –topic www_logs

3、安装:pip install kafka-python

4、实现生产者类:

from kafka import KafkaProducer
import json
 
 
class MyKafka(object):
 
    def __init__(self, kafka_brokers):
        self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            bootstrap_servers=kafka_brokers
        )
 
    def send_page_data(self, json_data, topic):
        result = self.producer.send(topic, key=b'log', value=json_data)
        print("kafka send result: {}".format(result.get()))

代码不是很多,先定义了初始化类的__init__方法,需要两个参数,一个就是broker这个没什么可说的, value_serializer的意思是用于将用户提供的消息值转换为字节,send_page_data()方法是将数据发送给对应的topic。

5、修改下上篇中的脚本,使其实现转换后发送到kafka中,修改如下:

import time
import datetime
import socket
import json
from mykafka import MyKafka   #add
 
 
def parse_log_line(line):
    strptime = datetime.datetime.strptime
    hostname = socket.gethostname()
    time = line.split(' ')[3][1::]
    entry = {}
    entry['datetime'] = strptime(
        time, "%d/%b/%Y:%H:%M:%S").strftime("%Y-%m-%d %H:%M")
    entry['source'] = "{}".format(hostname)
    entry['type'] = "www_access"
    entry['log'] = "'{}'".format(line.rstrip())
    return entry
 
 
def show_entry(entry):
    temp = ",".join([
        entry['datetime'],
        entry['source'],
        entry['type'],
        entry['log']
    ])
    log_entry = {'log': entry}
    temp = json.dumps(log_entry)
    print("{}".format(temp))
    return temp
 
 
def follow(syslog_file):
    syslog_file.seek(0, 2)
    pubsub = MyKafka(["10.51.117.28:9092"])   #add
    while True:
        line = syslog_file.readline()
        if not line:
            time.sleep(0.1)
            continue
        else:
            entry = parse_log_line(line)
            if not entry:
                continue
            json_entry = show_entry(entry)
            print(json_entry)
            pubsub.send_page_data(json_entry, 'www_logs')  #add
 
 
f = open("access.log", "rt")
follow(f)

增加的行都已注释,不多解释了。

到这我再梳理下我们都干了什么。

第一,我们实现了access.log日志的实时读取,并传唤成json格式。

第二,我们讲转换完毕的数据发送到kafka中,topic名词是www_logs

以上我们都已完成,接下来的问题是要怎么把数据存储到es中,不过先别急,先让我们验证下我们之前的工作是否正确吧,要验证是否可以从topic中读取数据,我们还需要一个消费者程序,为简单验证,我这边实现一个最简单的消费者程序,如下:

from kafka import KafkaConsumer
consumer = KafkaConsumer('www_logs',bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key, message.value))

就是从topic中读取数据,然后打印,如果没问题,就可以证明这条通路是通的,首先在一个终端运行,我们的日志分析程序,结果如下:

{"log": {"datetime": "2016-12-19 12:00", "source": "iZ258ml0cx5Z", "type": "www_access", "log": "'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \"GET / HTTP/1.1\" 502 602 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\"'"}}
{"log": {"datetime": "2016-12-19 12:00", "source": "iZ258ml0cx5Z", "type": "www_access", "log": "'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \"GET / HTTP/1.1\" 502 602 \"-\" \"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\"'"}}

行太多,我只截取了2行,因为我们有打印,说明转换是没有问题的,但是否发送到了kafka呢,我们在另一个终端运行我们的消费程序,结果如下:

www_logs:0:21: key=b'log' value=b'"{\\"log\\": {\\"datetime\\": \\"2016-12-19 12:00\\", \\"source\\": \\"iZ258ml0cx5Z\\", \\"type\\": \\"www_access\\", \\"log\\": \\"\'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \\\\\\"GET / HTTP/1.1\\\\\\" 502 602 \\\\\\"-\\\\\\" \\\\\\"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\\\\\\"\'\\"}}"'
www_logs:0:22: key=b'log' value=b'"{\\"log\\": {\\"datetime\\": \\"2016-12-19 12:00\\", \\"source\\": \\"iZ258ml0cx5Z\\", \\"type\\": \\"www_access\\", \\"log\\": \\"\'10.1.1.2 - - [19/Dec/2016:12:00:30 +0800] \\\\\\"GET / HTTP/1.1\\\\\\" 502 602 \\\\\\"-\\\\\\" \\\\\\"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Geco) Chrome/54.0.2840.99 Safari/537.36\\\\\\"\'\\"}}"'

可以看到我们消费者成功的从kafka中取得了数据,说明这条通路我们已经打通了,这条数据流已经没有问题,那接下来就剩下最后存储到es中的问题了,我们后续再接续,这篇已经很长了。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

有趣的二进制

有趣的二进制

[ 日] 爱甲健二 / 周自恒 / 人民邮电出版社 / 2015-10 / 39.00元

《有趣的二进制:软件安全与逆向分析》通过逆向工程,揭开人们熟知的软件背后的机器语言的秘密,并教给读者读懂这些二进制代码的方法。理解了这些方法,技术人员就能有效地Debug,防止软件受到恶意攻击和反编译。本书涵盖的技术包括:汇编与反汇编、调试与反调试、缓冲区溢出攻击与底层安全、钩子与注入、Metasploit 等安全工具。 《有趣的二进制:软件安全与逆向分析》适合对计算机原理、底层或计算机安全......一起来看看 《有趣的二进制》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具