KubeEdge 源码分析之(四)edgehub

阅读数:247 2019 年 12 月 17 日 14:31

KubeEdge源码分析之(四)edgehub

前言

本系列的源码分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上进行的。

cloudcore 部分的源码分析是在 kubeedge 源码分析系列之整体架构基础上展开的,如果没有阅读过 kubeedge 源码分析系列之整体架构,直接阅读本文,会感觉比较突兀。

本文对 edgecore 的 edgehub 模块进行剖析,edgehub 作为 edge 部分与 cloud 部分进行交互的门户,有必要将 edgehub 相关内容彻底分析清楚,为使用过程中的故障排查和未来的功能扩展与性能优化都会有很大的帮助。对 edgehub 的剖析,具体包括如下几部分:

  1. edgehub 的 struct 调用链剖析
  2. edgehub 的具体逻辑剖析

edgehub 的 struct 组成剖析

从 edgecore 模块注册函数入手:

kubeedge/edge/cmd/edgecore/app/server.go

// registerModules register all the modules started in edgecore

func registerModules() {

devicetwin.Register()

}

进入 registerModules() 函数中的 devicetwin.Register(),具体如下:

kubeedge/edge/pkg/devicetwin/devicetwin.go

// Register register edgehub

func Register() {

core.Register(&EdgeHub{

controller: NewEdgeHubController(),

})

}

顺着 Register() 函数中 EdgeHub struct 的实例化语句进入 EdgeHub struct 定义:

kubeedge/edge/pkg/edgehub/module.go

//EdgeHub defines edgehub object structure

type EdgeHub struct {

context *context.Context

controller *Controller

}

EdgeHub struct 中包含 *context.Context 和 *Controller 两个属性:

  • *context.Context 在 kubeedge 源码分析系列之 cloudcore 中,笔者已经分析过 *context.Context,它是一个基于 go-channels 的消息框架,edgecore 用它作为各功能模块之间通信的消息管道。
  • Controller edgehub 的主要功能载体;

进入 Controller struct 的定义:

kubeedge/edge/pkg/edgehub/controller.go

//Controller is EdgeHub controller object

type Controller struct {

context *context.Context

chClient clients.Adapter

config *config.ControllerConfig

stopChan chan struct{}

syncKeeper map[string]chan model.Message

keeperLock sync.RWMutex

}

从 Controller struct 的定义可以确定,Controller struct 是 edgehub 核心功能载体没错了。

到此,edgehub 的 struct 组成剖析就结束了,接下来剖析 edgehub 的具体逻辑。

edgehub 的具体逻辑剖析

回到 edgehub 的注册函数,开始剖析 edgehub 相关的逻辑:

kubeedge/edge/pkg/edgehub/module.go

// Register register edgehub

func Register() {

core.Register(&EdgeHub{

controller: NewEdgeHubController(),

})

}

在 Register() 函数中对 EdgeHub struct 的初始化只是对 EdgeHub struct 中的 controller 进行了初始化,进入 controller 的初始化函数:

kubeedge/edge/pkg/edgehub/controller.go

//NewEdgeHubController creates and returns a EdgeHubController object

func NewEdgeHubController() *Controller {

return &Controller{

config: &config.GetConfig().CtrConfig,

stopChan: make(chan struct{}),

syncKeeper: make(map[string]chan model.Message),

}

}

NewEdgeHubController() 函数中嵌套了一个获取配置信息的函数调用:

config: &config.GetConfig().CtrConfig

进入 config.GetConfig() 函数定义:

kubeedge/edge/pkg/edgehub/config/config.go

var edgeHubConfig EdgeHubConfig

//GetConfig returns the EdgeHub configuration object

func GetConfig() *EdgeHubConfig {

return &edgeHubConfig

}

GetConfig() 函数只返回了 &edgeHubConfig,而 edgeHubConfig 是一个 EdgeHubConfig struct 类型的全局变量,至于该变量是在哪里被赋值,怎么赋值的,暂且立个疑问 flag:

EdgeHubConfig 赋值疑问

到此,EdgeHub struct 的初始化就告一段落了,下面分析 EdgeHub 的启动逻辑:

kubeedge/edge/pkg/edgehub/module.go

//Start sets context and starts the controller

func (eh *EdgeHub) Start(c *context.Context) {

eh.context = c

eh.controller.Start©

}

EdgeHub 启动函数 Start(…) 只做了 2 件事:

1. 接收并存储传入的消息管道

eh.context = c

2. 启动 EdgeHub 的 controller

eh.controller.Start©

由前面的分析可知 EdgeHub 的 controller 作为 EdgeHub 功能的主要载体,其启动函数也必将囊括 EdgeHub 绝大部分启动逻辑,继续进入 controller 的启动函数定义:

kubeedge/edge/pkg/edgehub/controller.go

//Start will start EdgeHub

func (ehc *Controller) Start(ctx *context.Context) {

config.InitEdgehubConfig()

for {

err := ehc.initial(ctx)

err = ehc.chClient.Init()

// execute hook func after connect

ehc.pubConnectInfo(true)

go ehc.routeToEdge()

go ehc.routeToCloud()

go ehc.keepalive()

// wait the stop singal

// stop authinfo manager/websocket connection

<-ehc.stopChan

ehc.chClient.Uninit()

// execute hook fun after disconnect

ehc.pubConnectInfo(false)

// sleep one period of heartbeat, then try to connect cloud hub again

time.Sleep(ehc.config.HeartbeatPeriod * 2)

// clean channel

clean:

for {

select {

case <-ehc.stopChan:

default:

break clean

}

}

}

}

从 Controller 的启动函数 Start(…) 的定义,可以清楚地看到,其中包含了 EdgeHub 的初始化、各种业务 go routine 的启动和最后的退出清理,下面逐个深入剖析:

初始化 EdgehubConfig

config.InitEdgehubConfig()

进入 config.InitEdgehubConfig() 函数定义:

kubeedge/edge/pkg/edgehub/config/config.go

// InitEdgehubConfig init edgehub config

func InitEdgehubConfig() {

err := getControllerConfig()

if edgeHubConfig.CtrConfig.Protocol == protocolWebsocket {

err = getWebSocketConfig()

} else if edgeHubConfig.CtrConfig.Protocol == protocolQuic {

err = getQuicConfig()

} else {

}

}

InitEdgehubConfig() 函数首先通过 err := getControllerConfig() 获得 EdgeHub Controller 的配置信息,然后通过获得的配置信息中的 Protocol 字段来判断是哪个协议,最后根据判断结果获取相应的协议绑定的配置信息或报错。

针对以上获取配置的操作,重点分析获得 EdgeHub Controller 的配置信息,进入 getControllerConfig():

kubeedge/edge/pkg/edgehub/config/config.go

var edgeHubConfig EdgeHubConfig

func getControllerConfig() error {

protocol, err := config.CONFIG.GetValue(“edgehub.controller.protocol”).ToString()

edgeHubConfig.CtrConfig.Protocol = protocol

heartbeat, err := config.CONFIG.GetValue(“edgehub.controller.heartbeat”).ToInt()

edgeHubConfig.CtrConfig.HeartbeatPeriod = time.Duration(heartbeat) * time.Second

projectID, err := config.CONFIG.GetValue(“edgehub.controller.project-id”).ToString()

edgeHubConfig.CtrConfig.ProjectID = projectID

nodeID, err := config.CONFIG.GetValue(“edgehub.controller.node-id”).ToString()

edgeHubConfig.CtrConfig.NodeID = nodeID

return nil

}

getControllerConfig() 获取 edgehub.controller.* 相关的配置信息并赋值给变量 edgeHubConfig,到此前面立的“EdgeHubConfig 赋值疑问”flag 也得到了解答。

EdgeHub Controller 初始化

err := ehc.initial(ctx)

进入 ehc.initial(…) 函数定义:

kubeedge/edge/pkg/edgehub/controller.go

func (ehc *Controller) initial(ctx *context.Context) (err error) {

config.GetConfig().WSConfig.URL, err = bhconfig.CONFIG.GetValue(“edgehub.websocket.url”).ToString()

cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())

ehc.context = ctx

ehc.chClient = cloudHubClient

return nil

}

  1. 第一行单独获取 edgehub.websocket.url 感觉在前面“初始化 EdgehubConfig“中的 websocket 配置信息初始化部分重复,在此立一个源码问题 flag:
  2. 获取 websocket 配置信息重复
  3. 获取 cloudhub client

cloudHubClient, err := clients.GetClient(ehc.config.Protocol, config.GetConfig())

进入 clients.GetClient(…) 定义:

kubeedge/edge/pkg/edgehub/factory.go

//GetClient returns an Adapter object with new web socket

func GetClient(clientType string, config *config.EdgeHubConfig) (Adapter, error) {

switch clientType {

case ClientTypeWebSocket:

websocketConf := wsclient.WebSocketConfig{

}

return wsclient.NewWebSocketClient(&websocketConf), nil

case ClientTypeQuic:

quicConfig := quicclient.QuicConfig{

}

return quicclient.NewQuicClient(&quicConfig), nil

default:

klog.Errorf(“Client type: %s is not supported”, clientType)

}

return nil, ErrorWrongClientType

}

从 GetClient(…) 函数定义可以知道,该函数定义了 ClientTypeWebSocket、ClientTypeQuic 两种 client 类型,两者都实现了 Adapter interface,下面遇到 Adapter 类型的 client 变量时,记得对应此处的 ClientTypeWebSocket、ClientTypeQuic。

cloud client 初始化

err = ehc.chClient.Init()

ehc.chClient.Init() 函数对应”获取 cloudhub client“中 ClientTypeWebSocket、ClientTypeQuic 的 Init() 方法,想要了解 Init() 具体做了哪些事情,感兴趣的同学可以在本文的基础上自行剖析。

1. 向 edgecore 各模块广播已经连接成功的消息

// execute hook func after connect

ehc.pubConnectInfo(true)

2. 将从 cloud 部分收到的消息转发给指定 edge 部分的指定模块

go ehc.routeToEdge()

3. 将从 edge 部分的消息转发给 cloud 部分

go ehc.routeToCloud()

4. 向 cloud 部分发送心跳信息

go ehc.keepalive()

5. 剩下的步骤都是在 edgehub 模块退出时的一些清理操作。

到此 edgecore 组件的 edgehub 模块的剖析就结束了,由于篇幅原因,很多思路比较清楚的就没有展开分析,感兴趣的同学可以在本文的基础上自习剖析。

免费直播课

《KubeEdge 技术详解与实战》

今晚 8:00 开播

第 5 课:KubeEdge EdgeMesh 设计原理

直播链接

https://huaweicloud.bugu.mudu.tv/watch/rm2jzlo5

作者 | 之江实验室端边云操作系统团队

原文链接: http://suo.im/6aIHSZ

评论

发布