Golang调用RabbitMQ的案例

栏目: Go · 发布时间: 5年前

内容简介:参考链接1、安装Golang 1.10.1 版本

参考链接

https://mshk.top/2016/07/ubuntu-rabbitmq-golang/

1、安装Golang 1.10.1 版本

1.1、创建 go 的工作环境

注意:我将go的工作环境放在了$HOME/gowork

root@rabbitmq-1:~# mkdir -p $HOME/gowork

root@rabbitmq-1:~# cd $HOME/gowork

root@rabbitmq-1:~/gowork# mkdir src pkg bin

1.2、下载安装包并解压至/usr/local/ 下

root@rabbitmq-1:~/gowork# wget [https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz](https://storage.googleapis.com/golang/go1.10.1.linux-amd64.tar.gz)

root@rabbitmq-1:~/gowork# tar -C /usr/local -xzf go1.10.1.linux-amd64.tar.gz

1.3、添加环境变量

在~/.bahsrc文件中添加下面内容:

root@rabbitmq-1:/usr/local/go# vim ~/.bashrc
export GOROOT=/usr/local/go //go的安装目录。也就是刚才指定的路径
export GOPATH=$HOME/gowork //这里的路径是你go语言的工作环境,按照自己的路径配置。
export GOBIN=$GOPATH/bin //编译后的可执行文件存放的目录
export PATH=$GOPATH:$GOBIN:$GOROOT:$PATH //添加进PATH路径

注意 :这个地方一定要配置对,配置仔细,要不然在启动网络的时候会出现找不到文件的问题。

使环境变量生效

root@rabbitmq-1:/usr/local/go# source ~/.bashrc

查看是否安装成功

root@rabbitmq-1:/usr/local/go/bin# /usr/local/go/bin/go version
go version go1.10.1 linux/amd64

2、Golang调用RabbitMQ的案例

我们先将包下载到本地,然后就可以直接使用了:

root@rabbitmq-1:~# /usr/local/go/bin/go get github.com/streadway/amqp

2.1、使用Golang来发送第一个hello idoall.org

在第一个教程中,我们写程序从一个命名的队列(test-idoall-queues)中发送和接收消息。

producer_hello.go(消息生产者):

root@rabbitmq-1:~# vim producer_hello.go

package main

import (

 "fmt"

 "log"

 "github.com/streadway/amqp"

)

const (

 //AMQP URI

 uri = "amqp://guest:guest@rabbitmq-1:5672/" # rabbitmq-1为主机名

 //Durable AMQP exchange name

 exchangeName = ""

 //Durable AMQP queue name

 queueName = "test-idoall-queues"

 //Body of message

 bodyMsg string = "hello idoall.org"

)

//如果存在错误,则输出

func failOnError(err error, msg string) {

 if err != nil {

 log.Fatalf("%s: %s", msg, err)

 panic(fmt.Sprintf("%s: %s", msg, err))

 }

}

func main(){

 //调用发布消息函数

 publish(uri, exchangeName, queueName, bodyMsg)

 log.Printf("published %dB OK", len(bodyMsg))

}

//发布者的方法

//

//@amqpURI, amqp的地址

//@exchange, exchange的名称

//@queue, queue的名称

//@body, 主体内容

func publish(amqpURI string, exchange string, queue string, body string){

 //建立连接

 log.Printf("dialing %q", amqpURI)

 connection, err := amqp.Dial(amqpURI)

 failOnError(err, "Failed to connect to RabbitMQ")

 defer connection.Close()

 //创建一个Channel

 log.Printf("got Connection, getting Channel")

 channel, err := connection.Channel()

 failOnError(err, "Failed to open a channel")

 defer channel.Close()

 log.Printf("got queue, declaring %q", queue)

 //创建一个queue

 q, err := channel.QueueDeclare(

 queueName, // name

 false, // durable

 false, // delete when unused

 false, // exclusive

 false, // no-wait

 nil, // arguments

 )

 failOnError(err, "Failed to declare a queue")

 log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

 // Producer只能发送到exchange,它是不能直接发送到queue的。

 // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。

 // routing_key就是指定的queue名字。

 err = channel.Publish(

 exchange, // exchange

 q.Name, // routing key

 false, // mandatory

 false, // immediate

 amqp.Publishing {

 Headers: amqp.Table{},

 ContentType: "text/plain",

 ContentEncoding: "",

 Body: []byte(body),

 })

 failOnError(err, "Failed to publish a message")

}

consumer_hello.go(消息消费者):

root@rabbitmq-1:~# vim consumer_hello.go

package main

import (

 "fmt"

 "log"

 "github.com/streadway/amqp"

)

const (

 //AMQP URI

 uri = "amqp://guest:guest@rabbitmq-1:5672/"

 //Durable AMQP exchange nam

 exchangeName = ""

 //Durable AMQP queue name

 queueName = "test-idoall-queues"

)

//如果存在错误,则输出

func failOnError(err error, msg string) {

 if err != nil {

 log.Fatalf("%s: %s", msg, err)

 panic(fmt.Sprintf("%s: %s", msg, err))

 }

}

func main(){

 //调用消息接收者

 consumer(uri, exchangeName, queueName)

}

//接收者方法

//

//@amqpURI, amqp的地址

//@exchange, exchange的名称

//@queue, queue的名称

func consumer(amqpURI string, exchange string, queue string){

 //建立连接

 log.Printf("dialing %q", amqpURI)

 connection, err := amqp.Dial(amqpURI)

  failOnError(err, "Failed to connect to RabbitMQ")

 defer connection.Close()

 //创建一个Channel

 log.Printf("got Connection, getting Channel")

 channel, err := connection.Channel()

 failOnError(err, "Failed to open a channel")

 defer channel.Close()

 log.Printf("got queue, declaring %q", queue)

 //创建一个queue

 q, err := channel.QueueDeclare(

 queueName, // name

 false, // durable

 false, // delete when unused

 false, // exclusive

 false, // no-wait

 nil, // arguments

 )

 failOnError(err, "Failed to declare a queue")

 log.Printf("Queue bound to Exchange, starting Consume")

 //订阅消息

 msgs, err := channel.Consume(

 q.Name, // queue

 "", // consumer

 true, // auto-ack

 false, // exclusive

 false, // no-local

 false, // no-wait

 nil, // args

 )

 failOnError(err, "Failed to register a consumer")

 //创建一个channel

 forever := make(chan bool)

 //调用gorountine

 go func() {

 for d := range msgs {

 log.Printf("Received a message: %s", d.Body)

 }

 }()

 log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

 //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出

 <-forever

}

查看消息(开打两个控制台console)

Console1(运行producer):

root@rabbitmq-1:~# /usr/local/go/bin/go run producer_hello.go

2018/12/14 03:41:32 dialing "amqp://guest:guest@rabbitmq-1:5672/"

2018/12/14 03:41:32 got Connection, getting Channel

2018/12/14 03:41:32 got queue, declaring "test-idoall-queues"

2018/12/14 03:41:32 declared queue, publishing 16B body ("hello idoall.org")

2018/12/14 03:41:32 published 16B OK

然后运行一下命令,可以看到我们刚才创建的queues在列表中

root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues

Listing queues

test-idoall-queues  1

Console2(运行consumer)打印消息到屏幕,可以看到刚才我们通过producer发送的消息hello idoall.org

root@rabbitmq-1:/usr/sbin# /usr/local/go/bin/go run /root/consumer_hello.go

2018/12/14 03:46:01 dialing "amqp://guest:guest@rabbitmq-1:5672/"

2018/12/14 03:46:01 got Connection, getting Channel

2018/12/14 03:46:01 got queue, declaring "test-idoall-queues"

2018/12/14 03:46:01 Queue bound to Exchange, starting Consume

2018/12/14 03:46:01 [*] Waiting for messages. To exit press CTRL+C

2018/12/14 03:46:01 Received a message: hello idoall.org

此时,web界面上也出现了信息。

Golang调用RabbitMQ的案例

image.png

2.2、Rabbitmq的任务分发机制

在2.1章节中,我们写程序从一个命名的队列中发送和接收消息。在这个章节中,我们将创建一个工作队列,将用于分配在多个工人之间的耗时的任务。

RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果任务队伍过多,那么只需要创建更多的Consumer来进行任务处理即可。

producer_task.go(消息生产者):

root@rabbitmq-1:~# vim producer_task.go
package main

import (
  "fmt"
  "log"
  "os"
  "strings"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri          =  "amqp://guest:guest@rabbitmq-1:5672/"
  //Durable AMQP exchange name
  exchangeName =  ""
  //Durable AMQP queue name
  queueName    = "test-idoall-queues-task"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main(){
  bodyMsg := bodyFrom(os.Args)
  //调用发布消息函数
  publish(uri, exchangeName, queueName, bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello idoall.org"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

//发布者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
  //建立连接
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  //创建一个Channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  //创建一个queue
  q, err := channel.QueueDeclare(
    queueName, // name
    false,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

  // Producer只能发送到exchange,它是不能直接发送到queue的。
  // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
  // routing_key就是指定的queue名字。
  err = channel.Publish(
    exchange,     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}

consumer_task.go(消息消费者)

root@rabbitmq-1:~# vim consumer_task.go
package main

import (
  "fmt"
  "log"
  "bytes"
  "time"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri           =  "amqp://guest:guest@rabbitmq-1:5672/"
  //Durable AMQP exchange nam
  exchangeName  = ""
  //Durable AMQP queue name
  queueName     = "test-idoall-queues-task"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main(){
    //调用消息接收者
    consumer(uri, exchangeName, queueName)
}

//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
  //建立连接
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  //创建一个Channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  //创建一个queue
  q, err := channel.QueueDeclare(
      queueName, // name
      false,   // durable
      false,   // delete when unused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("Queue bound to Exchange, starting Consume")
  //订阅消息
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      false,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  //创建一个channel
  forever := make(chan bool)

  //调用gorountine
  go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        dot_count := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dot_count)
        time.Sleep(t * time.Second)
        log.Printf("Done")
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
  <-forever
}

查看结果

Console1(consumer):

root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:28:59 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:28:59 got Connection, getting Channel
2018/12/14 05:28:59 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:28:59 Queue bound to Exchange, starting Consume
2018/12/14 05:28:59  [*] Waiting for messages. To exit press CTRL+C

Console2(consumer):

root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:29:04 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:29:04 got Connection, getting Channel
2018/12/14 05:29:04 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:29:04 Queue bound to Exchange, starting Consume
2018/12/14 05:29:04  [*] Waiting for messages. To exit press CTRL+C

在第三个窗口,这个时候我们使用Producer 来 Publish Message:

root@rabbitmq-1:~# /usr/local/go/bin/go run producer_task.go First message. &&/usr/local/go/bin/go run producer_task.go Second message.. && /usr/local/go/bin/go run producer_task.go Third message... && /usr/local/go/bin/go run producer_task.go Fourth message.... && /usr/local/go/bin/go run producer_task.go Fifth message.....
2018/12/14 05:34:16 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:16 got Connection, getting Channel
2018/12/14 05:34:16 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:16 declared queue, publishing 14B body ("First message.")
2018/12/14 05:34:16 published 14B OK
2018/12/14 05:34:16 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:16 got Connection, getting Channel
2018/12/14 05:34:16 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:16 declared queue, publishing 16B body ("Second message..")
2018/12/14 05:34:16 published 16B OK
2018/12/14 05:34:17 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:17 got Connection, getting Channel
2018/12/14 05:34:17 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:17 declared queue, publishing 16B body ("Third message...")
2018/12/14 05:34:17 published 16B OK
2018/12/14 05:34:17 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:17 got Connection, getting Channel
2018/12/14 05:34:17 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:17 declared queue, publishing 18B body ("Fourth message....")
2018/12/14 05:34:17 published 18B OK
2018/12/14 05:34:18 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:34:18 got Connection, getting Channel
2018/12/14 05:34:18 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:34:18 declared queue, publishing 18B body ("Fifth message.....")
2018/12/14 05:34:18 published 18B OK

这时我们再看刚才打开的两个Consumer的结果:

Console1(consumer):

root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:32:36 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:32:36 got Connection, getting Channel
2018/12/14 05:32:36 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:32:36 Queue bound to Exchange, starting Consume
2018/12/14 05:32:36  [*] Waiting for messages. To exit press CTRL+C
2018/12/14 05:34:16 Received a message: Second message..
2018/12/14 05:34:18 Done
2018/12/14 05:34:18 Received a message: Fourth message....
2018/12/14 05:34:22 Done

Console2(consumer):

root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_task.go
2018/12/14 05:29:04 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 05:29:04 got Connection, getting Channel
2018/12/14 05:29:04 got queue, declaring "test-idoall-queues-task"
2018/12/14 05:29:04 Queue bound to Exchange, starting Consume
2018/12/14 05:29:04  [*] Waiting for messages. To exit press CTRL+C
2018/12/14 05:34:16 Received a message: First message.
2018/12/14 05:34:17 Done
2018/12/14 05:34:17 Received a message: Third message...
2018/12/14 05:34:20 Done
2018/12/14 05:34:20 Received a message: Fifth message.....
2018/12/14 05:34:25 Done

默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin,也叫消息轮询

Web页面情况

Golang调用RabbitMQ的案例

image.png

2.3、Message acknowledgment 消息确认

每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们的代码,一旦RabbitMQ Server发送给Consumer消息后,会立即把这个Message标记为完成,然后从queue中删除。我们将无法再操作这个尚未处理完成的消息。

实际场景中,如果一个Consumer异常退出了,我们希望它处理的数据能够被另外的Consumer处理,这样数据在这种情况下(通道关闭、连接关闭、TCP连接丢失等情况)就不会丢失了。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,ack(nowledgments)是从Consumer消费后发送到一个特定的消息告诉RabbitMQ已经收到、处理结束,RabbitMQ可以去安全的删除它了。

如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message重新排进队列,发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

消息确认默认是关闭的,我们需要通过,d.ACK(false)来告诉RabbitMQ我们已经完成任务。

producer_acknowledgments(消息生产者).go:

package main

import (
  "fmt"
  "log"
  "os"
  "strings"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri          =  "amqp://guest:guest@rabbitmq-1:5672/"
  //Durable AMQP exchange name
  exchangeName =  ""
  //Durable AMQP queue name
  queueName    = "test-idoall-queues-acknowledgments"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main(){
  bodyMsg := bodyFrom(os.Args)
  //调用发布消息函数
  publish(uri, exchangeName, queueName, bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello idoall.org"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

//发布者的方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string){
  //建立连接
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  //创建一个Channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  //创建一个queue
  q, err := channel.QueueDeclare(
    queueName, // name
    false,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

  // Producer只能发送到exchange,它是不能直接发送到queue的。
  // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
  // routing_key就是指定的queue名字。
  err = channel.Publish(
    exchange,     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}
  

consumer_acknowledgments(消息消费者).go

package main

import (
  "fmt"
  "log"
  "bytes"
  "time"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri           =  "amqp://guest:guest@rabbitmq-1:5672/"
  //Durable AMQP exchange nam
  exchangeName  = ""
  //Durable AMQP queue name
  queueName     = "test-idoall-queues-acknowledgments"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main(){
    //调用消息接收者
    consumer(uri, exchangeName, queueName)
}

//接收者方法
//
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string){
  //建立连接
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  //创建一个Channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  //创建一个queue
  q, err := channel.QueueDeclare(
      queueName, // name
      false,   // durable
      false,   // delete when unused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("Queue bound to Exchange, starting Consume")
  //订阅消息
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      false,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  //创建一个channel
  forever := make(chan bool)

  //调用gorountine
  go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        dot_count := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dot_count)
        time.Sleep(t * time.Second)
        log.Printf("Done")
        d.Ack(false)
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
  <-forever
}

查看结果

我们先使用Producer来发送一列消息:

root@rabbitmq-1:~# /usr/local/go/bin/go run producer_acknowledgments.go First message. &&/usr/local/go/bin/go run producer_acknowledgments.go Second message.. && /usr/local/go/bin/go run producer_acknowledgments.go Third message... && /usr/local/go/bin/go run producer_acknowledgments.go Fourth message.... && /usr/local/go/bin/go run producer_acknowledgments.go Fifth message.....
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:10 got Connection, getting Channel
2018/12/14 07:30:10 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:10 declared queue, publishing 14B body ("First message.")
2018/12/14 07:30:10 published 14B OK
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:10 got Connection, getting Channel
2018/12/14 07:30:10 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:10 declared queue, publishing 16B body ("Second message..")
2018/12/14 07:30:10 published 16B OK
2018/12/14 07:30:10 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 16B body ("Third message...")
2018/12/14 07:30:11 published 16B OK
2018/12/14 07:30:11 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 18B body ("Fourth message....")
2018/12/14 07:30:11 published 18B OK
2018/12/14 07:30:11 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:11 got Connection, getting Channel
2018/12/14 07:30:11 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:11 declared queue, publishing 18B body ("Fifth message.....")
2018/12/14 07:30:11 published 18B OK

通过rabbitmqctl命令,来看下messages_unacknowledged的情况:

root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
test-idoall-queues-task 0   0
test-idoall-queues-acknowledgments  5   0
test-idoall-queues  0   0

使用Consumer来订阅消息操作到第四条的时候,我们按CTRL+C退出,这个时候相当于消息已经被读取,但是未发送d.ACK(false):

root@rabbitmq-1:~# /usr/local/go/bin/go run consumer_acknowledgments.go 
2018/12/14 07:30:25 dialing "amqp://guest:guest@rabbitmq-1:5672/"
2018/12/14 07:30:25 got Connection, getting Channel
2018/12/14 07:30:25 got queue, declaring "test-idoall-queues-acknowledgments"
2018/12/14 07:30:25 Queue bound to Exchange, starting Consume
2018/12/14 07:30:25  [*] Waiting for messages. To exit press CTRL+C
2018/12/14 07:30:25 Received a message: First message.
2018/12/14 07:30:26 Done
2018/12/14 07:30:26 Received a message: Second message..
2018/12/14 07:30:28 Done
2018/12/14 07:30:28 Received a message: Third message...
2018/12/14 07:30:31 Done
2018/12/14 07:30:31 Received a message: Fourth message....
^Csignal: interrupt

再通过rabbitmqctl命令可以看到,还是有2条消息未处理

root@rabbitmq-1:/usr/sbin# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
test-idoall-queues-task 0   0
test-idoall-queues-acknowledgments  2   0
test-idoall-queues  0   0

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

精通正则表达式

精通正则表达式

弗里德尔 / 东南大学出版社 / 2005-9 / 72.00元

正则表达式是一种用来操作文本和数据的强大工具。近年来,它们快速广泛传播,并被多种流行工具和语言作为标准特性提供,如Perl、Java、VB.NET、C#(及任何使用.NET框架的语言)、PHP、Python、Ruby、Tcl、MySQL、awk、Emacs等。 如果还未使用过正则表达式,从本书中您将发现一个掌控数据的全新世界。如果使用过它们,您将会充分意识到本书空前的深度和广度。如果您认为自己已经......一起来看看 《精通正则表达式》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

URL 编码/解码
URL 编码/解码

URL 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器