Kafka Connect 实时读取MSSQL数据到Kafka

栏目: 数据库 · SQL Server · 发布时间: 5年前

内容简介:在处理实时数据时,需要即时地获得数据库表中数据的变化,然后将数据变化发送到Kafka中。这篇文章将介绍如何使用Kafka Connector完成这一工作。当获取实时数据时,数据源需要支持对数据变化进行反馈。不同的数据源采用了不同的技术和方法实现该功能,因为我们的业务数据库是MS SQL Server,因此这篇文章采用MSQL作为数据源。首先需要选择Connector,不同的数据源有不同的Connector,例如ActiveMQ Connector、MySql Connector、MSSQL Connect

在处理实时数据时,需要即时地获得数据库表中数据的变化,然后将数据变化发送到Kafka中。这篇文章将介绍如何使用Kafka Connector完成这一工作。当获取实时数据时,数据源需要支持对数据变化进行反馈。不同的数据源采用了不同的技术和方法实现该功能,因为我们的业务数据库是MS SQL Server,因此这篇文章采用MSQL作为数据源。

选择Connector

首先需要选择Connector,不同的数据源有不同的Connector,例如ActiveMQ Connector、MySql Connector、MSSQL Connector等。即便是同一数据源,也可能有不同的第三方提供。我一共尝试了下面两个MSSQL Connector:

比较遗憾的是:这两个Connector,debezium的是Alpha版本,confluent的是Preview版本,反正都不是正式版,而 MySql 都已经有正式版本了,可见开源社区对MS真的不友好呀 >_<、 它们两个一个是使用MSSQL Server的 Change Data Capture 获取数据变更,一个是使用 Change Tracking

因为Change Tracking相比Change Data Capture来说,更轻量一些,因此我选用了confluent的Connector。其下载地址是: https://www.confluent.io/hub/

安装Connector

下载后,将其解压缩至 $KAFKA_HOME/connectors 文件夹下,如下图所示:

$KAFKA_HOME是你的kafka安装目录,如果是集群,要安装在集群下每台机器的connectors目录下。

Kafka Connect 实时读取MSSQL数据到Kafka

在上面的截图中,可以看到我还安装了confluentinc-kafka-connect-hdfs-5.0.0和debezium-connector-sqlserver两个connector。

配置Connector

接下来要对Connector进行配置,此时可以回顾一下 Kafka Connect 基本概念 。Connector是一组独立的集群,并且是作为Kafka集群的客户端,我们首先需要对Connector进行配置,配置文件位于 $KAFKA_HOME/config/connect-distributed.properties:

# kafka集群地址
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# Connector集群的名称,同一集群内的Connector需要保持此group.id一致
group.id=connect-cluster

# 存储到kafka的数据格式
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# 内部转换器的格式,针对offsets、config和status,一般不需要修改
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# 用于保存offsets的topic,应该有多个partitions,并且拥有副本(replication)
# Kafka Connect会自动创建这个topic,但是你可以根据需要自行创建
# 如果kafka单机运行,replication.factor设置为1;当kafka为集群时,可以设置不大于集群中主机数
# 因为我这里的环境是3主机的集群,因此设为2
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
offset.storage.partitions=12

# 保存connector和task的配置,应该只有1个partition,并且有多个副本
config.storage.topic=connect-configs
config.storage.replication.factor=2

# 用于保存状态,可以拥有多个partition和replication
status.storage.topic=connect-status
status.storage.replication.factor=2
status.storage.partitions=6

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# RESET主机名,默认为本机
#rest.host.name=
# REST端口号
rest.port=18083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# 保存connectors的路径
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/opt/kafka/kafka_2.11-1.1.0/connectors

注意到connect-distributed.properties中的distributed。Kafka Connector有两种运行模式,单机(Standalone)和分布式(Distrubited)。因为单机通常作为测试运行,因此这篇文章只演示分布式运行模式。在config文件夹下,还有一个单机运行的配置文件,叫做connect-standalone.properties,内容大同小异。

创建Topic

尽管首次运行Kafka connector时,会自动创建上面的topic,但是如果创建出错,那么Connector就会启动失败。保险起见,可以在运行Connector之前,手动创建好上面的三个特殊topic。

# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic connect-offsets --replication-factor 2 --partitions 12

# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic connect-configs --replication-factor 2 --partitions 1

# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic connect-status --replication-factor 2 --partitions 6

运行Connector

接下来就可以运行Connctor了,此时还没有涉及到任何业务或者数据库相关的配置和操作(即 Kafka Connect 基本概念 中提到的用户配置)。

执行下面的代码以运行Connector:

# bin/connect-distributed.sh config/connect-distributed.properties

上面这样是前台运行,当退出 shell 后进程也就结束了,前台运行的好处就是在开始运行时便于调试。如果想要后台运行,则需加上-daemon选项:

# bin/connect-distributed.sh -daemon config/connect-distributed.properties

运行connect时,会看到不停地涌现大量INFO信息,此时可以修改一下connect-log4j.properties,只显示WARN信息。

# vim config/connect-log4j.properties
log4j.rootLogger=WARN, stdout

开启MSSQL数据库的Change Tracking

在继续进行之前,我们在数据库中创建表test_online,并且开启Change Tracking功能:

Go
CREATE TABLE [dbo].[test_online](
	[Id] [int] IDENTITY(1,1) NOT NULL,
	[UserName] [varchar](50) NOT NULL,
	[IsOnline] [bit] NOT NULL,
	[LastLogin] [int] NOT NULL,
 CONSTRAINT [PK_test_online] PRIMARY KEY CLUSTERED 
(
	[Id] ASC
)

Go
ALTER DATABASE db_name  
SET CHANGE_TRACKING = ON  
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)  

Go
ALTER TABLE [db_name].dbo.[test_online]  
ENABLE CHANGE_TRACKING  
WITH (TRACK_COLUMNS_UPDATED = ON)

Kafka Connector REST API

当Kafka Connector运行起来以后,它就开启了REST API端口,像我们上面配置的是:18083。如果我们需要运行Task,比如实时捕捉数据库数据变化并写入Kafka,那么就需要像这个REST API提交用户配置(User Config)。在提交用户配置之前,我们先看看Kafka Connector REST API都包含哪些常见功能:

获取Worker的信息

因为我的kafka(主机名分别为kafka1、kafka2、kafka3)和kafka connector集群是共用主机的,因此可以使用下面的命令获取(你需要将下面的kafka1改成ip或者相应的主机名):

# curl -s kafka1:18083/ | jq
{
  "version": "1.1.0",
  "commit": "fdcf75ea326b8e07",
  "kafka_cluster_id": "N93UISCxTS-SYZPfM8p1sQ"
}

获取Worker上已经安装的Connector

此时的Connector是静态概念,即上面第一节安装的Confluent MSSQL Connector,从下面的显示可以看到,我安装了好几个Connector:

# curl -s kafka1:18083/connector-plugins | jq
[
  {
    "class": "io.confluent.connect.cdc.mssql.MsSqlSourceConnector",
    "type": "source",
    "version": "0.0.1.9"
  },
  {
    "class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "type": "sink",
    "version": "5.0.0"
  },
  {
    "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector",
    "type": "source",
    "version": "1.1.0"
  },
  {
    "class": "io.confluent.connect.storage.tools.SchemaSourceConnector",
    "type": "source",
    "version": "1.1.0"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "0.9.0.Alpha1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "1.1.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "1.1.0"
  }
]

对于你来说,可能就只有io.confluent.connect.cdc.mssql.MsSqlSourceConnector这一个connector。

列出当前运行的connector(task)

# curl -s kafka1:18083/connectors | jq
[]

因为我们当前Connector中没有提交过任何的用户配置(即没有启动Task),因此上面返回空数组。

提交Connector用户配置

当提交用户配置时,就会启动一个Connector Task,Connector Task执行实际的作业。用户配置是一个Json文件,同样通过REST API提交:

# curl -s -X POST -H "Content-Type: application/json" --data '{
    "name": "connector-mssql-online",
    "config": {
        "connector.class": "io.confluent.connect.cdc.mssql.MsSqlSourceConnector",
        "tasks.max": 1,
        "server.name": "192.168.0.21",
        "server.port" : "1433",
        "username": "user_id",
        "password": "your_password",
        "initial.database": "db_name",
        "database.server.name": "awms",
        "change.tracking.tables": "dbo.test_online"
    }
}' http://kafka1:18083/connectors | jq

注意上面的配置要修改成你的本地配置。提交完成后,再次执行上一小节的命令,会看到已经有一个connector在运行了,其名称为connector-mssql-online:

curl -s kafka1:18083/connectors | jq
[
  "connector-mssql-online"
]

查看connector的信息

# curl -s kafka1:18083/connectors/connector-mssql-online | jq
{
  "name": "connector-mssql-online",
  "config": {
    "connector.class": "io.confluent.connect.cdc.mssql.MsSqlSourceConnector",
    "password": "your_password",
    "initial.database": "db_name",
    "server.name": "192.168.0.21",
    "tasks.max": "1",
    "server.port": "1433",
    "name": "connector-mssql-online",
    "database.server.name": "awms",
    "change.tracking.tables": "dbo.test_online",
    "username": "user_id"
  },
  "tasks": [
    {
      "connector": "connector-mssql-online",
      "task": 0
    }
  ],
  "type": "source"
}

上面task:0,不是说有0个task,是task的id是0。

查看connector下运行的task信息

使用下面的命令,可以查看connector下运行的task的信息:

# curl -s kafka1:18083/connectors/connector-mssql-online/tasks | jq
[
  {
    "id": {
      "connector": "connector-mssql-online",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.cdc.mssql.MsSqlSourceConnector",
      "password": "your_password",
      "initial.database": "db_name",
      "task.class": "io.confluent.connect.cdc.mssql.MsSqlSourceTask",
      "server.name": "192.168.0.21",
      "tasks.max": "1",
      "server.port": "1433",
      "name": "connector-mssql-online",
      "database.server.name": "awms",
      "change.tracking.tables": "dbo.test_online",
      "username": "user_id"
    }
  }
]

这里task的配置信息继承自connector的配置。

查看connector当前状态

# curl -s kafka1:18083/connectors/connector-mssql-online/status | jq
{
  "name": "connector-mssql-online",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.0.31:18083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "192.168.0.31:18083"
    }
  ],
  "type": "source"
}

暂停/重启 Connector

# curl -s -X PUT kafka1:18083/connectors/connector-mssql-online/pause
# curl -s -X PUT kafka1:18083/connectors/connector-mssql-online/resume

删除 Connector

# curl -s -X DELETE kafka1:18083/connectors/connector-mssql-online

从Kafka中读取变动数据

默认情况下,MSSQL Connector会将表的变动写入到:${databaseName}.${tableName} 这个topic中,这个topic的名称可以通过 topic.format 这个用户配置参数中进行设置,因为我们并没有配置,因此,topic的名称为db_name.test_online。

运行下面的控制台脚本,从Kafka中实时读取topic的内容:

# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic db_name.test_online --from-beginning

此时因为没有任何数据,因此控制台会阻塞。

对test_online表进行修改

依次执行下面的增删改语句,对test_online表进行修改:

insert into [test_online](UserName,IsOnline,LastLogin)
values('子阳', 1, DATEDIFF(s, '19700101',GETDATE()))

update test_online Set UserName='吉米' where UserName='子阳'
Delete test_online Where UserName='吉米'

现在查看Kafka读取端控制台,可以看到以Json格式实时收到了数据库变动的消息:

# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic tgstat_ddztest.test_online
{"Id":5,"UserName":"子阳","IsOnline":true,"LastLogin":1540635666,"_cdc_metadata":{"sys_change_operation":"I","sys_change_creation_version":"13","sys_change_version":"13","databaseName":"tgstat_ddztest","schemaName":"dbo","tableName":"test_online"}}
{"Id":5,"UserName":"吉米","IsOnline":true,"LastLogin":1540635666,"_cdc_metadata":{"sys_change_operation":"U","sys_change_creation_version":"0","sys_change_version":"14","databaseName":"tgstat_ddztest","schemaName":"dbo","tableName":"test_online"}}
null

从上面的消息可以看到,对于delete操作,收到了null。对于insert和update操作,收到了详细的变动信息。

至此,我们就配置完了Kafka Connector,并且实时获取到了数据库变更的消息。后续可以使用Spark Stream连接至此Topic,进行实时的数据运算和分析。以后有机会再进行掩饰。

感谢阅读,希望这篇文章能给你带来帮助!

Kafka Connect 实时读取MSSQL数据到Kafka


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Master Switch

The Master Switch

Tim Wu / Knopf / 2010-11-2 / USD 27.95

In this age of an open Internet, it is easy to forget that every American information industry, beginning with the telephone, has eventually been taken captive by some ruthless monopoly or cartel. Wit......一起来看看 《The Master Switch》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具