go-micro-config源码分析

June 28, 2020

go-micro/config基础特性及用法,参考官方文档

核心数据结构

感觉go-micro的设计理念应该是一切皆是Interface,所以除了Pluggable Sources等特性外,内部的一些数据结构上也预留了插件化的可能。几个数据结构在官网上都介绍比较清楚了,主要关注下几个核心数据结构的方法。

Config

暴露给用户的对象,用于加载、读取和同步数据源。

[go]
// 动态配置的抽象接口。可以作为一个单独组建给业务使用,但框架自身依赖**DefaultConfig**实现
// 将多个source聚合成一个对外的统一数据源,支持加载、同步、观测一个或多个source。
type Config interface {
    // provide the reader.Values interface
    reader.Values
    // Init the config
    Init(opts ...Option) error
    // Options in the config
    Options() Options
    // Stop the config loader/watcher
    Close() error
    // Load config sources
    Load(source ...source.Source) error
    // Force a source changeset sync
    Sync() error
    // Watch a value for changes
    Watch(path ...string) (Watcher, error)
}
  • 通过Load加载数据源(source)到config内部
  • 通过Watch返回的wather观察数据更新
  • Sync写回给数据源

虽然gomicro中一切皆Interface,但内部实现依赖DefaultConfig。Config的实现依赖两个核心数据结构:

[go]
func (c *config) Init(opts ...Option) error {
    c.opts = Options{
        Loader: memory.NewLoader(), // Loader
        Reader: json.NewReader(),   // Reader
    }
    c.exit = make(chan bool)
    for _, o := range opts {
        o(&c.opts)
    }

    // ...
}

Loader负责加载Source、管理watcher链表、管理sets数据、执行Merge操作等,为Config屏蔽了不少下层逻辑,使对外的接口尽可能干净。

Reader就比较好理解了,参见官方文档。

注意Config中存在一个唯一的goroutine,用来聚合各个Source的变更。每个Source在Loader中也维护了一个goroutine,详见后面Loader部分。

[go]
func (c *config) run() {
    watch := func(w loader.Watcher) error {
        for {
            // get changeset
            snap, err := w.Next()
            if err != nil {
                return err
            }

            c.Lock()

            // save
            c.snap = snap

            // set values
            c.vals, _ = c.opts.Reader.Values(snap.ChangeSet)

            c.Unlock()
        }
    }

Source

配置内容源,支持以下格式:

  • configmap - k8s configmap
  • consul - consul
  • etcd - etcd v3
  • env - 环境变量
  • file - 配置文件
  • flag - 命令行标识
  • grpc - grpc
  • memory - 内存...
[text]
// Source is the source from which config is loaded
type Source interface {
    Read() (*ChangeSet, error) // 提供给Loadder使用,通过Read读取到ChangeSet
    Write(*ChangeSet) error
    Watch() (Watcher, error) // 创建一个Source对应的Watcher
    String() string
}

// 代表从Source读取的内容。可能为全量,也可能为增量,取决于Source的实现
type ChangeSet struct {
    Data      []byte
    Checksum  string
    Format    string
    Source    string
    Timestamp time.Time
}

// Watcher watches a source for changes
type Watcher interface {
    Next() (*ChangeSet, error) // 无变化时会block
    Stop() error
}

Watcher的底层实现还是依赖各Source的watcher,比如file source,就引入了"fsnotify/fsnotify"来监控文件的变化。每个Source被Loader Load时会生成一个对应的source watcher goroutine,用于监控source的变更。

各种Source的实现就没有具体关注了。

Loader

用于加载Source,实现的核心数据结构。

[text]
// Loader manages loading sources
type Loader interface {
    // Stop the loader
    Close() error
    // Load the sources
    Load(...source.Source) error
    // A Snapshot of loaded config
    Snapshot() (*Snapshot, error)
    // Force sync of sources
    Sync() error
    // Watch for changes
    Watch(...string) (Watcher, error)
    // Name of loader
    String() string
}

// Watcher lets you watch sources and returns a merged ChangeSet
type Watcher interface {
    // First call to next may return the current Snapshot
    // If you are watching a path then only the data from
    // that path is returned.
    Next() (*Snapshot, error)
    // Stop watching for changes
    Stop() error
}

// 功能:ChangeSet的聚合。
// 场景:当loader重新加载数据之后会保存一份snapshot
type Snapshot struct {
    // The merged ChangeSet
    ChangeSet *source.ChangeSet
    // Deterministic and comparable version of the snapshot
    Version string
}

框架中提供了一个缺省的memory loader,Loader的存在主要基于以下几个原因:

1、Source是可以聚合(Merge)的,在Reader中提供了Merge的实现。当系统存在多份Source时,需要将Source聚合好之后给到Config,否则Config中就得处理此类逻辑。

2、对Source的failover。当Load失败时,提供了Snapshot能力。

Loader还维护了核心的数据结构:watcher列表。source watcher goroutine发现内容变更时会将变更的内容通知给所有goroutine(通过Get读取对应的path值),“通知”只是将内容写到了update 队列。具体的变更逻辑还得外部调用方调用Next感知。

注意调用方增加Watcher时,Loader只是在watcher list中增加一个节点,没有goroutine的开销。但是Suorce的内容变更会通知所有wathcher,wather会根据内容的变化情况(bytes.Equal)来决定是否通知上层变更。

[go]
    // Loader中通知各个watcher,内容可能已变化,写入changeset
    for _, w := range watchers {
        select {
        case w.updates <- m.vals.Get(w.path...):
        default:
        }
    }

    // ...


// Loader提供的变更获取接口,读取changeset
func (w *watcher) Next() (*loader.Snapshot, error) {
    for {
        select {
        case <-w.exit:
            return nil, errors.New("watcher stopped")
        case v := <-w.updates:
            if bytes.Equal(w.value.Bytes(), v.Bytes()) {
                continue
            }
            w.value = v

            cs := &source.ChangeSet{
                Data:      v.Bytes(),
                Format:    w.reader.String(),
                Source:    "memory",
                Timestamp: time.Now(),
            }
            cs.Sum()

            return &loader.Snapshot{
                ChangeSet: cs,
                Version:   fmt.Sprintf("%d", time.Now().Unix()),
            }, nil
        }
    }
}

// config暴露给外部的watcher接口, 封装Loader的Next接口
func (w *watcher) Next() (reader.Value, error) {
    for {
        s, err := w.lw.Next()   // 内部调用loader的Next获取ChangeSet
        if err != nil {
            return nil, err
        }

        // only process changes
        if bytes.Equal(w.value.Bytes(), s.ChangeSet.Data) {
            continue
        }

        v, err := w.rd.Values(s.ChangeSet)
        if err != nil {
            return nil, err
        }

        w.value = v.Get()
        return w.value, nil
    }
}

另上面也提供,config中也维护了一个gouroutine,watch的path是各个source的聚合。当一个Source的内容变更时需要将各个Source内容重新聚合后返回给config,以更新Config中的snapshot和value值。

总结

图中画得比较明白了,比较难以理解的地方也在于Loader为支持多个Source的一些操作,包括sets列表、Merge操作。当理解需要支持多个Source的聚合之后就比较好理解了。

整体感觉config的封装过于复杂了,包括的viper的引入还进行了改造,在代码实现中有比较多的类似的逻辑,包括命名也有不少重复,阅读起来有点心累。估计代码圈复杂度比较高。。

See all postsSee all posts