KubeEdge 源码分析之(六)metamanager

阅读数:1 2020 年 1 月 3 日 10:48

KubeEdge源码分析之(六)metamanager

本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。
cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。

本文对 edgecore 的 metamanager 模块进行剖析,metamanager 作为 edgecore 中的 edged 模块与 edgehub 模块进行交互的桥梁,除了将 edgehub 的消息转发给 edged,还对一些必要的数据通过 SQLite 进行了缓存,在某种程度上实现了 kubeedge 的 offline mode。本文就对 metamanager 所涉及的 SQLite 数据库相关逻辑和业务逻辑进行剖析:
1、metamanager 数据库相关逻辑剖析
2、metamanager 业务逻辑剖析

metamanager 数据库相关逻辑剖析

从 eventbus 的模块注册函数入手:

kubeedge/edge/pkg/eventbus/event_bus.go

复制代码
//constant metamanager module name
const (
MetaManagerModuleName = "metaManager"
)
...
// Register register metamanager
func Register() {
dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))
core.Register(&metaManager{})
}

在注册函数 Register() 中,做了 2 件事:

  1. 在 SQLite 中的数据库中初始化 metaManager 表
复制代码
dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))
  1. 注册已经初始化的 metamanager
复制代码
core.Register(&metaManager{})

下面深入剖析 “1. 在 SQLite 中的数据库中初始化 metaManager 表” 相关内容,进入 dbm.RegisterModel(…):

kubeedge/edge/pkg/common/dbm/db.go

复制代码
//RegisterModel registers the defined model in the orm if model is enabled
func RegisterModel(moduleName string, m interface{}) {
if isModuleEnabled(moduleName) {
orm.RegisterModel(m)
...
} else {
...
}
}

RegisterModel(…) 函数是对 github.com/astaxie/bee…的封装,本文就不跟进去剖析了,感兴趣的同学可以在本文的基础上自行剖析。

回到“1. 在 SQLite 中的数据库中初始化 metaManager 表”,深入剖析 metaManager 表的具体定义 dao.Meta,进入 dao.Meta 定义:

kubeedge/edge/pkg/metamanager/dao/meta.go

复制代码
// Meta metadata object
type Meta struct {
// ID int64 `orm:"pk; auto; column(id)"`
Key string `orm:"column(key); size(256); pk"`
Type string `orm:"column(type); size(32)"`
Value string `orm:"column(value); null; type(text)"`
}

metaManager 表的具体定义包含 Key、Type 和 Value 三个字段,具体含义如下:

  • Key meta 的名字;
  • Type meta 对应的操作类型;
  • Value 具体的 meta 值;

与 Meta Struct 的定义在同一文件内,还有对 metaManager 表的一些操作定义,如 SaveMeta、DeleteMetaByKey、UpdateMeta、InsertOrUpdate、UpdateMetaField、UpdateMetaFields、QueryMeta、QueryAllMeta,本文不对具体操作的定义进行深入剖析,感兴趣的同学可以在本文的基础上自行剖析。

metamanager 业务逻辑剖析

从 metamanager 的模块启动函数入手:

kubeedge/edge/pkg/metamanager/module.go

复制代码
func (m *metaManager) Start(c *context.Context) {
m.context = c
InitMetaManagerConfig()
go func() {
period := getSyncInterval()
timer := time.NewTimer(period)
for {
select {
case <-timer.C:
timer.Reset(period)
msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)
m.context.Send(MetaManagerModuleName, *msg)
}
}
}()
m.mainLoop()
}

启动函数 Start(…) 做了如下 4 件事:

  1. 接收并保存模块启动时传入的 *context.Context 实例
复制代码
m.context = c
  1. 初始化 metamanager 配置
复制代码
InitMetaManagerConfig()
  1. 启动一个 goroutine 同步心跳信息
复制代码
go func() {...}
  1. 启动一个循环处理各种事件
复制代码
m.mainLoop()

接下来展开分析 2、3、4。

初始化 metamanager 配置
进入 InitMetaManagerConfig() 定义:

kubeedge/edge/pkg/metamanager/msg_processor.go

复制代码
// InitMetaManagerConfig init meta config
func InitMetaManagerConfig() {
var err error
groupName, err := config.CONFIG.GetValue("metamanager.context-send-group").ToString()
...
edgeSite, err := config.CONFIG.GetValue("metamanager.edgesite").ToBool()
...
moduleName, err := config.CONFIG.GetValue("metamanager.context-send-module").ToString()...
}

在初始化 metamanager 配置时,从配置文件中获取了 metamanager.context-send-group、metamanager.edgesite、metamanager.context-send-module,根据获取的值对相关变量进行设置。

启动一个 goroutine 同步心跳信息

kubeedge/edge/pkg/metamanager/module.go

复制代码
go func() {
period := getSyncInterval()
timer := time.NewTimer(period)
for {
select {
case <-timer.C:
timer.Reset(period)
msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)
m.context.Send(MetaManagerModuleName, *msg)
}
}
}

在同步心跳信息的 goroutine 中,做了如下 2 件事:

  1. 获取通信心跳的时间间隔
复制代码
period := getSyncInterval()
  1. 创建定时器,并定时发送心跳信息
复制代码
timer := time.NewTimer(period) for {...}

启动一个循环处理各种事件

进入 m.mainLoop() 定义:

kubeedge/edge/pkg/metamanager/msg_processor.go

复制代码
func (m *metaManager) mainLoop() {
go func() {
for {
if msg, err := m.context.Receive(m.Name()); err == nil {
...
m.process(msg)
} else {
...
}
}
}()
}

mainLoop() 函数启动了一个 for 循环,在循环中主要做了 2 件事:

  1. 接收信息
复制代码
msg, err := m.context.Receive(m.Name())
  1. 对接收到的信息进行处理
复制代码
m.process(msg)

想弄明白对信息的处理过程,需要进入 m.process(…) 的定义:

kubeedge/edge/pkg/metamanager/msg_processor.go

复制代码
func (m *metaManager) process(message model.Message) {
operation := message.GetOperation()
switch operation {
case model.InsertOperation:
m.processInsert(message)
case model.UpdateOperation:
m.processUpdate(message)
case model.DeleteOperation:
m.processDelete(message)
case model.QueryOperation:
m.processQuery(message)
case model.ResponseOperation:
m.processResponse(message)
case messagepkg.OperationNodeConnection:
m.processNodeConnection(message)
case OperationMetaSync:
m.processSync(message)
case OperationFunctionAction:
m.processFunctionAction(message)
case OperationFunctionActionResult:
m.processFunctionActionResult(message)
case constants.CSIOperationTypeCreateVolume,
constants.CSIOperationTypeDeleteVolume,
constants.CSIOperationTypeControllerPublishVolume,
constants.CSIOperationTypeControllerUnpublishVolume:
m.processVolume(message)}}

process(…) 函数中主要做了如下 2 件事:

  1. 获取消息的操作的类型
复制代码
operation := message.GetOperation()
  1. 根据信息操作类型对信息进行相应处理
复制代码
switch operation { ... }

信息的操作类型包括 insert、update、delete、query、response、publish、meta-internal-sync、action、action_result 等,本文不对信息的具体处理过程剖析,感兴趣的同学可以在本文的基础上自行剖析。

评论

发布