NSQ--nsqd初始化启动

栏目: JavaScript · 发布时间: 5年前

内容简介:启动的main函数在nsq/apps/nsqd/nsqd.go里.初始化处理启动处理

启动的main函数在nsq/apps/nsqd/nsqd.go里.

func main() {   
    prg := &program{}   
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {      
        log.Fatal(err)   
    }
}


使用了svc的库,程序安全退出、服务控制两个方面。
核心在于系统信号获取、Go Concurrency Patterns、以及基本的代码封装。
import "github.com/judwhite/go-svc/svc"
复制代码

初始化处理

func (p *program) Init(env svc.Environment) error 复制代码

启动处理

func (p *program) Start() error 复制代码

结束处理

func (p *program) Start() error复制代码

主要看下Start():

func (p *program) Start() error { 
    //调用nsqFlagset和Parse进行命令行参数集初始化  
    opts := nsqd.NewOptions()   
    flagSet := nsqdFlagSet(opts)   
    flagSet.Parse(os.Args[1:])   
    rand.Seed(time.Now().UTC().UnixNano())   
    if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {      
        fmt.Println(version.String("nsqd"))      
        os.Exit(0)   
    }   

    //判断config参数是否存在,若存在的话还需进行配置文件的读取
    var cfg config   
    configFile := flagSet.Lookup("config").Value.String()   
    if configFile != "" {      
        _, err := toml.DecodeFile(configFile, &cfg)      
        if err != nil {         
        log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())      
        }   
    }  
    cfg.Validate()  
    //配置文件检查通过后,创建默认配置opts,并于命令行参数和配置文件进行合并。
    options.Resolve(opts, flagSet, cfg) 
    //nsgd的代码在nsg/nsgd/nsgd.go的里面
    //创建一个nsgd  
    nsqd := nsqd.New(opts)   
    err := nsqd.LoadMetadata()   
    if err != nil {      
        log.Fatalf("ERROR: %s", err.Error())   
    }   
    err = nsqd.PersistMetadata()   
    if err != nil {      
        log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())   
    }   
    nsqd.Main()   
    p.nsqd = nsqd   
    return nil
}复制代码

LoadMetadata(),负责加载保存在文件中的Topic出来:

func (n *NSQD) LoadMetadata() error {   
    //使用atomic包中的方法来保证方法执行前和执行后isLoading值的改变
    atomic.StoreInt32(&n.isLoading, 1)   
    defer atomic.StoreInt32(&n.isLoading, 0)
    //元数据以json格式保存在nsqd可执行文件目录下的nsqd.%d.dat中。其中%d为代表该程序的ID,     //通过在启动时的命令行worker-id或者配置文件中的id指定。默认ID是通过对主机名散列后获得。     //因此保证了同一台机器每次启动的ID相同。   
    fn := newMetadataFile(n.getOpts())   
    data, err := readOrEmpty(fn)   
    if err != nil {      
        return err   
    }   
    if data == nil {      
        return nil // fresh start   
    }   
    var m meta   
    err = json.Unmarshal(data, &m)   
    if err != nil {      
        return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)   
    }   
    for _, t := range m.Topics {      
        //检查topic名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-]+(#ephemeral)?$)      
        //,若不合法则忽略      
        if !protocol.IsValidTopicName(t.Name) {         
            n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)         
            continue      
        }      
        //使用GetTopic函数通过名字获得topic对象      
        topic := n.GetTopic(t.Name)      
        //判断当前topic对象是否处于暂停状态,是的话调用Pause函数暂停topic      
        if t.Paused {         
            topic.Pause()      
        }     
         //获取当前topic下所有的channel,并且遍历channel,执行的操作与topic基本一致      
        for _, c := range t.Channels {         
            //检查channel名称是否合法(长度在1-64之间,满足正则表达式^[\.a-zA-Z0-9_-]+(#ephemeral)?$)         
            //,若不合法则忽略         
            if !protocol.IsValidChannelName(c.Name) {            
                n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)            
                continue         
            }         
            //使用GetChannel函数通过名字获得channel对象         
            channel := topic.GetChannel(c.Name)         
            //判断当前channel对象是否处于暂停状态,是的话调用Pause函数暂停channel         
            if c.Paused {            
                channel.Pause()         
            }      
        }      
        topic.Start()   
    }   
    return nil
}复制代码

PersistMetadata()将当前的topic和channel信息写入nsqd.%d.dat文件中:

func (n *NSQD) PersistMetadata() error {   
    // persist metadata about what topics/channels we have, across restarts   
    fileName := newMetadataFile(n.getOpts())   
    n.logf(LOG_INFO, "NSQ: persisting topic/channel metadata to %s", fileName)   
    js := make(map[string]interface{})   
    topics := []interface{}{}   
    //将topicMap转成map[string]interface{}的map,转成Json   
    for _, topic := range n.topicMap {      
        if topic.ephemeral {         
        continue      
        }      
        topicData := make(map[string]interface{})      
        topicData["name"] = topic.name      
        topicData["paused"] = topic.IsPaused()      
        channels := []interface{}{}      
        topic.Lock()      
        for _, channel := range topic.channelMap {         
           channel.Lock()         
            if channel.ephemeral {            
                channel.Unlock()            
                continue         
            }         
            channelData := make(map[string]interface{})         
            channelData["name"] = channel.name         
            channelData["paused"] = channel.IsPaused()         
            channels = append(channels, channelData)         
            channel.Unlock()      
        }      
        topic.Unlock()      
        topicData["channels"] = channels      
        topics = append(topics, topicData)   
    }   
    js["version"] = version.Binary   
    js["topics"] = topics   
    data, err := json.Marshal(&js)   
    if err != nil {      
        return err   
    }   
    //写入文件时先创建扩展名为tmp的临时文件   
    tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())   
    //将topic和channel列表json序列化后写回文件中   
    err = writeSyncFile(tmpFileName, data)   
    if err != nil {      
        return err   
    }   
    // 写入内容后并保存后再调用atomicRename函数将tmp文件重命名为nsqd.%d.dat。   
    err = os.Rename(tmpFileName, fileName)   
    if err != nil {      
        return err   
    }   
    // technically should fsync DataPath here   
    return nil
}复制代码

参考至https://www.cnblogs.com/zhangboyu/p/7457061.html


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

HTML5与CSS3基础教程(第8版)

HTML5与CSS3基础教程(第8版)

[美] Elizabeth Castro、[美] Bruce Hyslop / 望以文 / 人民邮电出版社 / 2014-5 / 69.00元

本书是风靡全球的HTML和CSS入门教程的最新版,至第6版累积销量已超过100万册,被翻译为十多种语言,长期雄踞亚马逊书店计算机图书排行榜榜首。 第8版秉承作者直观透彻、循序渐进、基础知识与案例实践紧密结合的讲授特色,采用独特的双栏图文并排方式,手把手指导读者从零开始轻松入门。相较第7版,全书2/3以上的内容进行了更新,全面反映了HTML5和CSS3的最新特色,细致阐述了响应式Web设计与移......一起来看看 《HTML5与CSS3基础教程(第8版)》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码