记一次 goroutine 泄漏问题查找原因与解决

栏目: Go · 发布时间: 6年前

内容简介:问题描述: 有个需求要实现根据url参数在反向代理负载均衡处分发到不同的k8s service,反向代理负载均衡用的是caddy,于是将caddy源码拉下来,在原来proxy插件的基础上修改成以下idproxy插件,实现根据id参数分发到id-xxx.namespace的k8s服务。部署上去后,用了个每秒都有请求过来的服务测试,发现pod隔一段时间就会被k8s OOMKill,重启了几十次,猜想可能是内存泄漏,查看了下内存和CPU使用图,发现内存在一直增长,但速度不快。对于反向代理负载均衡的服务,负载均衡是客户端……

记一次goroutine泄漏问题查找原因与解决

问题描述: 有个需求要实现根据url参数在反向代理负载均衡处分发到不同的k8s service,反向代理负载均衡用的是caddy,于是将caddy源码拉下来,在原来proxy插件的基础上修改成以下idproxy插件,实现根据id参数分发到id-xxx.namespace的k8s服务

// init registers the idproxy directive and its plugin with Caddy when the
// package is loaded.
func init() {
  const directive = "idproxy"

  // "proxy" is passed as the second argument; presumably it positions the new
  // directive relative to the stock proxy directive — confirm against the
  // httpserver.RegisterDevDirective documentation.
  httpserver.RegisterDevDirective(directive, "proxy")

  // The plugin is an HTTP server plugin whose setup action is setup.
  plugin := caddy.Plugin{ServerType: "http", Action: setup}
  caddy.RegisterPlugin(directive, plugin)
}

// Upstream wraps a proxy.Upstream by embedding it. All methods of the embedded
// value are promoted, which lets this type override individual ones (Select)
// while inheriting the rest.
type Upstream struct {
  proxy.Upstream
}

// Select picks the upstream host for r.
//
// It first delegates to the embedded Upstream and returns nil when that yields
// no host. When idproxyQueryUrl and idproxyPaths are both non-empty and the
// request path starts with the tested idproxyPaths entry, it returns a newly
// allocated UpstreamHost whose URL host is "id-<id>.<idproxyNamespace>" and
// whose other listed fields are copied from the host selected by the embedded
// Upstream. In every other case the embedded Upstream's choice is returned.
//
// NOTE(review): the returned literal does not set ReverseProxy and is not
// stored anywhere, so every matching request hands the caller a fresh host
// with a nil ReverseProxy.
// NOTE(review): idproxyQueryUrl is only checked for being non-empty; the query
// parameter that is actually read is the literal "id".
func (u Upstream) Select(r *http.Request) *proxy.UpstreamHost {
  uh := u.Upstream.Select(r)
  if uh == nil {
    return nil
  }
  if len(idproxyQueryUrl) > 0 && len(idproxyPaths) > 0 {
    for _, path := range idproxyPaths {
      if strings.HasPrefix(r.URL.Path, path) {
        // The ParseInt error is discarded, so a missing or non-numeric id is
        // treated as 0 and routed to "id-0.<namespace>".
        id, _ := strconv.ParseInt(r.URL.Query().Get("id"), 10, 64)
        // NOTE(review): the url.Parse error is discarded as well; if uh.Name
        // failed to parse, baseURL would be nil and the next line would panic.
        baseURL, _ := url.Parse(uh.Name)
        // Keep scheme, path and user info of the configured upstream and swap
        // only the host part.
        baseURL.Host = fmt.Sprintf("id-%d.%s", id, idproxyNamespace)
        return &proxy.UpstreamHost{
          Name:                         baseURL.String(),
          FailTimeout:                  uh.FailTimeout,
          UpstreamHeaders:              uh.UpstreamHeaders,
          DownstreamHeaders:            uh.DownstreamHeaders,
          WithoutPathPrefix:            uh.WithoutPathPrefix,
          MaxConns:                     uh.MaxConns,
          UpstreamHeaderReplacements:   uh.UpstreamHeaderReplacements,
          DownstreamHeaderReplacements: uh.DownstreamHeaderReplacements,
        }
      }
      // NOTE(review): this return is inside the loop body, so the loop never
      // reaches a second iteration — only the first element of idproxyPaths
      // is ever tested.
      return uh
    }
  }
  return uh
}

// setup builds the static upstreams described by the directive, wraps each of
// them in Upstream, installs a proxy.Proxy middleware that serves from those
// upstreams, and registers every upstream's Stop method as a shutdown hook.
// It returns the error from proxy.NewStaticUpstreams, if any.
func setup(c *caddy.Controller) error {
  host := httpserver.GetConfig(c).Host()
  upstreams, err := proxy.NewStaticUpstreams(c.Dispenser, host)
  if err != nil {
    return err
  }

  // Replace every upstream in place with its idproxy wrapper.
  for i, inner := range upstreams {
    upstreams[i] = Upstream{inner}
  }

  middleware := func(next httpserver.Handler) httpserver.Handler {
    return proxy.Proxy{Next: next, Upstreams: upstreams}
  }
  httpserver.GetConfig(c).AddMiddleware(middleware)

  // Register shutdown handlers.
  for i := range upstreams {
    c.OnShutdown(upstreams[i].Stop)
  }

  return nil
}

部署上去后,用了个每秒都有请求过来的服务测试,发现pod隔一段时间就会被k8s OOMKill,重启了几十次,猜想可能是内存泄漏。查看了下内存和CPU使用图,发现内存在一直增长,但速度不快。

于是马上加了下net/http/pprof包的handler来查看运行状况,发现goroutine在一直增加,并且发现有两万多goroutine卡在http包的serve这里,readRequest一直在等待,所以应该是tcp连接泄漏了:连接一直保持着连接状态没有关闭,可能是http的连接池那里出问题了。

0x7db6c2 net/http.(*conn).readRequest+0x162 /usr/local/go/src/net/http/server.go:967
0x7dfe30 net/http.(*conn).serve+0x850 /usr/local/go/src/net/http/server.go:1878

查看标准库源码查到Request都是由Transport处理的,并且tcp连接也是在这里建立并复用,所以有可能是客户端的Transport出问题了。

file: src/net/http/transport.go

// roundTrip is shown here as an excerpt; the parts marked "code..." are
// elided. The visible part obtains a connection for the request through
// t.getConn and then sends the request over it, taking the HTTP/2 path when
// pconn.alt is set and the persistConn's own roundTrip otherwise.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
  code...
  // Get the cached or newly-created connection to either the
  // host (for http or https), the http proxy, or the http proxy
  // pre-CONNECTed to https server. In any case, we'll be ready
  // to send it requests.
  pconn, err := t.getConn(treq, cm)
  if err != nil {
    // No connection could be obtained: clear the canceler, close the request
    // body and report the error.
    t.setReqCanceler(req, nil)
    req.closeBody()
    return nil, err
  }
  
  var resp *Response
  if pconn.alt != nil {
    // HTTP/2 path.
    t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host
    t.setReqCanceler(req, nil)   // not cancelable with CancelRequest
    resp, err = pconn.alt.RoundTrip(req)
  } else {
    resp, err = pconn.roundTrip(treq)
  }
  if err == nil {
    return resp, nil
  }
  code...
}
// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS.  If this doesn't return an error, the persistConn
// is ready to write requests to.
//
// Shown here as an excerpt; the tail marked "code..." is elided. Note that
// the idle-connection lookup below is a method on t, i.e. it consults the
// state of this particular Transport value.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
  req := treq.Request
  trace := treq.trace
  ctx := req.Context()
  if trace != nil && trace.GetConn != nil {
    trace.GetConn(cm.addr())
  }
  // Fast path: reuse an idle connection held by this Transport, if it has one
  // for cm.
  if pc, idleSince := t.getIdleConn(cm); pc != nil {
    if trace != nil && trace.GotConn != nil {
      trace.GotConn(pc.gotIdleConnTrace(idleSince))
    }
    // set request canceler to some non-nil function so we
    // can detect whether it was cleared between now and when
    // we enter roundTrip
    t.setReqCanceler(req, func(error) {})
    return pc, nil
  }

  // Result of the background dial started further down.
  type dialRes struct {
    pc  *persistConn
    err error
  }
  dialc := make(chan dialRes)
  cmKey := cm.key()

  // Copy these hooks so we don't race on the postPendingDial in
  // the goroutine we launch. Issue 11136.
  testHookPrePendingDial := testHookPrePendingDial
  testHookPostPendingDial := testHookPostPendingDial

  // handlePendingDial starts a goroutine that waits for the dial result: a
  // successful connection is handed to t.putOrCloseIdleConn, a failed dial
  // decrements the per-host connection count.
  handlePendingDial := func() {
    testHookPrePendingDial()
    go func() {
      if v := <-dialc; v.err == nil {
        t.putOrCloseIdleConn(v.pc)
      } else {
        t.decHostConnCount(cmKey)
      }
      testHookPostPendingDial()
    }()
  }

  // Buffered with capacity 1 so the canceler can deliver one error without
  // blocking.
  cancelc := make(chan error, 1)
  t.setReqCanceler(req, func(err error) { cancelc <- err })

  // Only when a per-host limit is configured: wait until a slot is free, an
  // idle connection shows up, or the request is canceled.
  if t.MaxConnsPerHost > 0 {
    select {
    case <-t.incHostConnCount(cmKey):
      // count below conn per host limit; proceed
    case pc := <-t.getIdleConnCh(cm):
      if trace != nil && trace.GotConn != nil {
        trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
      }
      return pc, nil
    case <-req.Cancel:
      return nil, errRequestCanceledConn
    case <-req.Context().Done():
      return nil, req.Context().Err()
    case err := <-cancelc:
      if err == errRequestCanceled {
        err = errRequestCanceledConn
      }
      return nil, err
    }
  }

  // Dial in the background and publish the outcome on dialc.
  go func() {
    pc, err := t.dialConn(ctx, cm)
    dialc <- dialRes{pc, err}
    }()
  code...
}

对于反向代理负载均衡的服务来说,负载均衡本身就是(后端服务的)客户端,于是回头查看caddy proxy的源码。

file: vendor/github.com/mholt/caddy/caddyhttp/proxy/proxy.go

// ServeHTTP satisfies the httpserver.Handler interface.
//
// Shown here as an excerpt; the parts marked "code..." are elided. The visible
// loop asks the upstream to select a host, derives the outgoing Host header
// and credentials from the host's name, and makes sure a reverse proxy is
// available for the selected host.
func (p Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) {
  code...
  for {
    // since Select() should give us "up" hosts, keep retrying
    // hosts until timeout (or until we get a nil host).
    host := upstream.Select(r)
    if host == nil {
        if backendErr == nil {
            backendErr = errors.New("no hosts available upstream")
        }
        if !keepRetrying(backendErr) {
            break
        }
        continue
    }
    // Expose the chosen host name to the response recorder's replacer under
    // the key "upstream".
    if rr, ok := w.(*httpserver.ResponseRecorder); ok && rr.Replacer != nil {
        rr.Replacer.Set("upstream", host.Name)
    }
  
    // Start from the reverse proxy carried by the selected host; it may be nil.
    proxy := host.ReverseProxy
  
    // a backend's name may contain more than just the host,
    // so we parse it as a URL to try to isolate the host.
    if nameURL, err := url.Parse(host.Name); err == nil {
        outreq.Host = nameURL.Host
        if proxy == nil {
            // The host carries no reverse proxy, so one is built for this
            // request. It is assigned only to the local variable proxy and is
            // not written back to host. The keepalive argument passed here is
            // http.DefaultMaxIdleConnsPerHost.
            proxy = NewSingleHostReverseProxy(nameURL,
                host.WithoutPathPrefix,
                http.DefaultMaxIdleConnsPerHost,
                upstream.GetTimeout(),
                upstream.GetFallbackDelay(),
            )
        }
  
        // use upstream credentials by default
        if outreq.Header.Get("Authorization") == "" && nameURL.User != nil {
            pwd, _ := nameURL.User.Password()
            outreq.SetBasicAuth(nameURL.User.Username(), pwd)
        }
    } else {
        // The name is not a parseable URL: use it verbatim as the Host.
        outreq.Host = host.Name
    }
    // Reached with a nil proxy only when the host had none and its name could
    // not be parsed as a URL.
    if proxy == nil {
        return http.StatusInternalServerError, errors.New("proxy for host '" + host.Name + "' is nil")
    }
    code...
    }
  }
}

在Proxy.ServeHTTP中,因为重写的Select函数返回的UpstreamHost没有ReverseProxy,所以每次请求都会新建一个ReverseProxy,但这个UpstreamHost并没有被存下来;另外ReverseProxy里面又会新建Transport,所以每次新建的Transport就泄漏掉了。处理请求时建立的tcp连接因为没有被主动关闭,服务端会一直保持着长连接,直到tcp连接超时才能被关闭;而定时的请求又一直在新建连接,最后导致内存超出限制,被k8s kill掉。

file: vendor/github.com/mholt/caddy/caddyhttp/proxy/reverseproxy.go

// NewSingleHostReverseProxy is shown here as an excerpt; the part marked
// "code..." is elided. The visible part builds a ReverseProxy and picks its
// Transport from the target scheme and the keepalive setting. Every branch
// assigns a newly constructed value to rp.Transport, so each call yields a
// ReverseProxy with its own Transport.
func NewSingleHostReverseProxy(target *url.URL, without string, keepalive int, timeout, fallbackDelay time.Duration) *ReverseProxy {
  code...
  rp := &ReverseProxy{
      Director:      director,
      FlushInterval: 250 * time.Millisecond, // flushing good for streaming & server-sent events
      srvResolver:   net.DefaultResolver,
      dialer:        &dialer,
  }

  if target.Scheme == "unix" {
      // "unix" scheme: dial through socketDial.
      rp.Transport = &http.Transport{
          Dial: socketDial(target.String(), timeout),
      }
  } else if target.Scheme == "quic" {
      // "quic" scheme: use the h2quic round tripper.
      rp.Transport = &h2quic.RoundTripper{
          QuicConfig: &quic.Config{
              HandshakeTimeout: defaultCryptoHandshakeTimeout,
              KeepAlive:        true,
          },
      }
  } else if keepalive != http.DefaultMaxIdleConnsPerHost || strings.HasPrefix(target.Scheme, "srv") {
      // Non-default keepalive, or a scheme starting with "srv".
      dialFunc := rp.dialer.Dial
      if strings.HasPrefix(target.Scheme, "srv") {
          dialFunc = rp.srvDialerFunc(target.String(), timeout)
      }

      transport := &http.Transport{
          Proxy:                 http.ProxyFromEnvironment,
          Dial:                  dialFunc,
          TLSHandshakeTimeout:   defaultCryptoHandshakeTimeout,
          ExpectContinueTimeout: 1 * time.Second,
      }
      // keepalive == 0 turns keep-alives off; any other value becomes the
      // per-host idle connection limit.
      if keepalive == 0 {
          transport.DisableKeepAlives = true
      } else {
          transport.MaxIdleConnsPerHost = keepalive
      }
      if httpserver.HTTP2 {
          if err := http2.ConfigureTransport(transport); err != nil {
              log.Println("[ERROR] failed to configure transport to use HTTP/2: ", err)
          }
      }
      rp.Transport = transport
  } else {
      // Default case: keepalive equals http.DefaultMaxIdleConnsPerHost and the
      // scheme is none of the above. DisableKeepAlives is not set here.
      transport := &http.Transport{
          Proxy: http.ProxyFromEnvironment,
          Dial:  rp.dialer.Dial,
      }
      if httpserver.HTTP2 {
          if err := http2.ConfigureTransport(transport); err != nil {
              log.Println("[ERROR] failed to configure transport to use HTTP/2: ", err)
          }
      }
      rp.Transport = transport
  }
  return rp
}

解决: 将带ReverseProxy的UpstreamHost存下来复用,caddy插件改成这样

// Upstream wraps a proxy.Upstream by embedding it and additionally keeps a
// pool of UpstreamHost values keyed by host name, so that a host can be looked
// up again instead of being rebuilt.
//
// The struct contains a sync.RWMutex and therefore must not be copied; use it
// through a pointer.
type Upstream struct {
    proxy.Upstream
    // hostPool maps a host name to its UpstreamHost. It must be initialised
    // with make before the first write.
    hostPool map[string]*proxy.UpstreamHost
    // lock guards hostPool.
    lock     sync.RWMutex
}

// NewHoster is implemented by values that can build a *proxy.UpstreamHost for
// a given host string.
type NewHoster interface {
    // NewHost returns the UpstreamHost for host, or an error if it cannot be
    // created.
    NewHost(host string) (*proxy.UpstreamHost, error)
}

// getUpstreamHost returns the pooled UpstreamHost stored under host, or nil
// when the pool has no entry for it. The lookup runs under the read lock.
func (u *Upstream) getUpstreamHost(host string) *proxy.UpstreamHost {
    u.lock.RLock()
    pooled := u.hostPool[host]
    u.lock.RUnlock()
    return pooled
}

// addUpstreamHost stores uh in the pool under host, replacing any entry that
// is already there. The write runs under the exclusive lock.
func (u *Upstream) addUpstreamHost(host string, uh *proxy.UpstreamHost) {
    u.lock.Lock()
    // Deferred so the lock is released even if the map write panics (for
    // example when hostPool was never initialised).
    defer u.lock.Unlock()
    u.hostPool[host] = uh
}

// Select picks the upstream host for r.
//
// It first delegates to the embedded Upstream; a nil result is returned
// unchanged. When idproxyQueryUrl and idproxyPaths are both non-empty and the
// request path starts with one of the idproxyPaths entries, the request is
// routed to the host "id-<id>.<idproxyNamespace>", where <id> is the integer
// value of the "id" query parameter. The UpstreamHost for that name is created
// once and reused from hostPool afterwards. Requests that match no prefix get
// the host chosen by the embedded Upstream.
//
// nil is returned (and the cause logged) when the per-id host cannot be
// created.
func (u *Upstream) Select(r *http.Request) *proxy.UpstreamHost {
    uh := u.Upstream.Select(r)
    if uh == nil {
        return nil
    }
    if len(idproxyQueryUrl) == 0 || len(idproxyPaths) == 0 {
        return uh
    }
    for _, path := range idproxyPaths {
        if !strings.HasPrefix(r.URL.Path, path) {
            continue
        }
        // The parse error is deliberately ignored to keep the existing
        // routing: a missing or non-numeric id is treated as 0 and goes to
        // "id-0.<namespace>".
        id, _ := strconv.ParseInt(r.URL.Query().Get("id"), 10, 64)
        host := fmt.Sprintf("id-%d.%s", id, idproxyNamespace)
        target, err := u.getOrCreateUpstreamHost(host)
        if err != nil {
            log.Println(err)
            return nil
        }
        return target
    }
    return uh
}

// getOrCreateUpstreamHost returns the pooled UpstreamHost for host, creating
// and pooling it on first use. It returns an error when the embedded Upstream
// does not implement NewHoster or when NewHost fails.
func (u *Upstream) getOrCreateUpstreamHost(host string) (*proxy.UpstreamHost, error) {
    // Common case: the host is already pooled; only the read lock is taken.
    if pooled := u.getUpstreamHost(host); pooled != nil {
        return pooled, nil
    }

    // Checked assertion: an unchecked one would panic inside the request path
    // if the wrapped upstream had no NewHost method.
    creator, ok := u.Upstream.(NewHoster)
    if !ok {
        return nil, fmt.Errorf("idproxy: upstream %T does not implement NewHost", u.Upstream)
    }

    u.lock.Lock()
    defer u.lock.Unlock()
    // Look again under the write lock: another request may have pooled the
    // host between the lookup above and acquiring the lock. Without this,
    // concurrent first requests for the same id would each build a host and
    // all but the last stored one would be dropped unused.
    if pooled := u.hostPool[host]; pooled != nil {
        return pooled, nil
    }
    created, err := creator.NewHost(host)
    if err != nil {
        return nil, err
    }
    // NOTE(review): entries are keyed by the client-supplied id and nothing in
    // this type removes them, so the pool grows with the number of distinct
    // ids seen — consider bounding it or validating ids.
    u.hostPool[host] = created
    return created, nil
}

// setup builds the static upstreams described by the directive, wraps each of
// them in a pointer to Upstream with an initialised host pool, installs a
// proxy.Proxy middleware that serves from those upstreams, and registers every
// upstream's Stop method as a shutdown hook. It returns the error from
// proxy.NewStaticUpstreams, if any.
func setup(c *caddy.Controller) error {
    host := httpserver.GetConfig(c).Host()
    upstreams, err := proxy.NewStaticUpstreams(c.Dispenser, host)
    if err != nil {
        return err
    }

    // Replace every upstream in place with its idproxy wrapper. The wrapper is
    // stored as a pointer because it holds a mutex.
    for i, inner := range upstreams {
        wrapped := &Upstream{Upstream: inner}
        wrapped.hostPool = make(map[string]*proxy.UpstreamHost)
        upstreams[i] = wrapped
    }

    middleware := func(next httpserver.Handler) httpserver.Handler {
        return proxy.Proxy{Next: next, Upstreams: upstreams}
    }
    httpserver.GetConfig(c).AddMiddleware(middleware)

    // Register shutdown handlers.
    for i := range upstreams {
        c.OnShutdown(upstreams[i].Stop)
    }

    return nil
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Linux Command Line

The Linux Command Line

William E. Shotts Jr. / No Starch Press, Incorporated / 2012-1-17 / USD 39.95

You've experienced the shiny, point-and-click surface of your Linux computer-now dive below and explore its depths with the power of the command line. The Linux Command Line takes you from your very ......一起来看看 《The Linux Command Line》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具