【AICon】AI 基础设施、LLM运维、大模型训练与推理,一场会议,全方位涵盖! >>> 了解详情
写点什么

KubeEdge 源码分析之(四)edgehub

  • 2019-12-17
  • 本文字数:4137 字

    阅读完需:约 14 分钟

KubeEdge源码分析之(四)edgehub

前言


本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。


cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。


本文对 edgecore 的 edgehub 模块进行剖析,edgehub 作为 edge 部分与 cloud 部分进行交互的门户,有必要将 edgehub 相关内容彻底分析清楚,为使用过程中的故障排查和未来的功能扩展与性能优化都会有很大的帮助。对 edgehub 的剖析,具体包括如下几部分:


  1. edgehub 的 struct 调用链剖析

  2. edgehub 的具体逻辑剖析


edgehub 的 struct 组成剖析


从 edgecore 模块注册函数入手:


kubeedge/edge/cmd/edgecore/app/server.go


// registerModules register all the modules started in edgecore


func registerModules() {


devicetwin.Register()



}


进入 registerModules()函数中的 devicetwin.Register(),具体如下:


kubeedge/edge/pkg/devicetwin/devicetwin.go


// Register register edgehub


func Register() {


core.Register(&EdgeHub{


controller: NewEdgeHubController(),


})


}


顺着 Register()函数中 EdgeHub struct 的实例化语句进入 EdgeHub struct 定义:


kubeedge/edge/pkg/edgehub/module.go


//EdgeHub defines edgehub object structure


type EdgeHub struct {


context *context.Context


controller *Controller


}


EdgeHub struct 中包含*context.Context 和*Controller 两个属性:


  • *context.Context 在 kubeedge 源码分析系列之 cloudcore 中,笔者已经分析过*context.Context,它是一个基于 go-channels 的消息框架,edgecore 用它作为各功能模块之间通信的消息管道。

  • Controller edgehub 的主要功能载体;


进入 Controller struct 的定义:


kubeedge/edge/pkg/edgehub/controller.go


//Controller is EdgeHub controller object


type Controller struct {


context *context.Context


chClient clients.Adapter


config *config.ControllerConfig


stopChan chan struct{}


syncKeeper map[string]chan model.Message


keeperLock sync.RWMutex


}


从 Controller struct 的定义可以确定,Controller struct 是 edgehub 核心功能载体没错了。


到此,edgehub 的 struct 组成剖析就结束了,接下来剖析 edgehub 的具体逻辑。


edgehub 的具体逻辑剖析


回到 edgehub 的注册函数,开始剖析 edgehub 相关的逻辑:


kubeedge/edge/pkg/edgehub/module.go


// Register register edgehub


func Register() {


core.Register(&EdgeHub{


controller: NewEdgeHubController(),


})


}


在 Register()函数中对 EdgeHub struct 的初始化只是对 EdgeHub struct 中的 controller 进行了初始化,进入 controller 的初始化函数:


kubeedge/edge/pkg/edgehub/controller.go


//NewEdgeHubController creates and returns a EdgeHubController object


func NewEdgeHubController() *Controller {


return &Controller{


config: &config.GetConfig().CtrConfig,


stopChan: make(chan struct{}),


syncKeeper: make(map[string]chan model.Message),


}


}


NewEdgeHubController()函数中嵌套了一个获取配置信息的函数调用:


config: &config.GetConfig().CtrConfig


进入 config.GetConfig()函数定义:


kubeedge/edge/pkg/edgehub/config/config.go


var edgeHubConfig EdgeHubConfig



//GetConfig returns the EdgeHub configuration object


func GetConfig() *EdgeHubConfig {


return &edgeHubConfig


}


GetConfig()函数只返回了 &edgeHubConfig,而 edgeHubConfig 是一个 EdgeHubConfig struct 类型的全局变量,至于该变量是在哪里被赋值,怎么赋值的,暂且立个疑问 flag:


EdgeHubConfig 赋值疑问


到此,EdgeHub struct 的初始化就告一段落了,下面分析 EdgeHub 的启动逻辑:


kubeedge/edge/pkg/edgehub/module.go


//Start sets context and starts the controller


func (eh *EdgeHub) Start(c *context.Context) {


eh.context = c


eh.controller.Start©


}


EdgeHub 启动函数 Start(…)只做了 2 件事:


1. 接收并存储传入的消息管道


eh.context = c


2. 启动 EdgeHub 的 controller


eh.controller.Start©


由前面的分析可知 EdgeHub 的 controller 作为 EdgeHub 功能的主要载体,其启动函数也必将囊括 EdgeHub 绝大部分启动逻辑,继续进入 controller 的启动函数定义:


kubeedge/edge/pkg/edgehub/controller.go


//Start will start EdgeHub


func (ehc *Controller) Start(ctx *context.Context) {


config.InitEdgehubConfig()


for {


err := ehc.initial(ctx)



err = ehc.chClient.Init()



// execute hook func after connect


ehc.pubConnectInfo(true)


go ehc.routeToEdge()


go ehc.routeToCloud()


go ehc.keepalive()


// wait the stop singal


// stop authinfo manager/websocket connection


<-ehc.stopChan


ehc.chClient.Uninit()


// execute hook fun after disconnect


ehc.pubConnectInfo(false)


// sleep one period of heartbeat, then try to connect cloud hub again


time.Sleep(ehc.config.HeartbeatPeriod * 2)


// clean channel


clean:


for {


select {


case <-ehc.stopChan:


default:


break clean


}


}


}


}


从 Controller 的启动函数 Start(…)的定义,可以清楚地看到,其中包含了 EdgeHub 的初始化、各种业务 go routine 的启动和最后的退出清理,下面逐个深入剖析:


初始化 EdgehubConfig


config.InitEdgehubConfig()


进入 config.InitEdgehubConfig()函数定义:


kubeedge/edge/pkg/edgehub/config/config.go


// InitEdgehubConfig init edgehub config


func InitEdgehubConfig() {


err := getControllerConfig()



if edgeHubConfig.CtrConfig.Protocol == protocolWebsocket {


err = getWebSocketConfig()



} else if edgeHubConfig.CtrConfig.Protocol == protocolQuic {


err = getQuicConfig()



} else {



}


}


InitEdgehubConfig()函数首先通过 err := getControllerConfig()获得 EdgeHub Controller 的配置信息,然后通过获得的配置信息中的 Protocol 字段来判断是哪个协议,最后根据判断结果获取相应的协议绑定的配置信息或报错。


针对以上获取配置的操作,重点分析获得 EdgeHub Controller 的配置信息,进入 getControllerConfig():


kubeedge/edge/pkg/edgehub/config/config.go


var edgeHubConfig EdgeHubConfig



func getControllerConfig() error {


protocol, err := config.CONFIG.GetValue(“edgehub.controller.protocol”).ToString()



edgeHubConfig.CtrConfig.Protocol = protocol


heartbeat, err := config.CONFIG.GetValue(“edgehub.controller.heartbeat”).ToInt()



edgeHubConfig.CtrConfig.HeartbeatPeriod = time.Duration(heartbeat) * time.Second


projectID, err := config.CONFIG.GetValue(“edgehub.controller.project-id”).ToString()



edgeHubConfig.CtrConfig.ProjectID = projectID


nodeID, err := config.CONFIG.GetValue(“edgehub.controller.node-id”).ToString()



edgeHubConfig.CtrConfig.NodeID = nodeID


return nil


}


getControllerConfig()获取 edgehub.controller.*相关的配置信息并赋值给变量 edgeHubConfig,到此前面立的“EdgeHubConfig 赋值疑问”flag 也得到了解答。


EdgeHub Controller 初始化


err := ehc.initial(ctx)


进入 ehc.initial(…)函数定义:


kubeedge/edge/pkg/edgehub/controller.go


func (ehc *Controller) initial(ctx *context.Context) (err error) {


config.GetConfig().WSConfig.URL, err = bhconfig.CONFIG.GetValue(“edgehub.websocket.url”).ToString()



cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())



ehc.context = ctx


ehc.chClient = cloudHubClient


return nil


}


  1. 第一行单独获取 edgehub.websocket.url 感觉在前面“初始化 EdgehubConfig“中的 websocket 配置信息初始化部分重复,在此立一个源码问题 flag:

  2. 获取 websocket 配置信息重复

  3. 获取 cloudhub client


cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())


进入 clients.GetClient(…)定义:


kubeedge/edge/pkg/edgehub/factory.go


//GetClient returns an Adapter object with new web socket


func GetClient(clientType string, config *config.EdgeHubConfig) (Adapter, error) {


switch clientType {


case ClientTypeWebSocket:


websocketConf := wsclient.WebSocketConfig{



}


return wsclient.NewWebSocketClient(&websocketConf), nil


case ClientTypeQuic:


quicConfig := quicclient.QuicConfig{



}


return quicclient.NewQuicClient(&quicConfig), nil


default:


klog.Errorf(“Client type: %s is not supported”, clientType)


}


return nil, ErrorWrongClientType


}


从 GetClient(…)函数定义可以知道,该函数定义了 ClientTypeWebSocket、ClientTypeQuic 两种 client 类型,两者都实现了 Adapter interface,下面遇到 Adapter 类型的 client 变量时,记得对应此处的 ClientTypeWebSocket、ClientTypeQuic。


cloud client 初始化


err = ehc.chClient.Init()


ehc.chClient.Init()函数对应”获取 cloudhub client“中 ClientTypeWebSocket、ClientTypeQuic 的 Init()方法,想要了解 Init()具体做了哪些事情,感兴趣的同学可以在本文的基础上自行剖析。


1. 向 edgecore 各模块广播已经连接成功的消息


// execute hook func after connect


ehc.pubConnectInfo(true)


2. 将从 cloud 部分收到的消息转发给指定 edge 部分的指定模块


go ehc.routeToEdge()


3. 将从 edge 部分的消息转发给 cloud 部分


go ehc.routeToCloud()


4. 向 cloud 部分发送心跳信息


go ehc.keepalive()


5. 剩下的步骤都是在 edgehub 模块退出时的一些清理操作。


到此 edgecore 组件的 edgehub 模块的剖析就结束了,由于篇幅原因,很多思路比较清楚的就没有展开分析,感兴趣的同学可以在本文的基础上自习剖析。


免费直播课


《KubeEdge 技术详解与实战》


今晚 8:00 开播


第 5 课:KubeEdge EdgeMesh 设计原理


直播链接


https://huaweicloud.bugu.mudu.tv/watch/rm2jzlo5


作者 | 之江实验室端边云操作系统团队


原文链接:http://suo.im/6aIHSZ


2019-12-17 14:311817

评论

发布
暂无评论
发现更多内容

火山引擎云原生数据仓库ByteHouse技术白皮书V1.0 (Ⅲ)

字节跳动数据平台

数据仓库 云原生 白皮书 数据仓库服务 企业号 4 月 PK 榜

Kurator v0.3.0版本发布!助力企业实现多云异构管理

华为云开发者联盟

开源 后端 华为云 华为云开发者联盟 企业号 4 月 PK 榜

【堡垒机小知识】堡垒机能记录操作时间、操作数据等等吗?

行云管家

网络安全 堡垒机

集简云开放平台是什么?

集简云开放平台

简单的视频格式转换器:MacX Video Converter Pro中文版

真大的脸盆

Mac Mac 软件 视频格式转换 格式转换器

火山引擎DataTester:让企业“无代码”也能用起来的A/B实验平台

字节跳动数据平台

AB testing实战 无代码 A/B 测试 企业号 4 月 PK 榜 企业增长

女朋友要我讲解@Controller注解的原理,真是难为我了

Java你猿哥

Java spring Spring 配置解析

行云管家堡垒机有免费的吗?谁能告诉一下!

行云管家

高新企业 堡垒机 行云管家

BT!GitHub开源阿里Java性能调优百宝书仅3小时,标星竟超过30k

Java你猿哥

Java JVM 性能调优 SSM框架 Java工程师

前端沙箱利用这些特性实现代码的隔离与限制

没有用户名丶

GitHub和 Gitee联合编写最新版20w字Java全栈面试手册,简直无敌!

Java你猿哥

Java java面试 SSM框架 Java面经

硬核!万字神文精解高并发高可用系统实战,分布式系统一致性文档

做梦都在改BUG

Java 高可用 高并发 分布式一致性

阿里大神整理的Java核心知识点和面试官常问到的知识点,压压惊

会踢球的程序源

Java 面试 求职 java面试 Java构架

字节面试官:你没有高并发、性能调优经验,为什么录取你?

做梦都在改BUG

Java 高并发 性能调优

Scrum敏捷研发和项目管理

顿顿顿

Scrum 敏捷开发 敏捷开发流程 leangoo 敏捷开发管理工具

挑战 30 天学完 Python:Day9 条件语句

MegaQi

Python 挑战30天学完Python 三周年连更

Java中线程的6种状态详解(NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING、TERMINATED)

共饮一杯无

Java 线程 线程状态 三周年连更

GitHub上线重量级分布式架构原理设计笔记,开源的东西看着就是爽

Java你猿哥

架构 分布式 分布式架构

阅读完synchronized和ReentrantLock的源码后,我竟发现其完全相似

做梦都在改BUG

Java 源码 synchronized ReentrantLock

华为云新一代iPaaS全域融合集成平台全新升级

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 4 月 PK 榜

火山引擎 DataLeap下Notebook系列文章一:技术选型之路

字节跳动数据平台

notebook 数据研发 企业号 4 月 PK 榜

热榜!Alibaba最新发布「10亿级并发系统设计文档」Git狂揽9000星

Java你猿哥

数据库 架构 分布式 架构设计 并发系统

带你一同认识和使用JPA框架进行开发你的应用服务

Java你猿哥

Java SSM框架 jpa Java工程师

安装Zookeeper和Kafka集群

Java你猿哥

Java kafka zookeeper SSM框架 Java工程师

从零学习SDK(7)如何打包SDK

MobTech袤博科技

Linux:管道命令与文本处理三剑客(grep、sed、awk)

会踢球的程序源

Java Linux

全量通过,华为云GaussDB首批完成信通院全密态数据库评测

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 4 月 PK 榜

阅读完synchronized和ReentrantLock的源码后,竟发现其完全相似

Java你猿哥

并发编程 并发 synchronized SSM框架 ReentrantLock

如何用scrum敏捷工具做迭代规划及迭代执行。

顿顿顿

Scrum Sprint 敏捷开发管理工具 敏捷工具 迭代规划

Gradio:快速构建你的webApp

AIWeker

Python 三周年连更 Gradio

ChatGPT无需API开发连接第三方系统,让舆情自动监控

集简云开放平台

数据集成 数据集成平台 Chat

KubeEdge源码分析之(四)edgehub_云原生_华为云原生团队_InfoQ精选文章