golang如何使用sarama访问kafka

栏目: Go · 发布时间: 5年前

https://www.jianshu.com/p/3102418e5a7d

golang如何使用sarama访问kafka
golang连接kafka有三种client认证方式:

无认证
TLS认证
SASL/PLAIN认证, (其他SASL/SCRAM, SASL/GSSAPI都不支持)

下面一个客户端代码例子访问kafka服务器,来发送和接受消息。

使用方式


命令行参数

$ ./kafkaclient -h
Usage of ./client:
  -ca string
        CA Certificate (default "ca.pem")
  -cert string
        Client Certificate (default "cert.pem")
  -command string
        consumer|producer (default "consumer")
  -host string
        Common separated kafka hosts (default "localhost:9093")
  -key string
        Client Key (default "key.pem")
  -partition int
        Kafka topic partition
  -password string
        SASL password
  -sasl
        SASL enable
  -tls
        TLS enable
  -topic string
        Kafka topic (default "test--topic")
  -username string
        SASL username


作为producer启动

$ ./kafkaclient -command producer \
  -host kafka1:9092,kafka2:9092

## SASL/PLAIN enable
$ ./kafkaclient -command producer \
  -sasl -username kafkaclient -password kafkapassword \
  -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command producer \
  -tls -cert client.pem -key client.key -ca ca.pem \
  -host kafka1:9093,kafka2:9093

producer发送消息给kafka:
> aaa
2018/12/15 07:11:21 Produced message: [aaa]
> bbb
2018/12/15 07:11:30 Produced message: [bbb]
> quit


作为consumer启动

$ ./kafkaclient -command consumer \
  -host kafka1:9092,kafka2:9092

## SASL/PLAIN enabled
$ ./kafkaclient -command consumer \
  -sasl -username kafkaclient -password kafkapassword \
  -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command consumer \
  -tls -cert client.pem -key client.key -ca ca.pem \
  -host kafka1:9093,kafka2:9093

consumer从kafka接受消息:
2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]


完整源代码如下

这个代码使用到了Shopify/sarama库,请自行下载使用。
$ cat kafkaclient.go
package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "io/ioutil"
    "bufio"
    "strings"

    "crypto/tls"
    "crypto/x509"

    "github.com/Shopify/sarama"
)

var (
    command     string
    hosts       string
    topic       string
    partition   int
    saslEnable  bool
    username    string
    password    string
    tlsEnable   bool
    clientcert  string
    clientkey   string
    cacert      string
)

func main() {
    flag.StringVar(&command,    "command",      "consumer",         "consumer|producer")
    flag.StringVar(&hosts,      "host",         "localhost:9093",   "Common separated kafka hosts")
    flag.StringVar(&topic,      "topic",        "test--topic",      "Kafka topic")
    flag.IntVar(&partition,     "partition",    0,                  "Kafka topic partition")

    flag.BoolVar(&saslEnable,   "sasl",          false,             "SASL enable")
    flag.StringVar(&username,   "username",      "",                "SASL Username")
    flag.StringVar(&password,   "password",      "",                "SASL Password")

    flag.BoolVar(&tlsEnable,    "tls",          false,              "TLS enable")
    flag.StringVar(&clientcert, "cert",         "cert.pem",         "Client Certificate")
    flag.StringVar(&clientkey,  "key",          "key.pem",          "Client Key")
    flag.StringVar(&cacert,     "ca",           "ca.pem",           "CA Certificate")
    flag.Parse()

    config := sarama.NewConfig()
    if saslEnable {
        config.Net.SASL.Enable   = true
        config.Net.SASL.User     = username
        config.Net.SASL.Password = password
    }

    if tlsEnable {
        //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
        tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
        if err != nil {
            log.Fatal(err)
        }

        config.Net.TLS.Enable = true
        config.Net.TLS.Config = tlsConfig
    }

    client, err := sarama.NewClient(strings.Split(hosts, ","), config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    if command == "consumer" {
        consumer, err := sarama.NewConsumerFromClient(client)
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()
        loopConsumer(consumer, topic, partition)
    } else {
        producer, err := sarama.NewAsyncProducerFromClient(client)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()
        loopProducer(producer, topic, partition)
    }
}

func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
    // load client cert
    clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
    if err != nil {
        return nil, err
    }

    // load ca cert pool
    cacert, err := ioutil.ReadFile(cacertfile)
    if err != nil {
        return nil, err
    }
    cacertpool := x509.NewCertPool()
    cacertpool.AppendCertsFromPEM(cacert)

    // generate tlcconfig
    tlsConfig := tls.Config{}
    tlsConfig.RootCAs = cacertpool
    tlsConfig.Certificates = []tls.Certificate{clientcert}
    tlsConfig.BuildNameToCertificate()
 // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
    return &tlsConfig, err
}

func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
    scanner := bufio.NewScanner(os.Stdin)
    fmt.Print("> ")
    for scanner.Scan() {
        text := scanner.Text()
        if text == "" {
        } else if text == "exit" || text == "quit" {
            break
        } else {
            producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
            log.Printf("Produced message: [%s]\n",text)
        }
        fmt.Print("> ")
    }
}

func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
    partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
    if err != nil {
        log.Println(err)
        return
    }
    defer partitionConsumer.Close()

    for {
        msg := <-partitionConsumer.Messages()
        log.Printf("Consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
    }
}

编译:
$ go build kafkaclient.go

以上所述就是小编给大家介绍的《golang如何使用sarama访问kafka》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Ajax修炼之道

Ajax修炼之道

(美)哥特兰、高伯瑞斯、艾米亚 / 徐锋,胡冰 / 电子工业出版社 / 2006-4 / 29.8

Ajax将静态Web页面转变为充满交互的应用。现在您不需要牺牲Web应用程序部署的简单性,就可以将“胖”客户端应用程序部署到客户端。不过对于很多人业说,Ajax看起来很难。这就是我们撰写本书的原因。作为实践的指导,本书揭开了Ajax神秘的面纱,教您如何以简单的方式使用Ajax。本书内容覆盖了DHTML、Javascript和闻名已久的XmlHttp Request回调技术的基础知识。您将了解如何将......一起来看看 《Ajax修炼之道》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

随机密码生成器
随机密码生成器

多种字符组合密码

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具