golang nsq延迟消息存在线程不安全

栏目: Go · 发布时间: 6年前

内容简介:前言:

前言:

Golang社区里有个曾经很火的消息队列nsq,现在貌似不热了。这边有很多的业务都依赖于nsq, python和golang项目还好说,直接TCP长连接挂上去。php是通过nsqd提供的http接口来投递数据。 nsq有两种投递发布功能,一种是常规的消息推送(PublishMsg),另一个是延迟消息推送(DeferredPublish),比如我们可以指定一个任务在多久之后才可以被被消费。

说正题,在用nsq的中间遇到了一个问题,就是并发安全的问题。根据我的各方面测试,普通的PublishMsg并发推送是没有问题的,哪怕多协程粗暴的共用一个连接。但是延迟推送(DeferredPublish)是存在延迟效果失效的问题。 不管是多协程共用一个连接,还是一个协程绑定一个连接,延迟消息都存在问题的。当然只是延迟时间失效,但是消息不会丢失。

2018-11-02 更新

问题已经修复了。

我分析的源代码是nsq的relase版本,但是线上延迟消息异常的nsq版本是rc版本的,不知道哪个神人配置的。。。  至此,nsq的延迟消息问题解决了。。。更新nsq服务端的版本就OK了。 下面的文章其实看不看都可以了….

golang nsq延迟消息存在线程不安全

测试

// xiaorui.cc
 
import (
	"fmt"
	"sync"
	"time"
 
	"github.com/bitly/go-nsq"
)
 
var lock sync.Mutex
var incrlock sync.Mutex
var counter int
 
func main() {
	config := nsq.NewConfig()
	q, _ := nsq.NewConsumer("aa", "ch", config)
	q.AddHandler(nsq.HandlerFunc(handle))
	q.ConnectToNSQD("xxx")
 
	delayTS := time.Duration(20) * time.Second
	for index := 0; index < 20; index++ {
		val := fmt.Sprintf("hello id: %d", index)
		go func(idx string) {
			conf := nsq.NewConfig()
			conn, _ := nsq.NewProducer("xxxx", conf)
			conn.DeferredPublish("aa", delayTS, []byte(idx))
            // conn.Close()
		}(val)
 
		// ok
		// go tool.Nsq.PublishMsg("aa", val)
	}
 
	go func() {
		for {
			fmt.Println(counter)
			time.Sleep(1 * time.Second)
		}
	}()
	select {}
}
 
func handle(msg *nsq.Message) error {
	incrlock.Lock()
	defer incrlock.Unlock()
 
	counter++
	fmt.Println("recv new msg: ", string(msg.Body), time.Now().String())
	return nil
}

下面是tcpdump的抓包,我们能看到确实多个客户端连接,包里面能看到 DPUB的命令及后面的时间戳参数。但问题来了,id:.17这个任务没有了延迟效果, 立马就可以收到。

// xiaorui.cc
 
14:00:32.618457 IP 192.168.116.205.53061 > xxxx: Flags [P.], seq 339:371, ack 281, win 8192, length 32
    0x0000:  4500 0048 0000 4000 4006 9dc5 c0a8 74cd  E..H..@.@.....t.
    0x0010:  2f68 380d cf45 1036 f816 c627 7e33 bda7  /h8..E.6...'~3..
    0x0020:  5018 2000 a164 0000 4450 5542 2061 6120  P....d..DPUB.aa.
    0x0030:  3230 3030 300a 0000 000e 2268 656c 6c6f  20000....."hello
    0x0040:  2069 643a 2031 3122                      .id:.11"
14:00:32.618458 IP 192.168.116.205.53064 > xxxx: Flags [P.], seq 339:371, ack 281, win 8192, length 32
    0x0000:  4500 0048 0000 4000 4006 9dc5 c0a8 74cd  E..H..@.@.....t.
    0x0010:  2f68 380d cf48 1036 afe5 ff88 6441 69ec  /h8..H.6....dAi.
    0x0020:  5018 2000 17df 0000 4450 5542 2061 6120  P.......DPUB.aa.
    0x0030:  3230 3030 300a 0000 000e 2268 656c 6c6f  20000....."hello
    0x0040:  2069 643a 2031 3722                      .id:.17"
14:00:32.618496 IP 192.168.116.205.53059 > xxxx: Flags [P.], seq 339:370, ack 281, win 8192, length 31
    0x0000:  4500 0047 0000 4000 4006 9dc6 c0a8 74cd  E..G..@.@.....t.
    0x0010:  2f68 380d cf43 1036 28a5 719a 065d 678b  /h8..C.6(.q..]g.
    0x0020:  5018 2000 a275 0000 4450 5542 2061 6120  P....u..DPUB.aa.
    0x0030:  3230 3030 300a 0000 000d 2268 656c 6c6f  20000....."hello
    0x0040:  2069 643a 2038 22                        .id:.8"
...
...
...

但还是出现了问题… 

分析go nsq源码

遇到这么诡异的问题,git issue里没人说明。只能看go nsq客户端的源码了, 我们发现其实他的逻辑很简单,每个nsq客户端连接都会 go 一个router()的协程,这个协程是可以保证消息推送的原子性。  Publish和DeferredPublish调用的都是sendCommandAsync函数, sendCommandAsync会把命令cmd结构发到一个channel里,然后由客户端自己的router协程去消费。 另外可以注意到,WriteCommand往socket write数据的时候,也会尝试拿本客户端相关的锁。也就是说,go nsq客户端看起来从两个方面保证了协程 | 线程安全。

// xiaorui.cc
 
 
func (w *Producer) Publish(topic string, body []byte) error {
	return w.sendCommand(Publish(topic, body))
}
 
func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
	return w.sendCommand(DeferredPublish(topic, delay, body))
}
 
func (w *Producer) sendCommand(cmd *Command) error {
    doneChan := make(chan *ProducerTransaction)
    err := w.sendCommandAsync(cmd, doneChan, nil)
    if err != nil {
        close(doneChan)
        return err
    }
    t := <-doneChan
    return t.Error
}
 
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
    ...
    if atomic.LoadInt32(&w.state) != StateConnected {
        err := w.connect()
        if err != nil {
            return err
        }
    }
 
    t := &ProducerTransaction{
        cmd:      cmd,
        doneChan: doneChan,
        Args:     args,
    }
 
    select {
    case w.transactionChan <- t:
    case <-w.exitChan:
        return ErrStopped
    }
 
    return nil
}
 
func (w *Producer) connect() error {
	w.guard.Lock()
	defer w.guard.Unlock()
    ...
	go w.router()
 
	return nil
}
 
func (w *Producer) router() {
    for {
        select {
        case t := <-w.transactionChan:
            w.transactions = append(w.transactions, t)
            err := w.conn.WriteCommand(t.cmd)
            if err != nil {
                w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
                w.close()
            }
    ...
}
 
 
func (c *Conn) WriteCommand(cmd *Command) error {
    c.mtx.Lock()
 
    fmt.Printf("%v cccc %v \n", cmd, string(cmd.Body))
    _, err := cmd.WriteTo(c)
    if err != nil {
        goto exit
    }
    err = c.Flush()
 
exit:
    c.mtx.Unlock()
    if err != nil {
        c.log(LogLevelError, "IO error - %s", err)
        c.delegate.OnIOError(c, err)
    }
    return err
}
 
// xiaorui.cc

很奇怪在多协程多连接下存在异常,go nsq client源码看起来又是合理的,没有发现奇怪的逻辑。 我尝试在所有的DeferredPublish逻辑前后共用一把全局锁,居然可以了。 那么说明,貌似存在并发投递下的消息被串改写问题。 但接下来我对这两个情况进行tcpdump抓包对比,发现很有意思的事情。加锁,所有的延迟任务是ok的,不加锁,有一些任务没有延迟效果,但是消息不丢。但通过抓包看到他们的body内容是一样的,没有什么字段被丢失或者覆盖。 记得以前给go-nsq和nsq都发过issue,询问他们是否可以加dpub和DeferredPublish的multi批量方法,最后没回复我,不知道有没有一些关联。

nsqd server 延迟消息源码

我们再来看看nsqd延迟消息的实现原理。nsq对于延迟消息没有用高大上的手段,直接存到本地内存的优先级队列里,因为没有持久化,所以逻辑相对简单。 当nsqd节点发生异常crash,那么数据自然丢失。下面nsqd的源码描述的很清楚,对于普通消息来说,当memoryMsgChan满了后会落盘持久化。对于延迟消息来说,直接就是内存里的优先级队列,没有做什么持久化方案。疑惑的是,这个事情在nsq的文档里没有标注。

// xiaorui.cc
 
// PutMessage writes a Message to the queue
func (c *Channel) PutMessage(m *Message) error {
	c.RLock()
	defer c.RUnlock()
	if c.Exiting() {
		return errors.New("exiting")
	}
	err := c.put(m)
	if err != nil {
		return err
	}
	atomic.AddUint64(&c.messageCount, 1)
	return nil
}
 
func (c *Channel) put(m *Message) error {
	select {
	case c.memoryMsgChan <- m:
	default:
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, c.backend)
		bufferPoolPut(b)
		c.ctx.nsqd.SetHealth(err)
		if err != nil {
			c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
				c.name, err)
			return err
		}
	}
	return nil
}
 
func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
	atomic.AddUint64(&c.messageCount, 1)
	c.StartDeferredTimeout(msg, timeout)
}
 
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
	absTs := time.Now().Add(timeout).UnixNano()
	item := &pqueue.Item{Value: msg, Priority: absTs}
	err := c.pushDeferredMessage(item)
	if err != nil {
		return err
	}
	c.addToDeferredPQ(item)
	return nil
}

需要关注的是nsqd对于存放延迟消息的优先级队列操作都有加锁。看起来也没有问题。

// xiaorui.cc
 
func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
	c.deferredMutex.Lock()
	heap.Push(&c.deferredPQ, item)
	c.deferredMutex.Unlock()
}
 
 
func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
	c.deferredMutex.Lock()
	id := item.Value.(*Message).ID
	_, ok := c.deferredMessages[id]
    ...
	c.deferredMutex.Unlock()
	return nil
}

上面有说过,只需要在go-nsq客户端发送延迟消息时,统一加一把锁,让操作串行化就可以解决延迟失效的问题。问题又来了,在集群环境中又出现该问题了, 简单说单机测试脚本是没有问题,但并发执行脚本就又有出现延迟失效的问题。我个人怀疑是nsq的问题,但是nsqd服务端没有任何错误日志。

// xiaorui.cc
 
go func(idx string) {
    conf := nsq.NewConfig()
    conn, _ := nsq.NewProducer("ip:port", conf)
    lock.Lock() // 加锁
    conn.DeferredPublish("aa", delayTS, []byte(idx))
    lock.Unlock()
}(val)

最后的解决方法

个人能力有限,搞不定这nsq问题。索性直接抛弃nsq延迟消息的方案,使用redis zset自己实现一个延迟消息机制。 redis最少比nsq的持久化机制靠谱一些,另外nsq延迟消息不能主动去删除,只能等待消费ack删除。而 redis 是可以直接zrem删除的。

总结:

现在不确定是go nsq客户端还是nsqd server的问题,单纯看源代码是没什么问题。其实确定是不是客户端的问题,可以用其他原因的客户端,尴尬的是本想用python nsq client测试一波,但pynsq太难用了。好吧,知道怎么该解决问题的朋友可以联系我,也让我学习学习。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Cracking the Coding Interview

Cracking the Coding Interview

Gayle Laakmann McDowell / CareerCup / 2015-7-1 / USD 39.95

Cracking the Coding Interview, 6th Edition is here to help you through this process, teaching you what you need to know and enabling you to perform at your very best. I've coached and interviewed hund......一起来看看 《Cracking the Coding Interview》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

html转js在线工具
html转js在线工具

html转js在线工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具