kafka 入门与实践

栏目: 服务器 · Apache · 发布时间: 5年前

内容简介:kafka 是 Apache 基金会下的一个开源软件,它的主要作用是用于提供分布式流处理以及消息队列服务。其官网

kafka 是 Apache 基金会下的一个开源软件,它的主要作用是用于提供分布式流处理以及消息队列服务。

kafka 入门与实践

其官网 https://kafka.apache.org/

最早是由 Linkedln 公司使用 scala 语言编写。

特性

  • 解耦:作为MQ,助力微服务(传统MQ更合适)。
  • 冗余:提供数据冗余,高可用。
  • 扩展性:简化应用扩展。
  • 灵活性:访问量剧增时应用仍可发挥作用,减轻后端压力。
  • 顺序保证:保证一个分区内消息的有序性。
  • 缓冲:数据密度较大的在线处理中缓冲数据,如物联网,网站监控等。
  • 高速写入:磁盘顺序写,而非随机写。
  • 高可靠性:通过zk做分布式一致性,同步到任意多块磁盘上,故障自动切换选主,自愈。
  • 高容量:通过横向扩展,LinkedIn每日通过Kafka存储的新增数据高达175TB,8000亿条消息。

应用场景

  • 消息队列:场景和常见MQ相似。

  • 行为跟踪:页面浏览、搜索等,实时记录到topic中,订阅者可用来实时监控或放到hadoop/离线仓库处理。

  • 元数据监控:作为操作记录的监控模块,记录操作信息,类似运维性质的数据监控(审计)。

  • 日志收集:收集服务器日志,交由文件服务器或hdfs处理。

  • 流处理:接收流数据,提供给流式计算框架使用,多用于数据密度较大场景。

kafka 入门与实践

例如

  1. 分析用户行为,设计更好的广告位。

  2. 对用户搜索关键词进行统计,分析当前流行趋势。

  3. 监控用户行为,防止用户无限制抓取网站数据。

  4. 网站实时监控,获得实时性能数据,及时发出网站性能告警。

  5. 批量导入数据到hadoop/数据仓库,对数据离线分析,获取有价值的商业信息。

基本概念

  • Producer:消息和数据的生产者,数据向topic发布。

  • Consumer:消息和数据的消费者,订阅topic并处理消息。

  • Broker:Kafka集群中的服务器,producer->broker->consumer。

  • Topic:消息的分类。

  • Partition:topic物理上的分组,一个topic可有多个partition,partition为一个有序队列,每个 partition中的数据有序,每个消息会对应一个id(offset)。

  • Message:消息,通信基本单位。

  • 流:一组从生产者移动到消费者的数据,kafka streams。

kafka 体系结构

kafka 入门与实践

  • Producer:消息的发布者
  • Consumer:消息的订阅者
  • Broker:中间的存储服务器

kafka 入门与实践

Producer

Producer将消息发布到指定的topic,同时producer可以决定消息归属于哪个partition,比如基于round-robin方式进行均匀分布。

可指定发布的分区,借助分区器+消息键实现消息均匀分布,可自定义分区器。

多个生产者可对应一个topic (如多个网页的监控对应一个topic)。

批量发送:Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中后一次性发出(对比redis pipeline),多用在流处理。

Broker

Broker进行数据的缓存代理,Kafka集群中的一台或者多台服务器统称为broker。

Broker可为消息设置偏移量。

为减少磁盘写入次数,broker会将消息暂时缓存起来,当消息的个数(或空间)达到上限时再flush到磁盘,减少I/O调用。

Broker 无状态,不保存订阅者的状态,订阅者自己保存。

Consumer

Consumer从topic订阅并处理消息。

每个Consumer属于一个consumer group,发送到topic到消息只会被每个group中的一个consumer消费。

一个topic中一个partition只会被group中一个consumer消费,但一个consumer可以同时消费多个partition中的消息。

Kafka只保证一个topic下一个partition中的消息被某个consumer消费时消息是有序的,RabbitMQ天然有序。

每个group中不同consumer间消费独立。

对于一个topic,一个group中consumer数目不能大于partition个数。

Consumer Group

对topic来说一个group就是一个”订阅者”,group作为一个整体对一个topic进行消费,不同group间独立订阅。

一个group内的consumer只能消费不同的partition。

Topic

topic可以认为是一类消息。

每个topic划分为多个pattition。

每个partiton在存储层面表现为append log,发布到partition的消息被追加到log文件结尾。

消息在log文件中的位置称为offset。

Partition

Partition是topic上的物理分组。

分区目的:将log分散到多个broker上,保证

消费效率。多个partition对应多个consumer,

增加并发消费能力。

一个topic可以分为多个partition。

每个partition是一个有序队列。

Partition中每条消息对应一个id(offset)。

kafka 入门与实践

Message

Message:通信的基本单位。

每个partition存储一部分message。

每条消息包含三个属性:

Offset:long

MessageSize:int32

Data:具体消息内容

Offset

Offset为消息在log文件中的位置(逻辑值)。

offset唯一标记一个partition中的一条消息,可理解为message的标识id。

消费者可将Offset可保存在zk或broker或本地。

消息的处理机制

Kafka对消息的重复、顺序性没有严格要求。

Kafka提供at-least-once delivery机制,即consumer异常后,有些消息可能会被重复的delivery。

Kafka为每条消息进行CRC校验,用于错误检测,CRC不通过的消息会被丢弃。

事务

非事务:”读取->处理->写入”中读写异步,流处理场景。

事务功能主要是一个服务端和协议级的功能,任何支持它的客户端库都可以使用它。

Kafka并未实现严格的”读取->处理->写入”的原子过程。

事务的机制主要exactly once实现,即消息只被发送一次,但目前只能保证读取的事务性,消费者一侧并未实现严格的事务性,按kafka的使用场景看也没必要实现。

安装

安装 Java

yum -y install java

Kafka下载

wget -c http://apache.claz.org/kafka/1.0.0/kafka_2.11-1.0.0.tgz

解压

tar  zxvf kafka_2.11-1.0.0.tgz

启动zk:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka:

bin/kafka-server-start.sh config/server.properties

创建topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1 Created topic "topic1".

查看topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

启动producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1

启动cousumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning

Producer发消息。

多节点:复制单节点配置文件,修改broker.id、监听端口、log路径即可。

与其他 MQ 比较

  • RabbitMQ:老牌MQ,应用较多,如OpenStack组件之间的通信,支持协议多,重量级消息队列,对路由、负载均衡、数据持久化支持很好,但无法适应持续产生的数据流,大量数据堆积时性能急剧下降。

  • ZeroMQ:号称最快的消息队列系统,擅长高级/复杂的队列,但使用也复杂,代码侵入,不提供持久化,只是一个库,相当于一个加强版的socket,与MQ区别较大。

  • Redis:Redis也有MQ功能,数据量小,数据大于10KB时基本异常慢,数据量小时性能优于RabbitMQ。

使用

生成者

root@vpn:~# cat producer.py
#/usr/bin/python3.5
# coding:utf-8
from kafka import KafkaProducer


# 生产者
def producer_message(topic_name):
    producer = KafkaProducer(bootstrap_servers=["c-5jgvwkxjgd.kafka.cn-east-1.internal:9092"])
    for i in range(10000):
        message_string = "msg%d" %i
        response = producer.send(topic_name, message_string.encode('utf-8'))
    producer.close()

producer_message('topic1')

消费者

root@vpn:~# cat consumer.py
#/usr/bin/python3.5
# coding:utf-8
from kafka import KafkaConsumer
# 消费者
def consumer_message(topic_name):

    consumer = KafkaConsumer(topic_name,bootstrap_servers=["c-5jgvwkxjgd.kafka.cn-east-1.internal:9092"])
    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))


consumer_message('topic1')

本文由王总整理。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

深入应用C++11

深入应用C++11

祁宇 / 机械工业出版社 / 2015-5 / 79

在StackOverflow的最近一次世界性调查中,C++11在所有的编程语言中排名第二, C++11受到程序员的追捧是毫不意外的,因为它就像C++之父Bjarne Stroustrup说的:它看起来就像一门新的语言。C++11新增加了相当多的现代编程语言的特性,相比C++98/03,它在生产力、安全性、性能和易用性上都有了大幅提高。比如auto和decltype让我们从书写冗长的类型和繁琐的类型......一起来看看 《深入应用C++11》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具