logstash_output_kafka:Mysql 同步 Kafka 深入详解

栏目: 数据库 · Mysql · 发布时间: 6年前

内容简介:实际业务场景中,会遇到基础数据存在Mysql中,实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。而mysql写入kafka的选型方案有:方案一:logstash_output_kafka 插件。

0、题记

实际业务场景中,会遇到基础数据存在 Mysql 中,实时写入数据量比较大的情景。迁移至kafka是一种比较好的业务选型方案。

logstash_output_kafka:Mysql 同步 Kafka 深入详解

而mysql写入kafka的选型方案有:

方案一:logstash_output_kafka 插件。

方案二:kafka_connector。

方案三:debezium 插件。

方案四:flume。

方案五:其他类似方案。

其中:debezium和flume是基于 mysql binlog 实现的。

如果需要同步历史全量数据+实时更新数据,建议使用logstash。

1、logstash同步原理

常用的logstash的插件是:logstash_input_jdbc实现关系型数据库到Elasticsearch等的同步。

实际上, 核心logstash的同步原理的掌握 ,有助于大家理解类似的各种库之间的同步。

logstash 核心原理 :输入生成事件,过滤器修改它们,输出将它们发送到其他地方。

logstash核心三部分组成:input、filter、output。

logstash_output_kafka:Mysql 同步 Kafka 深入详解
input { }
filter { }
output { }

1.1 input输入

包含但远不限于:

  1. jdbc:关系型数据库:mysql、oracle等。

  2. file:从文件系统上的文件读取。

  3. syslog:在已知端口514上侦听syslog消息。

  4. redis:redis消息。beats:处理 Beats发送的事件。

  5. kafka:kafka实时数据流。

1.2 filter过滤器

过滤器是Logstash管道中的中间处理设备。您可以将过滤器与条件组合,以便在事件满足特定条件时对其执行操作。

可以把它比作数据处理的 ETL 环节。

一些有用的过滤包括:

  1. grok:解析并构造任意文本。 Grok是目前Logstash中将非结构化日志数据解析为结构化和可查询内容的最佳方式 。有了内置于Logstash的120种模式,您很可能会找到满足您需求的模式!

  2. mutate:对事件字段执行常规转换。您可以重命名,删除,替换和修改事件中的字段。

  3. drop:完全删除事件,例如调试事件。

  4. clone:制作事件的副本,可能添加或删除字段。

  5. geoip:添加有关IP地址的地理位置的信息。

1.3 output输出

输出是Logstash管道的最后阶段。一些常用的输出包括:

  1. elasticsearch:将事件数据发送到Elasticsearch。

  2. file:将事件数据写入磁盘上的文件。

  3. kafka:将事件写入Kafka。

详细的filter demo参考:http://t.cn/EaAt4zP

2、同步Mysql到kafka配置参考

input {
    jdbc {
      jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base"
      jdbc_user => "root"
      jdbc_password => "xxxxxxx"
      jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      #schedule => "* * * * *"
      statement => "SELECT * from news_info WHERE id > :sql_last_value  order by id"
      use_column_value => true
      tracking_column => "id"               tracking_column_type => "numeric"
      record_last_run => true
      last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run"         }

}

filter {
   ruby{
        code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)"
    }
    ruby{
        code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)"
    }
  mutate {
    remove_field => [ "@version" ]
    remove_field => [ "@timestamp" ]
    remove_field => [ "gather_time" ]
    remove_field => [ "publish_time" ]
  }
}

 output {
      kafka {
            bootstrap_servers => "192.168.1.13:9092"
            codec => json_lines
            topic_id => "mytopic"

    }
    file {
            codec => json_lines
            path => "/tmp/output_a.log"
    }
 }

以上内容不复杂,不做细讲。

注意:
Mysql借助logstash同步后,日期类型格式:“2019-04-20 13:55:53”已经被识别为日期格式。

code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)"

是将Mysql中的时间格式转化为时间戳格式。

3、坑总结

3.1 坑1字段大小写问题

from星友:使用logstash同步mysql数据的,因为在jdbc.conf里面没有添加 lowercase_column_names

=> "false"  这个属性,所以logstash默认把查询结果的列明改为了小写,同步进了es,所以就导致es里面看到的字段名称全是小写。

最后总结:es是支持大写字段名称的,问题出在logstash没用好,需要在同步配置中加上 lowercase_column_names => "false"  。记录下来希望可以帮到更多人。

3.2 同步到ES中的数据会不会重复?

想将关系数据库的数据同步至ES中,如果在集群的多台服务器上同时启动logstash。

解读:实际项目中就是没用随机id  使用指定id作为es的_id ,指定id可以是url的md5.这样相同数据就会走更新覆盖以前数据

3.3 相同配置logstash,升级6.3之后不能同步数据。

解读:高版本基于时间增量有优化。

tracking_column_type => "timestamp" 应该是需要指定标识为时间类型,默认为数字类型numeric

3.4 ETL字段统一在哪处理?

解读:可以logstash同步mysql的时候 sql 查询阶段处理,如: select a_value as avalue***

或者filter阶段处理, mutate rename 处理。

mutate {
        rename => ["shortHostname", "hostname" ]
    }

或者kafka阶段借助kafka stream处理。

4、小结

  • 相关配置和同步都不复杂,复杂点往往在于filter阶段的解析还有logstash性能问题。

  • 需要结合实际业务场景做深入的研究和性能分析。

  • 有问题,欢迎留言讨论。

推荐阅读:

1、 实战 | canal 实现Mysql到Elasticsearch实时增量同步

2、 干货 | Debezium实现Mysql到Elasticsearch高效实时同步

3、 一张图理清楚关系型数据库与Elasticsearch同步 http://t.cn/EaAceD3

4、新的实现:http://t.cn/EaAt60O

5、mysql2mysql: http://t.cn/EaAtK7r 6、推荐开源实现: http://t.cn/EaAtjqN

logstash_output_kafka:Mysql 同步 Kafka 深入详解

加入星球,更短时间更快习得更多干货!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Design for Hackers

Design for Hackers

David Kadavy / Wiley / 2011-10-18 / USD 39.99

Discover the techniques behind beautiful design?by deconstructing designs to understand them The term ?hacker? has been redefined to consist of anyone who has an insatiable curiosity as to how thin......一起来看看 《Design for Hackers》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

随机密码生成器
随机密码生成器

多种字符组合密码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试