写点什么

KubeEdge 源码分析之(六)metamanager

  • 2020-01-03
  • 本文字数:3874 字

    阅读完需:约 13 分钟

KubeEdge源码分析之(六)metamanager

本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。

cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。


本文对 edgecore 的 metamanager 模块进行剖析,metamanager 作为 edgecore 中的 edged 模块与 edgehub 模块进行交互的桥梁,除了将 edgehub 的消息转发给 edged,还对一些必要的数据通过 SQLite 进行了缓存,在某种程度上实现了 kubeedge 的 offline mode。本文就对 metamanager 所涉及的 SQLite 数据库相关逻辑和业务逻辑进行剖析:


1、metamanager 数据库相关逻辑剖析


2、metamanager 业务逻辑剖析

metamanager 数据库相关逻辑剖析

从 eventbus 的模块注册函数入手:


kubeedge/edge/pkg/eventbus/event_bus.go


//constant metamanager module name const (                        MetaManagerModuleName = "metaManager" ) ... // Register register metamanager func Register() {                          dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))               core.Register(&metaManager{}) }
复制代码


在注册函数 Register()中,做了 2 件事:


  1. 在 SQLite 中的数据库中初始化 metaManager 表


dbm.RegisterModel(MetaManagerModuleName, new(dao.Meta))
复制代码


  1. 注册已经初始化的 metamanager


core.Register(&metaManager{})
复制代码


下面深入剖析 “1.在 SQLite 中的数据库中初始化 metaManager 表” 相关内容,进入 dbm.RegisterModel(…):


kubeedge/edge/pkg/common/dbm/db.go


//RegisterModel registers the defined model in the orm if model is enabled func RegisterModel(moduleName string, m interface{}) {               if isModuleEnabled(moduleName) {                        orm.RegisterModel(m)                        ...              } else {                       ...            } }

复制代码


RegisterModel(…)函数是对 github.com/astaxie/bee…的封装,本文就不跟进去剖析了,感兴趣的同学可以在本文的基础上自行剖析。


回到“1.在 SQLite 中的数据库中初始化 metaManager 表”,深入剖析 metaManager 表的具体定义 dao.Meta,进入 dao.Meta 定义:


kubeedge/edge/pkg/metamanager/dao/meta.go


// Meta metadata object type Meta struct {              // ID    int64  `orm:"pk; auto; column(id)"`            Key   string `orm:"column(key); size(256); pk"`             Type  string `orm:"column(type); size(32)"`             Value string `orm:"column(value); null; type(text)"`}
复制代码


metaManager 表的具体定义包含 Key、Type 和 Value 三个字段,具体含义如下:


  • Key meta 的名字;

  • Type meta 对应的操作类型;

  • Value 具体的 meta 值;


与 Meta Struct 的定义在同一文件内,还有对 metaManager 表的一些操作定义,如 SaveMeta、DeleteMetaByKey、UpdateMeta、InsertOrUpdate、UpdateMetaField、UpdateMetaFields、QueryMeta、QueryAllMeta,本文不对具体操作的定义进行深入剖析,感兴趣的同学可以在本文的基础上自行剖析。

metamanager 业务逻辑剖析

从 metamanager 的模块启动函数入手:


kubeedge/edge/pkg/metamanager/module.go


func (m *metaManager) Start(c *context.Context) {              m.context = c              InitMetaManagerConfig()              go func() {                           period := getSyncInterval()                           timer := time.NewTimer(period)                          for {                                      select {                                      case <-timer.C:                                                  timer.Reset(period)                                                   msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                                  m.context.Send(MetaManagerModuleName, *msg)                                     }                        }            }()            m.mainLoop() }
复制代码


启动函数 Start(…)做了如下 4 件事:


  1. 接收并保存模块启动时传入的*context.Context 实例


m.context = c
复制代码


  1. 初始化 metamanager 配置


InitMetaManagerConfig()
复制代码


  1. 启动一个 goroutine 同步心跳信息


go func() {...}
复制代码


  1. 启动一个循环处理各种事件


m.mainLoop()
复制代码


接下来展开分析 2、3、4。


初始化 metamanager 配置


进入 InitMetaManagerConfig()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


// InitMetaManagerConfig init meta config func InitMetaManagerConfig() {                    var err error                    groupName, err := config.CONFIG.GetValue("metamanager.context-send-group").ToString()  ...                  edgeSite, err := config.CONFIG.GetValue("metamanager.edgesite").ToBool() ...                  moduleName, err := config.CONFIG.GetValue("metamanager.context-send-module").ToString()... }

复制代码


在初始化 metamanager 配置时,从配置文件中获取了 metamanager.context-send-group、metamanager.edgesite、metamanager.context-send-module,根据获取的值对相关变量进行设置。


启动一个 goroutine 同步心跳信息


kubeedge/edge/pkg/metamanager/module.go


go func() {              period := getSyncInterval()               timer := time.NewTimer(period)               for {                           select {                           case <-timer.C:                                          timer.Reset(period)                                          msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                                          m.context.Send(MetaManagerModuleName, *msg)                         }                    }           }
复制代码


在同步心跳信息的 goroutine 中,做了如下 2 件事:


  1. 获取通信心跳的时间间隔


period := getSyncInterval()
复制代码


  1. 创建定时器,并定时发送心跳信息


timer := time.NewTimer(period) for {...}
复制代码


启动一个循环处理各种事件


进入 m.mainLoop()定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) mainLoop() {               go func() {                            for {                                                 if msg, err := m.context.Receive(m.Name()); err == nil {                                                 ...                                                m.process(msg)                            } else {                                                 ...                            }             }             }() }

复制代码


mainLoop()函数启动了一个 for 循环,在循环中主要做了 2 件事:


  1. 接收信息


msg, err := m.context.Receive(m.Name())
复制代码


  1. 对接收到的信息进行处理


m.process(msg)
复制代码


想弄明白对信息的处理过程,需要进入 m.process(…)的定义:


kubeedge/edge/pkg/metamanager/msg_processor.go


func (m *metaManager) process(message model.Message) {                    operation := message.GetOperation()              switch operation {              case model.InsertOperation:                                m.processInsert(message)              case model.UpdateOperation:                               m.processUpdate(message)            case model.DeleteOperation:                                m.processDelete(message)            case model.QueryOperation:                                m.processQuery(message)             case model.ResponseOperation:                              m.processResponse(message)             case messagepkg.OperationNodeConnection:                               m.processNodeConnection(message)            case OperationMetaSync:                               m.processSync(message)            case OperationFunctionAction:                               m.processFunctionAction(message)            case OperationFunctionActionResult:                               m.processFunctionActionResult(message)             case constants.CSIOperationTypeCreateVolume,                     constants.CSIOperationTypeDeleteVolume,                    constants.CSIOperationTypeControllerPublishVolume,                    constants.CSIOperationTypeControllerUnpublishVolume:                    m.processVolume(message)}}
复制代码


process(…)函数中主要做了如下 2 件事:


  1. 获取消息的操作的类型


operation := message.GetOperation()
复制代码


  1. 根据信息操作类型对信息进行相应处理


switch operation { ... }
复制代码


信息的操作类型包括 insert、update、delete、query、response、publish、meta-internal-sync、action、action_result 等,本文不对信息的具体处理过程剖析,感兴趣的同学可以在本文的基础上自行剖析。


2020-01-03 10:481861

评论

发布
暂无评论
发现更多内容

华为云FusionInsight连续三次获得第一,加速释放数据要素价值

华为云开发者联盟

大数据 数据湖 云原生 FusionInsight 华为云

微信朋友圈架构设计

刘洋

#架构实战营

华山论“件”:Kafka、RabbitMQ、RocketMQ技能大比拼

华为云开发者联盟

kafka RocketMQ RabbitMQ 华为云 消息中间件

IM单聊和群聊中的在线状态同步应该用“推”还是“拉”?

BeeWorks

写了这么多年后端,你知道事务脚本模式吗?

蜜糖的代码注释

Java 互联网 后端

ChaosCraft:和女朋友一起来 Hackathon 表演绝活丨滑滑蛋团队访谈

PingCAP

CVE-2021-4034 Linux Polkit 权限提升漏洞挖掘思路解读

腾讯安全云鼎实验室

云原生 漏洞分析

一起玩转LiteOS组件:TinyFrame

华为云开发者联盟

LiteOS 串口 LiteOS组件 TinyFrame

架构训练营模块一作业

苍狼

模块一作业--

Leo

「架构实战营」

ReactNative进阶(三十六):ES8 中 async 与 await 使用方法详解

No Silver Bullet

Async React Native await 1月月更

架构实战营5期模块1作业

lovles

「架构实战营」

微信业务架构 & 学生管理系统架构

凌波微步

「架构实战营」

我的架构学习之始

浪飞

「架构实战营」模块一作业

hxb

「架构实战营」

Linux之ps命令

入门小站

Linux

LabVIEW仪表盘识别(实战篇—6)

不脱发的程序猿

机器视觉 图像处理 LabVIEW 仪表盘识别

什么时候该减少质量投入?

QualityFocus

质量管理 软件测试 测试思维

Centos7下Nginx编译安装与脚本安装的记录

edd

复古冰雪传奇H5游戏详细图文架设教程

echeverra

游戏开发 游戏

模块六作业

novoer

「架构实战营」

[架构实战营]-架构实训一

邹玉麒

「架构实战营」

音视频技术如何为元宇宙提供全真稳的全新体验之漫话腾讯云音视频 | 社区征文

liuzhen007

音视频 1月月更 新春征文

获奖作品公布,快来看看有没有你!

InfoQ写作社区官方

新春征文 热门活动

WorkPlus赋能数字政府迈入发展新阶段

BeeWorks

git 使用总结

麦可

git 开发工具

高效管理邮件的方式

NinetyH

工具软件 办公效率 邮件管理

模块六

Only

架构师实战营 「架构实战营」

JavaScript 之 Proxy

编程三昧

JavaScript 前端 Proxy 1月月更

小程序电商业务微服务拆分及基础设施选型

swallowluo

架构实战营 #架构实战营 「架构实战营」

Android Studio开发flutter快捷键及文本显示技巧。

坚果

flutter 1月月更

KubeEdge源码分析之(六)metamanager_架构_华为云原生团队_InfoQ精选文章