东亚银行、岚图汽车带你解锁 AIGC 时代的数字化人才培养各赛道新模式! 了解详情
写点什么

Kube-Proxy IPVS 模式源码分析

  • 2019-10-14
  • 本文字数:6664 字

    阅读完需:约 22 分钟

Kube-Proxy IPVS模式源码分析

kube-proxy 整体逻辑结构


这张时序图描述了 kube-proxy 的整体逻辑结构,由于 kub-proxy 组件和其它的 kube-* 组件一样都是使用 pflag 和 cobra 库去构建命令行应用程序。所以先简单介绍下该包的基本使用方式:


func main() {  command := &cobra.Command{    Use:   "echo [string to echo]",    Short: "Echo anything to the screen",    Long: `echo is for echoing anything back.Echo works a lot like print, except it has a child command.`,    Args: cobra.MinimumNArgs(1),    Run: func(cmd *cobra.Command, args []string) {      fmt.Println("Print: " + strings.Join(args, " "))    },  }
command.Execute()}
复制代码


上面这段代码就是使用 cobra 包的一个最简单的例子,首先初始化 Command 结构,其中该结构中的 Run 就是最终要执行的真正逻辑。当初始化完成 Command 之后,通过 commnad.Execute 去启动应用程序。


现在看上面的图就能比较直观的理解程序的启动机制了,这张图的整体过程就是对 Commnad 结构中的 Run 进行核心逻辑实现。也就是说 kube-proxy 核心逻辑入口就是从这里开始(Command.Run)。


在 Command.Run 中主要做了如下几件事,看下面的代码:


// Run runs the specified ProxyServer.func (o *Options) Run() error {  defer close(o.errCh)    //....  proxyServer, err := NewProxyServer(o)  if err != nil {    return err  }
if o.CleanupAndExit { return proxyServer.CleanupAndExit() }
o.proxyServer = proxyServer return o.runLoop()}
复制代码


1.对 ProxyServer 实例进行初始化。


2.如果在启动 kube-proxy 服务时,CleanupAndExit 参数设置为 true,则会将 userspace, iptables, ipvs 三种模式之前设置的所有规则清除掉,然后直接退出。


3.如果在启动 kube-proxy 服务时,CleanupAndExit 参数设置为 flase,则会调用 runLoop 来启动 ProxyServer 服务。


首先先来看看 ProxyServer 的结构定义:


type ProxyServer struct {  Client                 clientset.Interface   EventClient            v1core.EventsGetter  IptInterface           utiliptables.Interface  IpvsInterface          utilipvs.Interface  IpsetInterface         utilipset.Interface  execer                 exec.Interface  Proxier                proxy.ProxyProvider  Broadcaster            record.EventBroadcaster  Recorder               record.EventRecorder  ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration  Conntracker            Conntracker // if nil, ignored  ProxyMode              string  NodeRef                *v1.ObjectReference  CleanupIPVS            bool  MetricsBindAddress     string  EnableProfiling        bool  OOMScoreAdj            *int32  ConfigSyncPeriod       time.Duration  HealthzServer          *healthcheck.HealthzServer}
复制代码


在 ProxyServer 结构中包含了与 kube-apiserver 通信的 Client、操作 Iptables 的 IptInterface、操作 IPVS 的 IpvsInterface、操作 IpSet 的 IpsetInterface,以及通过 ProxyMode 参数获取基于 userspace, iptables, ipvs 三种方式中的哪种使用的 Proxier。


接下来重点介绍基于 ipvs 模式实现的 Proxier, 在 ipvs 模式下 Proxier 结构的定义:


type Proxier struct {  endpointsChanges *proxy.EndpointChangeTracker  serviceChanges   *proxy.ServiceChangeTracker
//... serviceMap proxy.ServiceMap endpointsMap proxy.EndpointsMap portsMap map[utilproxy.LocalPort]utilproxy.Closeable //... syncRunner *async.BoundedFrequencyRunner // governs calls to syncProxyRules
//... iptables utiliptables.Interface ipvs utilipvs.Interface ipset utilipset.Interface exec utilexec.Interface //... ipvsScheduler string}
复制代码


在 Proxier 结构中,先介绍下 async.BoundedFrequencyRunner,其它的在介绍 ProxyServer.Run 的时候介绍。


BoundedFrequencyRunner 的定义结构如下:


type BoundedFrequencyRunner struct {  name        string        // the name of this instance  minInterval time.Duration // the min time between runs, modulo bursts  maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations fn func() // function to run lastRun time.Time // time of last run timer timer // timer for deferred runs limiter rateLimiter // rate limiter for on-demand runs}
复制代码


BoundedFrequencyRunner 结构中的 run 会异步的去定期的执行任务 fn,比如定期的执行 proxier.syncProxyRules 去创建或者更新 VirtuaServer 和 RealServer 并将 VirtualServer 的 VIP 绑定到 dummy interface(kube-ipvs0)。


下面是在 NewProxier 方法中初始化 BoundedFrequencyRunner 对象的示例:


proxier.syncRunner = async.NewBoundedFrequencyRunner(    "sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
复制代码


其中:


minSyncPeriod: 规则最小的更新时间


syncPeriod: 规则最大更新时间


proxier.syncProxyRules: 同步规则的实现函数(也是 kube-proxy 基于 ipvs 同步规则的核心实现)

ProxyServer 启动流程

这部分介绍下 ProxyServer.Run 的逻辑实现,ProxyServer 启动流程如下图所示:



在启动过程中,主要做了下面这几件事情:


  1. 启动健康检查服务 HealthzServer.

  2. 启动暴露监控指标的 MetricsServer.

  3. 如果需要调整系统的 conntrack 相关参数,则对系统的 conntrack 进行参数调整.

  4. 创建一个 informerFactory 实例,后面去通过 informerFactory 获取 kubernetes 的各类资源数据.

  5. 创建一个 ServiceConfig 实例,这个实例主要作用是实时的 WATCH kubernetes Service 资源的变化,并加入队列中,用于后续对变化的 Service 进行规则同步。

  6. 注册 servier event hander 到 Proxier.

  7. 启动 serviceConfig.


接下来详细的介绍下[4-7]这几步的流程。


ServiceConfig 的结构定义如下:


type ServiceConfig struct {  listerSynced  cache.InformerSynced  eventHandlers []ServiceHandler}
复制代码


ServiceHandler 的结构定义如下:


type ServiceHandler interface {  // OnServiceAdd is called whenever creation of new service object  // is observed.  OnServiceAdd(service *v1.Service)  // OnServiceUpdate is called whenever modification of an existing  // service object is observed.  OnServiceUpdate(oldService, service *v1.Service)  // OnServiceDelete is called whenever deletion of an existing service  // object is observed.  OnServiceDelete(service *v1.Service)  // OnServiceSynced is called once all the initial even handlers were  // called and the state is fully propagated to local cache.  OnServiceSynced()}
复制代码


创建 ServiceConfig 实例对象的具体实现如下:


func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {  result := &ServiceConfig{    listerSynced: serviceInformer.Informer().HasSynced,  }
serviceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: result.handleAddService, UpdateFunc: result.handleUpdateService, DeleteFunc: result.handleDeleteService, }, resyncPeriod, )
return result}
复制代码


  • 首先通过执行 serviceInformer.Informer().HasSynced 来将 kubernetes 下的所有 Service 资源同步到缓存 listerSynced 中。

  • 其次为 AddEventHandlerWithResyncPeriod 添加针对 Service 对象,添加,更新,删除的事件触发函数。当 Service 有相应的触发动作,就会调用相应的函数:handleAddService、handleUpdateService 和 handleDeleteService。


我们看看 handleAddService 触发函数的实现逻辑,具体代码如下:


func (c *ServiceConfig) handleAddService(obj interface{}) {  service, ok := obj.(*v1.Service)  if !ok {    utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))    return  }  for i := range c.eventHandlers {    klog.V(4).Info("Calling handler.OnServiceAdd")    c.eventHandlers[i].OnServiceAdd(service)  }}
复制代码


当 watch 到 kubernetes 集群中有新的 Service 被创建之后,会触发 handleAddService 函数,并在该函数中遍历 eventHandlers 分别去调用 OnServiceAdd 来对 proxier 结构中的 serviceChanages 进行更新并去同步相应的规则。


OnServiceAdd 的具体实现逻辑如下:


// OnServiceAdd is called whenever creation of new service object is observed.func (proxier *Proxier) OnServiceAdd(service *v1.Service) {  proxier.OnServiceUpdate(nil, service)}
// OnServiceUpdate is called whenever modification of an existing service object is observed.func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() { proxier.syncRunner.Run() }}
复制代码


ServiceChangeTracker 的结构定义如下:


// ServiceChangeTracker carries state about uncommitted changes to an arbitrary number of// Services, keyed by their namespace and name.type ServiceChangeTracker struct {  // lock protects items.  lock sync.Mutex  // items maps a service to its serviceChange.  items map[types.NamespacedName]*serviceChange  // makeServiceInfo allows proxier to inject customized information when processing service.  makeServiceInfo makeServicePortFunc  // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.  isIPv6Mode *bool  recorder   record.EventRecorder}
复制代码


serviceChanage 的结构定义如下:


// serviceChange contains all changes to services that happened since proxy rules were synced.  For a single object,// changes are accumulated, i.e. previous is state from before applying the changes,// current is state after applying all of the changes.type serviceChange struct {  previous ServiceMap  current  ServiceMap}
复制代码


到这里在回过头来看上面的基于 IPVS 实现的 Proxier 的整体流程就完全通了,ProxyServer.Run 函数在启动时,通过 kubernetes LIST/WATCH 机制去实时的感知 kubernetes 集群 Service 资源的变化,然后不断的在更新 Proxier 结构中的 ServiceChanges,然后将变化的 Service 保存在 ServiceChanges 结构中的 ServiceMap 中,给后续的 async.BoundedFrequencyRunner 去执行同步规则函数 syncProxyRules 来使用。


8. endpointConfig 的实现机制和 serviceConfig 的机制完全一样,这里就不在详细的介绍了。


9.上面做的所有预处理工作,会在 informerFactory.Start 这步生效。


10. birthCry 的作用就是通过 event 的方式通知 kubernetes, kube-proxy 这边的所有准备工作都处理好了,我要启动了。


  s.Recorder.Eventf(s.NodeRef, api.EventTypeNormal, "Starting", "Starting kube-proxy.")}
复制代码


11. 最终通过 SyncLoop 启动 kube-proxy 服务,并立刻执行 syncProxyRules 先来一遍同步再说.之后便会通过异步的方式定期的去同步 IPVS, Iptables, Ipset 的规则。


而 syncProxyRules 函数是 kube-proxy 实现的核心。主体逻辑是遍历 ServiceMap 并遍历 ServiceMap 下的 endpointsMap 及创建的 Service 类型(如: CLusterIP, Loadbalancer, NodePort)去分别创建相应的 IPVS 规则。


syncProxyRules 的函数实现定义如下:


func (proxier *Proxier) syncProxyRules() {  //.....
// Build IPVS rules for each service. for svcName, svc := range proxier.serviceMap { //......
// Handle traffic that loops back to the originator with SNAT. for _, e := range proxier.endpointsMap[svcName] { //.... }
// Capture the clusterIP. // ipset call entry := &utilipset.Entry{ IP: svcInfo.ClusterIP().String(), Port: svcInfo.Port(), Protocol: protocol, SetType: utilipset.HashIPPort, } // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name)) continue } proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) // ipvs call serv := &utilipvs.VirtualServer{ Address: svcInfo.ClusterIP(), Port: uint16(svcInfo.Port()), Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } // Set session affinity flag and timeout for IPVS service if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcNameString, serv, true); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. if err := proxier.syncEndpoint(svcName, false, serv); err != nil { klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { klog.Errorf("Failed to sync service: %v, err: %v", serv, err) }
// Capture externalIPs. for _, externalIP := range svcInfo.ExternalIPStrings() { //.... }
// Capture load-balancer ingress. for _, ingress := range svcInfo.LoadBalancerIPStrings() { //..... }
if svcInfo.NodePort() != 0 { //.... } }
// sync ipset entries for _, set := range proxier.ipsetList { set.syncIPSetEntries() }
// Tail call iptables rules for ipset, make sure only call iptables once // in a single loop per ip set. proxier.writeIptablesRules()
// Sync iptables rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. proxier.iptablesData.Reset() proxier.iptablesData.Write(proxier.natChains.Bytes()) proxier.iptablesData.Write(proxier.natRules.Bytes()) proxier.iptablesData.Write(proxier.filterChains.Bytes()) proxier.iptablesData.Write(proxier.filterRules.Bytes())
}
复制代码

总结

kube-proxy 的代码逻辑还是比较简洁的,整体的思想就是 kube-proxy 服务去 watch kubernetes 集群的 Service 和 Endpoint 对象,当这两个资源对象有状态变化时,会把它们保存在 ServiceMap 和 EndPonintMap 中,然后会通过 async.BoundedFrequencyRunner 去异步的执行 syncProxyRules 去下发规则。


本文转载自公众号 360 云计算(ID:hulktalk)


原文链接


https://mp.weixin.qq.com/s?__biz=MzU4ODgyMDI0Mg==&mid=2247486894&idx=1&sn=c39bafbcc79e6ea0a25fcb077a0b1128&chksm=fdd7b7d3caa03ec520bb4ef2ec98c498a1646e38f66d684b1124fe1aa2eb841bf4f2f080dec9&scene=27#wechat_redirect


2019-10-14 08:001904

评论

发布
暂无评论
发现更多内容

2023-12-09:用go语言,给你两个整数数组 arr1 和 arr2, 返回使 arr1 严格递增所需要的最小「操作」数(可能为 0)。 每一步「操作」中,你可以分别从 arr1 和 arr2

福大大架构师每日一题

福大大架构师每日一题

qemu 单步调试linux driver

无人知晓

qemu driver 单步调试

一款LED大灯泡设计方案

二哈侠

23 | 二叉树基础(上):什么样的二叉树适合用数组来存储

鲁米

通义家族大模型总结

多啦A梦

大模型

Java http 接口请求详解。

百度搜索:蓝易云

Java Linux HTTP 云服务器

专业直观Git客户端:Fork 免激活最新

胖墩儿不胖y

git Mac软件

广西汽车集团携手时习知打造数字化学习平台,加速人才转型

轶天下事

一种在数据量比较大、字段变化频繁场景下的大数据架构设计方案

编程攻略

大数据 架构设计

qemu + vscode图形化调试linux kernel

无人知晓

Linux Kenel qemu 内核调试 linux

闲聊ArrayList的那些事儿

是月月啊2023

Java 面试题

在大数据量中Spark数据倾斜问题定位排查及解决

五分钟学大数据

大数据 spark 年度总结 数据倾斜

24 | 二叉树基础(下):有了如此高效的散列表,为什么还需要二叉树?

鲁米

qemu搭建arm64 linux kernel调试环境

无人知晓

qemu调试kernel启动(从第一行汇编开始)

无人知晓

qemu kernel 内核调试 linux

20 | 散列表(下):为什么散列表和链表经常会一起使用

鲁米

ADA算力DAPP挖矿系统开发丨详情

l8l259l3365

qemu单步调试arm64 linux kernel

无人知晓

qemu 内核调试 linux

mac重复文件识别软件 Advanced Duplicate Cleaner直装最新

mac大玩家j

Mac软件 重复识别软件 重复查找工具

Helio 升级为 LISTA DAO,开启多链时代新篇章并宣布积分空投计划

股市老人

开发体育竞赛直播系统平台:创新与活力的释放,引爆比赛激情

软件开发-梦幻运营部

SQL PRIMARY KEY 约束- 唯一标识表中记录的关键约束

小万哥

MySQL 数据库 sql 程序员 后端开发

我与Stable Diffusion的“缘”

AI 大模型训练 Stable Diffustion 大模型推理

面试官:线程崩了,会导致 JVM 崩溃吗?

是月月啊2023

JVM;

Dockerfile和搭建 Docker 私有仓库:让容器化部署变得更简单

百度搜索:蓝易云

Docker Linux 运维 Dockerfile 云服务器

手撸了一个API网关,代码已上传,自取~

是月月啊2023

Java

SpringSecurity全限验证实战!

是月月啊2023

Java 面试题

2023年大数据个人技术能力提升心得体会

大数据技术指南

大数据 技术总结

25 | 红黑树(上):为什么工程中都用红黑树这种二叉树

鲁米

app开发

Geek_8da502

Helio 升级为 LISTA DAO,开启多链时代新篇章并宣布积分空投计划

EOSdreamer111

Kube-Proxy IPVS模式源码分析_语言 & 开发_王希刚_InfoQ精选文章