etcd启动流程源码分析笔记(-)

栏目: 后端 · 发布时间: 6年前

内容简介:etcd启动流程源码分析笔记(-)

1.初始化etcdServer流程:

代码路径为:github.com\coreos\etcd\embed\etcd.go

StartEtcd(inCfg *Config) (e *Etcd, err error)

流程如下:

1.1: 参数校验:inCfg.Validate()

校验关注点1:

checkBindURLs(cfg.LPUrls):校验peer-urls schem相关信息,且在3.1版本之后不允许使用域名作为url来进行绑定操作。

checkBindURLs(cfg.LCUrls) : 校验client-urls schem相关信息,且在3.1版本之后不允许使用域名作为url来进行绑定操作。

使用域名对于性能上是有一定的影响,但是在实际生产环境中,是存在使用域名的场景,需要修改如下代码进行适配:

func checkBindURLs(urls []url.URL) error {

        //...

        if net.ParseIP(host) == nil {

            //取消err的return,改为打印告警信息,同3.1之前版本。

        return fmt.Errorf("expected IP in URL for binding (%s)", url.String())

    }

    

}

校验关注点2:由于实际现网的网络延迟各不相同,选举及心跳超时时间可作为调优适配的考虑范畴。

5*cfg.TickMs > cfg.ElectionMs :选举超时时间必须大于五倍于心跳超时时间。

cfg.ElectionMs > maxElectionMs:选举超时时间必须小于5000ms

1.2:初始化PeerListeners,ClientListeners,用于监听peers间及client端发送的http请求

PeerListeners:作为etcd member之间进行通信使用的listeners,为了性能考量,建议内部试用schema:http,由flag "listen-peer-urls"确定,

ClientListeners:作为接受外部请求的listerners,一般为了安全性考量,一般使用 schema:https,由flag "listen-client-urls"确定,

具体方法实现为:

transport.NewTimeoutListener(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout)

默认的读写超时均为5s:

ConnReadTimeout  = 5 * time.Second

        ConnWriteTimeout = 5 * time.Second

1.3: 获取PeerURLsMap以及cluster token

1.4: 生成new etcdServer所需的的ServerConfig结构体:

// ServerConfig holds the configuration of etcd as taken from the command line or discovery.

type ServerConfig struct {

        Name           string  // etcdserver 名称,对应flag "name“

        DiscoveryURL   string  // etcd 用于服务发现,无需知道具体etcd节点ip即可访问etcd 服务,对应flag  "discovery" 

        DiscoveryProxy string  // 供服务发现url的代理地址, 对应flag "discovery-proxy"

        ClientURLs     types.URLs // 由ip+port组成,默认DefaultListenClientURLs = "http://localhost:2379"; 实际情况使用https schema,供etcd member 通信,对应flag "listen-client-urls"

        PeerURLs       types.URLs // 由ip+port组成,默认DefaultListenPeerURLs   = "http://localhost:2380"; 实际生产环境使用http schema, 用以外部etcd client访问,对应flag "listen-client-urls"

        DataDir        string   // 数据目录地址,为全路径,对应flag "data-dir"

        // DedicatedWALDir config will make the etcd to write the WAL to the WALDir

        // rather than the dataDir/member/wal.

        DedicatedWALDir     string

        SnapCount           uint64  // 默认是10000次事件做一次快照:DefaultSnapCount = 100000可以作为调优参数进行参考,对应flag "snapshot-count", 

        MaxSnapFiles        uint  // 默认是5,这是v2的参数,v3内只有一个db文件,DefaultMaxSnapshots = 5,对应flag "max-snapshots"

        MaxWALFiles         uint  // 默认是5,DefaultMaxWALs      = 5,表示最大存储wal文件的个数,对应flag "max-wals",保留的文件可以作为etcd-dump-logs工具进行debug使用。

        InitialPeerURLsMap  types.URLsMap // peerUrl 与 etcd name对应的map,由方法cfg.PeerURLsMapAndToken("etcd")生成。

        InitialClusterToken string // etcd 集群token, 对应flang "initial-cluster-token"

        NewCluster          bool // 确定是否为新建集群,对应flag "initial-cluster-state",由方法func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew }确定;

        ForceNewCluster     bool // 对应flag "force-new-cluster",默认为false,若为true,在生产环境内,一般用于含v2数据的集群恢复,效果为以现有数据或者空数据新建一个单节点的etcd集群,如果存在数据,则会清楚数据内的元数据信息,并重建只包含该etcd的元数据信息。

        PeerTLSInfo         transport.TLSInfo // member间通信使用的证书信息,若peerURL为https时使用,对应flag "peer-ca-file","peer-cert-file", "peer-key-file"



        TickMs           uint // raft node 发送心跳信息的超时时间。 "heartbeat-interval"

        ElectionTicks    int  // raft node 发起选举的超时时间,最大为5000ms maxElectionMs = 50000, 对应flag "election-timeout", 选举时间与心跳时间在最佳实践内建议是10倍关系。

        BootstrapTimeout time.Duration // etcd server启动的超时时间,默认为1s, 由方法func (c *ServerConfig) bootstrapTimeout() time.Duration确定;



        AutoCompactionRetention int  // 默认为0,单位为小时,主要为了方便用户快速查询,定时对key进行合并处理,对应flag "auto-compaction-retention",由方法func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic确定,

                                    //具体compact的实现方法为:func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)

        QuotaBackendBytes       int64 // etcd后端数据文件的大小,默认为2GB,最大为8GB, v3的参数,对应flag  "quota-backend-bytes" ,具体定义:etcd\etcdserver\quota.go



        StrictReconfigCheck bool



        // ClientCertAuthEnabled is true when cert has been signed by the client CA.

        ClientCertAuthEnabled bool



        AuthToken string

    

}

1.5, 调用方法 func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) 初始化etcdServer:

// NewServer creates a new EtcdServer from the supplied configuration. The

    // configuration is considered static for the lifetime of the EtcdServer.

    func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {

}

1.5.1: 分配内存空间

st := store.New(StoreClusterPrefix, StoreKeysPrefix)

1.5.2: 检测并生成数据目录,生成向远端raft node peer listeners发送请求的Transport

其中的超时时间计算方法为: time.Second + time.Duration(c.ElectionTicks)*time.Duration(c.TickMs)*time.Millisecond/5

1.5.3:

根据日志目录是否存在,对应生成raft node实体。

1.5.3.1: 若日志目录不存在且flag "initial-cluster-state"为'existing':

case !haveWAL && !cfg.NewCluster:

使用方法 func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) 生成raft node实体

id, n, s, w = startNode(cfg, cl, nil)

1.5.3.2: 若日志目录不存在且flag "initial-cluster-state"为'new':

case !haveWAL && cfg.NewCluster:

使用方法 func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) 生成raft node实体

id, n, s, w = startNode(cfg, cl, cl.MemberIDs())

1.5.3.3 若日志目录存在:

1.5.3.3.1 若flag "force-new-cluster" 为"false":

调用方法 func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) 生成raft node实体

id, cl, n, s, w = restartNode(cfg, snapshot)

1.5.3.3.2 若flag "force-new-cluster" 为"true":

调用方法 func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) 生成raft node实体

id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)

1.5.4 初始化EtcdServer:

srv = &EtcdServer{

                readych:     make(chan struct{}),

                Cfg:         cfg,

                snapCount:   cfg.SnapCount,

                errorc:      make(chan error, 1),

                store:       st,

                snapshotter: ss,

                r: *newRaftNode(

                    raftNodeConfig{

                        isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },

                        Node:        n,

                        heartbeat:   heartbeat,

                        raftStorage: s,

                        storage:     NewStorage(w, ss),

                    },

                ),

                id:            id,

                attributes:    membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},

                cluster:       cl,

                stats:         sstats,

                lstats:        lstats,

                SyncTicker:    time.NewTicker(500 * time.Millisecond),

                peerRt:        prt,

                reqIDGen:      idutil.NewGenerator(uint16(id), time.Now()),

                forceVersionC: make(chan struct{}),

                ....

            

}

在初始化EtcdServer过程中,会启动用于peer间发送及接收raft 消息的rafthttp transport,具体方法如下:

func (t *Transport) Start() error {

                var err error

                t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)

                if err != nil {

                    return err

                }

                t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)

                if err != nil {

                    return err

                }

                t.remotes = make(map[types.ID]*remote)

                t.peers = make(map[types.ID]Peer)

                t.prober = probing.NewProber(t.pipelineRt)

                return nil

2.1. 启动etcdServer

3.1. 为每个client url及peer url 启动一个client server的goroutine,以提供监听服务,这个动作在raft http transport启动之后:

peer server goroutine:

go func(l *peerListener) {

        e.errHandler(l.serve())

    }(pl)

client server goroutine:

go func(s *serveCtx) {

        e.errHandler(s.serve(e.Server, ctlscfg, v2h, e.errHandler))

    }(sctx)

若启动失败,则停止grpcServer:

defer func() {

    ...

    if !serving {

        // errored before starting gRPC server for serveCtx.grpcServerC

        for _, sctx := range e.sctxs {

            close(sctx.grpcServerC)

        }

    }

    ...

}()

暂时就启动流程进行粗略分享,后续将进一步分析 etcdServer 启动具体机制,及针对NewServer内针对生成raft node详细机制进行分析及基于k8s平台部署etcd 集群备份恢复方案进行探讨。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

用户思维+:好产品让用户为自己尖叫

用户思维+:好产品让用户为自己尖叫

[美] Kathy Sierra / 石航 / 人民邮电出版社 / 2017-9 / 69.00元

畅销产品与普通产品的本质区别是什么?若没有巨额预算、不爱营销噱头、不开奢华的产品发布会,如何打造可持续成功的产品?本书针对上述问题提出了新颖的观点:用户并不关心产品本身有多棒,而是关心使用产品时自己有多棒。作者利用其多年的交互设计经验,生动阐释了这一观点背后的科学。可贵的是,本书并不止步于解释“为什么”,还清晰呈现了“怎么做”。 本书风格活泼、图文并茂,其对话式内容既引人入胜,又引人深思,适......一起来看看 《用户思维+:好产品让用户为自己尖叫》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具