kubernetes-notes
  • 目录
    • 序言
  • 云原生体系
    • 12-Factor
    • K8S知识体系
  • 安装与配置
    • 部署k8s集群
      • 使用kubeadm安装生产环境kubernetes
      • 使用kubespray安装kubernetes
      • 使用minikube安装kubernetes
      • 使用kind安装kubernetes
    • k8s证书及秘钥
    • k8s版本说明
  • 基本概念
    • kubernetes架构
      • Kubernetes总架构图
      • 基于Docker及Kubernetes技术构建容器云(PaaS)平台概述
    • kubernetes对象
      • 理解kubernetes对象
      • kubernetes常用对象说明
    • Pod
      • Pod介绍
      • Pod定义文件
      • Pod生命周期
      • Pod健康检查
      • Pod存储卷
      • Pod控制器
      • Pod伸缩与升级
    • 配置
      • ConfigMap
    • Workload
  • 核心原理
    • 核心组件
      • Api Server
      • Controller Manager
      • Scheduler
      • Kubelet
    • 流程图
      • Pod创建流程
      • PVC创建流程
  • 容器网络
    • Docker网络
    • K8S网络
    • 网络插件
      • Flannel介绍
    • CNI
      • CNI接口介绍
      • Macvlan介绍
  • 容器存储
    • 存储卷概念
      • Volume
      • Persistent Volume
      • Persistent Volume Claim
      • Storage Class
      • Dynamic Volume Provisioning
    • CSI
      • csi-cephfs-plugin
      • 部署csi-cephfs
      • 部署cephfs-provisioner
      • FlexVolume介绍
  • 资源隔离
    • 资源配额
    • Pod限额
    • 资源服务质量
    • Lxcfs资源视图隔离
  • 运维指南
    • kubectl工具
      • kubectl安装与配置
      • kubectl命令说明
      • kubectl命令别名
    • kubernetes集群问题排查
    • 节点调度
      • 安全迁移节点
      • 指定Node调度与隔离
    • 镜像仓库配置
      • 配置私有的镜像仓库
      • 拉取私有镜像
  • 开发指南
    • client-go的使用及源码分析
    • CSI插件开发
      • nfs-client-provisioner源码分析
      • csi-provisioner源码分析
    • operator开发
      • kubebuilder的使用
  • 问题排查
    • 节点相关问题
      • keycreate permission denied
      • Cgroup不支持pid资源
      • Cgroup子系统无法挂载
    • Pod驱逐
    • 镜像拉取失败问题
    • PVC Terminating
  • 源码分析
    • Kubernetes源码分析笔记
    • kubelet
      • NewKubeletCommand
      • NewMainKubelet
      • startKubelet
      • syncLoopIteration
      • syncPod
    • kube-controller-manager
      • NewControllerManagerCommand
      • DeploymentController
      • Informer机制
    • kube-scheduler
      • NewSchedulerCommand
      • registerAlgorithmProvider
      • scheduleOne
      • findNodesThatFit
      • PrioritizeNodes
      • preempt
    • kube-apiserver
      • NewAPIServerCommand
  • Runtime
    • Runtime
      • Runc和Containerd概述
    • Containerd
      • 安装Containerd
    • Docker
      • Docker学习笔记
    • Kata Container
      • kata容器简介
      • kata配置
    • GPU
      • nvidia-device-plugin介绍
  • Etcd
    • Etcd介绍
    • Raft算法
    • Etcd启动配置参数
    • Etcd访问控制
    • etcdctl命令工具
      • etcdctl命令工具-V3
      • etcdctl命令工具-V2
    • Etcd中的k8s数据
    • Etcd-Operator的使用
  • 多集群管理
    • k8s多集群管理的思考
    • Virtual Kubelet
      • Virtual Kubelet介绍
      • Virtual Kubelet 命令
    • Karmada
      • Karmada介绍
  • 边缘容器
    • KubeEdge介绍
    • KubeEdge源码分析
      • cloudcore
      • edgecore
    • OpenYurt部署
  • 虚拟化
    • 虚拟化相关概念
    • KubeVirt
      • KubeVirt的介绍
      • KubeVirt的使用
  • 监控体系
    • 监控体系介绍
    • cAdvisor介绍
    • Heapster介绍
    • Influxdb介绍
由 GitBook 提供支持

www.huweihuang.com

在本页
  • kubeedge源码分析之cloudcore
  • 1. main函数
  • 2. NewCloudCoreCommand
  • 3. registerModules
  • 4. core.Run
  • 5. StartModules
  • 6. GracefulShutdown

这有帮助吗?

在GitHub上编辑
  1. 边缘容器
  2. KubeEdge源码分析

cloudcore

上一页KubeEdge源码分析下一页edgecore

最后更新于2年前

这有帮助吗?

kubeedge源码分析之cloudcore

本文源码分析基于

本文主要分析cloudcore中CloudCoreCommand的基本流程,具体的cloudhub、edgecontroller、devicecontroller模块的实现逻辑待后续单独文章分析。

目录结构:

cloud/cmd/cloudcore

cloudcore
├── app
│   ├── options
│   │   └── options.go
│   └── server.go # NewCloudCoreCommand、registerModules
└── cloudcore.go # main函数

cloudcore部分包含以下模块:

  • cloudhub

  • edgecontroller

  • devicecontroller

1. main函数

kubeedge的代码采用cobra命令框架,代码风格与k8s源码风格类似。cmd目录主要为cobra command的基本内容及参数解析,pkg目录包含具体的实现逻辑。

cloud/cmd/cloudcore/cloudcore.go

func main() {
	command := app.NewCloudCoreCommand()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

2. NewCloudCoreCommand

NewCloudCoreCommand为cobra command的构造函数,该类函数一般包含以下部分:

  • 构造option

  • 添加Flags

  • 运行Run函数(核心)

cloud/cmd/cloudcore/app/server.go

func NewCloudCoreCommand() *cobra.Command {
	opts := options.NewCloudCoreOptions()
	cmd := &cobra.Command{
		Use: "cloudcore",
		Long: `CloudCore is the core cloud part of KubeEdge, which contains three modules: cloudhub,
edgecontroller, and devicecontroller. Cloudhub is a web server responsible for watching changes at the cloud side,
caching and sending messages to EdgeHub. EdgeController is an extended kubernetes controller which manages 
edge nodes and pods metadata so that the data can be targeted to a specific edge node. DeviceController is an extended 
kubernetes controller which manages devices so that the device metadata/status date can be synced between edge and cloud.`,
		Run: func(cmd *cobra.Command, args []string) {
			verflag.PrintAndExitIfRequested()
			flag.PrintFlags(cmd.Flags())

			// To help debugging, immediately log version
			klog.Infof("Version: %+v", version.Get())
			registerModules()
			// start all modules
			core.Run()
		},
	}
	fs := cmd.Flags()
	namedFs := opts.Flags()
	verflag.AddFlags(namedFs.FlagSet("global"))
	globalflag.AddGlobalFlags(namedFs.FlagSet("global"), cmd.Name())
	for _, f := range namedFs.FlagSets {
		fs.AddFlagSet(f)
	}

	usageFmt := "Usage:\n  %s\n"
	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
	cmd.SetUsageFunc(func(cmd *cobra.Command) error {
		fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
		cliflag.PrintSections(cmd.OutOrStderr(), namedFs, cols)
		return nil
	})
	cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
		fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
		cliflag.PrintSections(cmd.OutOrStdout(), namedFs, cols)
	})

	return cmd
}

核心代码:

// 构造option
opts := options.NewCloudCoreOptions()
// 执行run函数
registerModules()
core.Run()
// 添加flags
fs.AddFlagSet(f)

3. registerModules

其中cloudcore部分涉及的模块有:

  • cloudhub

  • edgecontroller

  • devicecontroller

cloud/cmd/cloudcore/app/server.go

// registerModules register all the modules started in cloudcore
func registerModules() {
	cloudhub.Register()
	edgecontroller.Register()
	devicecontroller.Register()
}

以下以cloudhub为例说明注册的过程。

cloudhub结构体主要包含:

  • context:上下文,用来传递消息上下文

  • stopChan:go channel通信

beehive框架中的模块需要实现Module接口,因此cloudhub也实现了该接口,其中核心方法为Start,用来启动相应模块的运行。

vendor/github.com/kubeedge/beehive/pkg/core/module.go

// Module interface
type Module interface {
	Name() string
	Group() string
	Start(c *context.Context)
	Cleanup()
}

以下为cloudHub结构体及注册函数。

cloud/pkg/cloudhub/cloudhub.go

type cloudHub struct {
	context  *context.Context
	stopChan chan bool
}

func Register() {
	core.Register(&cloudHub{})
}

具体的注册实现函数为core.Register,注册过程实际上就是将具体的模块结构体放入一个以模块名为key的map映射中,待后续调用。

vendor/github.com/kubeedge/beehive/pkg/core/module.go

// Register register module
func Register(m Module) {
	if isModuleEnabled(m.Name()) {
		modules[m.Name()] = m  //将具体的模块结构体放入一个以模块名为key的map映射中
		log.LOGGER.Info("module " + m.Name() + " registered")
	} else {
		disabledModules[m.Name()] = m
		log.LOGGER.Info("module " + m.Name() +
			" is not register, please check modules.yaml")
	}
}

4. core.Run

CloudCoreCommand命令的Run函数实际上是运行beehive框架中注册的所有模块。

其中包括两部分逻辑:

  • 启动运行所有注册模块

  • 监听信号并做优雅清理

vendor/github.com/kubeedge/beehive/pkg/core/core.go

//Run starts the modules and in the end does module cleanup
func Run() {
	//Address the module registration and start the core
	StartModules()
	// monitor system signal and shutdown gracefully
	GracefulShutdown()
}

5. StartModules

StartModules获取context上下文,并以goroutine的方式运行所有已注册的模块。其中Start函数即每个模块的具体实现Module接口中的Start方法。不同模块各自定义自己的具体Start方法实现。

coreContext := context.GetContext(context.MsgCtxTypeChannel)
go module.Start(coreContext)

具体实现如下:

vendor/github.com/kubeedge/beehive/pkg/core/core.go

// StartModules starts modules that are registered
func StartModules() {
	coreContext := context.GetContext(context.MsgCtxTypeChannel)

	modules := GetModules()
	for name, module := range modules {
		//Init the module
		coreContext.AddModule(name)
		//Assemble typeChannels for send2Group
		coreContext.AddModuleGroup(name, module.Group())
		go module.Start(coreContext)
		log.LOGGER.Info("starting module " + name)
	}
}

6. GracefulShutdown

当收到相关信号,则执行各个模块实现的Cleanup方法。

vendor/github.com/kubeedge/beehive/pkg/core/core.go

// GracefulShutdown is if it gets the special signals it does modules cleanup
func GracefulShutdown() {
	c := make(chan os.Signal)
	signal.Notify(c, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM,
		syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT)
	select {
	case s := <-c:
		log.LOGGER.Info("got os signal " + s.String())
		//Cleanup each modules
		modules := GetModules()
		for name, module := range modules {
			log.LOGGER.Info("Cleanup module " + name)
			module.Cleanup()
		}
	}
}

参考:

  • https://github.com/kubeedge/kubeedge/tree/release-1.1/cloud/cmd/cloudcore

  • https://github.com/kubeedge/kubeedge/tree/release-1.1/vendor/github.com/kubeedge/beehive/pkg/core

由于kubeedge的代码的大部分模块都采用了基于go-channel的消息通信框架(待后续单独文章分析),因此在各模块启动之前,需要将该模块注册到beehive的框架中。

kubeedge v1.1.0
Beehive