[Golang 游戏leaf系列(五) chanrpc三种调用模式

栏目: Go · 发布时间: 4年前

内容简介:在看Client里的属性,有同步调用chan,异步调用chan,还有一个异步记数器。然后就是一些方法:也就是Open方法传入的参数,决定了这两个通道的长度。

Golang 游戏leaf系列(三) NewAgent在chanrpc和skeleton中怎么通讯 (下文简称系列三)中,主要是分析了Server结构体,并没有涉及Client。本文将深入分析剩余部分。

//chanrpc.go
type RetInfo struct {
    ret interface{}
    err error
    cb interface{}
}

type CallInfo struct {
    f       interface{}
    args    []interface{}
    chanRet chan *RetInfo
    cb      interface{}
}

type Server struct {
    functions map[interface{}]interface{}
    ChanCall  chan *CallInfo
}

type Client struct {
    s               *Server
    chanSyncRet     chan *RetInfo
    ChanAsynRet     chan *RetInfo
    pendingAsynCall int
}

看Client里的属性,有同步调用chan,异步调用chan,还有一个异步记数器。然后就是一些方法:

func (s *Server) Open(l int) *Client {
    c := NewClient(l)
    c.Attach(s)
    return c
}

func NewClient(l int) *Client {
    c := new(Client)
    c.chanSyncRet = make(chan *RetInfo, 1)
    c.ChanAsynRet = make(chan *RetInfo, l)
    return c
}

func (c *Client) Attach(s *Server) {
    c.s = s
}

也就是Open方法传入的参数,决定了这两个通道的长度。

一、结合example_test.go来看一下使用方式

1.支持的参数类型

在源码中,有很多f0,f1,fn,相应的也有call0,call1,calln这些写法。实际上,针对的是不同参数类型:

//参数为[]interface,无返回值
func([]interface{})      
                     
//参数为[]interface,返回值为interface
func([]interface{}) interface{}           

//参数为[]interface,返回值为[]interface
func([]interface{}) []interface{}

2.注册

s := chanrpc.NewServer(10)

    var wg sync.WaitGroup
    wg.Add(1)

    // goroutine 1
    go func() {
        s.Register("f0", func(args []interface{}) {

        })

        s.Register("f1", func(args []interface{}) interface{} {
            return 1
        })

        s.Register("fn", func(args []interface{}) []interface{} {
            return []interface{}{1, 2, 3}
        })

        s.Register("add", func(args []interface{}) interface{} {
            n1 := args[0].(int)
            n2 := args[1].(int)
            return n1 + n2
        })

        wg.Done()

        for {
            s.Exec(<-s.ChanCall)
        }
    }()

    wg.Wait()
    wg.Add(1)

“add”那个,也算是一个Call1类型的,然后 s.Exec(<-s.ChanCall) 就相当于在等着ChanCall来数据了。

3.call系列

// goroutine 2
    go func() {
        c := s.Open(10)

        // sync
        err := c.Call0("f0")
        if err != nil {
            fmt.Println(err)
        }

        r1, err := c.Call1("f1")
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(r1)
        }

        rn, err := c.CallN("fn")
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(rn[0], rn[1], rn[2])
        }

        ra, err := c.Call1("add", 1, 2)
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(ra)
        }

        // asyn
        c.AsynCall("f0", func(err error) {
            if err != nil {
                fmt.Println(err)
            }
        })

        c.AsynCall("f1", func(ret interface{}, err error) {
            if err != nil {
                fmt.Println(err)
            } else {
                fmt.Println(ret)
            }
        })

        c.AsynCall("fn", func(ret []interface{}, err error) {
            if err != nil {
                fmt.Println(err)
            } else {
                fmt.Println(ret[0], ret[1], ret[2])
            }
        })

        c.AsynCall("add", 1, 2, func(ret interface{}, err error) {
            if err != nil {
                fmt.Println(err)
            } else {
                fmt.Println(ret)
            }
        })

        c.Cb(<-c.ChanAsynRet)
        c.Cb(<-c.ChanAsynRet)
        c.Cb(<-c.ChanAsynRet)
        c.Cb(<-c.ChanAsynRet)

        // go
        s.Go("f0")

        wg.Done()
    }()

    wg.Wait()

二、call系列

func (c *Client) Call0(id interface{}, args ...interface{}) error {
    f, err := c.f(id, 0)
    if err != nil {
        return err
    }

    err = c.call(&CallInfo{
        f:       f,
        args:    args,
        chanRet: c.chanSyncRet,
    }, true)
    if err != nil {
        return err
    }

    ri := <-c.chanSyncRet
    return ri.err
}

c.f(id,0)这个在call系列里都有,主要工作就是确认通过Register注册的function,参数的数量和Call系列是否一致。

然后就是真的要去执行了,这时候对比发现,call系列的执行都是一样的:

err = c.call(&CallInfo{
        f:       f,
        args:    args,
        chanRet: c.chanSyncRet,
    }, true)

func (c *Client) call(ci *CallInfo, block bool) (err error) {
    defer func() {
        if r := recover(); r != nil {
            err = r.(error)
        }
    }()

    if block {
        c.s.ChanCall <- ci
    } else {
        select {
        case c.s.ChanCall <- ci:
        default:
            err = errors.New("chanrpc channel full")
        }
    }
    return
}

看一下CallInfo

type CallInfo struct {
    f       interface{}
    args    []interface{}
    chanRet chan *RetInfo
    cb      interface{}
}

发现call系列没有给cb,这说明它们是不需要回调的。然后block传的全是true,说明call系列会直接向ChanCall写入callinfo。

example_test.go里call的太多了,可以先尝试运行其中一个

ra, err := c.Call1("add", 1, 2)
        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Println(ra)
        }

输出的是3,上面一直没提返回值的事情,就Call1来看,ra返回的正是结果3。其实看源码,是通过 ri := <-c.chanSyncRet 拿到这个值的。也就是说, s.Exec(<-s.ChanCall) 在拿到callinfo后,会写到chanSyncRet里。

在exec里有这样的代码:

// execute
    switch ci.f.(type) {
    case func([]interface{}):
        ci.f.(func([]interface{}))(ci.args)
        return s.ret(ci, &RetInfo{})
    case func([]interface{}) interface{}:
        ret := ci.f.(func([]interface{}) interface{})(ci.args)
        return s.ret(ci, &RetInfo{ret: ret})
    case func([]interface{}) []interface{}:
        ret := ci.f.(func([]interface{}) []interface{})(ci.args)
        return s.ret(ci, &RetInfo{ret: ret})
    }

ret方法是这样的:

func (s *Server) ret(ci *CallInfo, ri *RetInfo) (err error) {
    if ci.chanRet == nil {
        return
    }

    defer func() {
        if r := recover(); r != nil {
            err = r.(error)
        }
    }()

    ri.cb = ci.cb
    ci.chanRet <- ri
    return
}

三、异步AsynCall

现在来看一下异步调用AsynCall

c.AsynCall("add", 1, 2, func(ret interface{}, err error) {
            if err != nil {
                fmt.Println(err)
            } else {
                fmt.Println(ret)
            }
        })

这里在参数的最末尾,传入一个回调function,然后流程和同步类似

func (c *Client) asynCall(id interface{}, args []interface{}, cb interface{}, n int) {
    f, err := c.f(id, n)
    if err != nil {
        c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
        return
    }

    err = c.call(&CallInfo{
        f:       f,
        args:    args,
        chanRet: c.ChanAsynRet,
        cb:      cb,
    }, false)
    if err != nil {
        c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
        return
    }
}

只不过black参数是false的。

if block {
        c.s.ChanCall <- ci
    } else {
        select {
        case c.s.ChanCall <- ci:
        default:
            err = errors.New("chanrpc channel full")
        }
    }

再来对比一下,如果s.ChanCall读取太慢,已经写不进去了,在同步模式下,这个callinfo会一直阻塞等在那里;而异步模式,会走到default分支,也就是报个错。然后异步模式会把结果都存到ChanAsynRet里

if err != nil {
        c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
        return
    }

当然如果存不下,还在调用AsynCall,它会直接在AsynCall里执行回调

...
    // too many calls
    if c.pendingAsynCall >= cap(c.ChanAsynRet) {
        execCb(&RetInfo{err: errors.New("too many calls"), cb: cb})
        return
    }

    c.asynCall(id, args, cb, n)
    c.pendingAsynCall++
...

func execCb(ri *RetInfo) {
    defer func() {
        if r := recover(); r != nil {
            if conf.LenStackBuf > 0 {
                buf := make([]byte, conf.LenStackBuf)
                l := runtime.Stack(buf, false)
                log.Error("%v: %s", r, buf[:l])
            } else {
                log.Error("%v", r)
            }
        }
    }()

    // execute
    switch ri.cb.(type) {
    case func(error):
        ri.cb.(func(error))(ri.err)
    case func(interface{}, error):
        ri.cb.(func(interface{}, error))(ri.ret, ri.err)
    case func([]interface{}, error):
        ri.cb.(func([]interface{}, error))(assert(ri.ret), ri.err)
    default:
        panic("bug")
    }
    return
}

func (c *Client) Cb(ri *RetInfo) {
    c.pendingAsynCall--
    execCb(ri)
}

也就是说,异步模式会把结果存起来。什么时候执行Cb呢,别忘了skeleton.Run啊

// leaf\module\skeleton.go
func (s *Skeleton) Run(closeSig chan bool) {
    for {
        select {
        case <-closeSig:
            s.commandServer.Close()
            s.server.Close()
            for !s.g.Idle() || !s.client.Idle() {
                s.g.Close()
                s.client.Close()
            }
            return
        case ri := <-s.client.ChanAsynRet:
            s.client.Cb(ri)

        // 等待来自通道的数据
        case ci := <-s.server.ChanCall:
            s.server.Exec(ci)

        case ci := <-s.commandServer.ChanCall:
            s.commandServer.Exec(ci)
        case cb := <-s.g.ChanCb:
            s.g.Cb(cb)
        case t := <-s.dispatcher.ChanTimer:
            t.Cb()
        }
    }
}

四、总结

ChanRPC 的调用方有 3 种调用模式:

  • 同步模式,调用并等待 ChanRPC 返回
  • 异步模式,调用并提供回调函数,回调函数会在 ChanRPC 返回后被调用
  • Go 模式,调用并立即返回,忽略任何返回值和错误

看下来,还是 Go 模式最简单,没有回调,也没有ret返回信息。而同步模式和异步模式,暂时还没有看到使用示例。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web 2.0 Heroes

Web 2.0 Heroes

Bradley L. Jones / Wiley / 2008-04-14 / USD 24.99

Web 2.0 may be an elusive concept, but one thing is certain: using the Web as merely a means of retrieving and displaying information is history. Today?s Web is immediate, interactive, innovative. It ......一起来看看 《Web 2.0 Heroes》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具