DI-engine框架分为3个重要的模块,分别是coordinator、collector和learner。一般情况下,一个DI-engine训练任务只有一个coordinator,learner和collector的数目可以变化。三个模块的作用分别为:
有关DI-engine的详细介绍可参考DI-engine developer tutorial。
为了提供DI-engine在Kubernetes(K8s)中运行的支持,我们设计了DI Orchestrator,本文将说明利用DI Orchestrator,DI-engine各个模块在K8s系统上如何被创建、如何相互发现、如何开始训练等。DI Orchestrator的架构如下图所示:
整体分为两大模块:di-server
和di-operator
,DDPL
指ddp learner,Lm
指Learner,Cn
指Collector,Aggregator+DDPL
构成一个logic learner。接下来将首先介绍一个DI-engine任务提交到K8s之后DI Orchestrator如何将DI-engine的各个模块(在K8s中就是一个pod)创建并启动,然后将对di-server和di-operator进行介绍。
这里介绍任务创建流程,说明一个DI-engine任务在K8s中从创建到执行完成的一整个生命周期
di-operator是在一个负责在K8s系统中编排DIJob的组件,采用K8s operator pattern,通过controller pattern中的控制循环监听K8s集群中DIJob的状态,并在有需要的时候对DIJob的状态进行修改,使得DIJob的实际状态与我们预定义的状态尽可能保持一致。
根据DI框架中每个模块的特性,我们定义了两种自定义资源(Custom Resource),分别是DIJob和AggregatorConfig。前者用来定义一个RL任务的coordinator、collector和learner运行所需的必备条件,包括镜像、启动命令、所需计算和存储资源、环境变量等;后者用来定义一个RL任务的aggregator运行所需的必备条件。
DIJob定义:
type DIJobSpec struct {
// Group is a collection of DIJobs
Group string `json:"group,omitempty"`
//Priority labels the priority of DIJob
PriorityClassName PriorityClassName `json:"priorityClassName,omitempty"`
// CleanPodPolicy defines the policy to clean pods after DIJob completed
CleanPodPolicy CleanPodPolicy `json:"cleanPodPolicy,omitempty"`
// Volumes defines the shared volumes for DI-engine components
Volumes []corev1.Volume `json:"volumes,omitempty"`
Coordinator CoordinatorSpec `json:"coordinator"`
Collector CollectorSpec `json:"collector,"`
Learner LearnerSpec `json:"learner,"`
}
AggregatorConfig定义:
type AggregatorConfigSpec struct {
Aggregator AggregatorSpec `json:"aggregator,"`
}
为什么aggregator单独定义? aggregator对所有使用DI-engine框架进行RL训练的任务都是通用的,因此我们将aggregator定义为一个全局的、共享的资源AggregatorConfig,所有RL任务提交后,di-server将通过读取集群中唯一的AggregatorConfig来创建aggregator。另外,aggregator只是针对最常见的数据并行训练,如果是其他并行训练方法,需要定义新的Custom Resource。
用户提交DIJob后,di-operator便接管了DIJob的生命周期的管理,为了便于用户了解DIJob的状态,我们定义了以下阶段(phase):
const (
// JobCreated means the job has been submitted to the cluster,
// but not all the pods and services have been created,
// or no pods are running
JobCreated Phase = "Created"
// JobRunning means all the pods are in running state
JobRunning Phase = "Running"
// JobSucceeded means job completed without error
JobSucceeded Phase = "Succeeded"
// JobFailed means some pods failed, job is also considered failed
JobFailed Phase = "Failed"
// JobUnknown means the job is in unknown state
JobUnknown Phase = "Unknown"
)
一个正常运行并结束的DIJob会经历Created、Running和Succeeded三个阶段:
Unknown阶段暂时未作定义。
使用kubebuilder v3构建项目,operator所需的reflector、informer、indexer、controller等组件都由controller-runtime封装到manager中,将调谐(Reconcile)函数暴露给我们实现调谐逻辑,如下代码所示:
func (r *DIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// your reconcile logic here
return ctrl.Result{}, nil
}
当用户提交DIJob后,informer获取到该提交事件后触发handler,之后Reconcile函数被调用;Reconcile函数中调用list pod方法发现coordinator未创建,则读取DIJob中关于coordinator的定义模板,创建相应的coordinator pod(coordinator程序在其中运行)和service(用于pod间通信),并将一些环境变量写入pod中,包括pod的名称、pod的命名空间、访问coordinator的URL等环境变量。
其中,DI-engine框架的每个模块占用的端口都有一个默认值,如下所示:
DefaultCollectorPort = 22270
DefaultLearnerPort = 22271
DefaultAggregatorPort = 22272
DefaultCoordinatorPort = 22273
coordinator创建之后,di-operator将监听pod的状态并修改DIJob的状态。等到DIJob完成后(Succeeded或者Failed),di-operator默认会将DIJob的所有处于Running阶段的pod和所有的service都删除,coordinator pod会保留。
用户提交DIJob时,可能存在yaml文件里的某些字段输入错误的问题,导致DIJob的运行状态达不到预期,影响用户排查问题;或者需要为DIJob的某些字段设置默认值。如果在DIJob提交到K8s集群前能为DIJob设置默认值,以及做一次正确性校验,有助于用户提前发现问题。
在K8s中,可以配置webhook在DIJob提交到K8s集群之前对其进行正确性校验。K8s webhook分为MutatingWebhook和ValidatingWebhook,前者用于修改K8s资源对象的值,后者用于验证K8s资源对象的正确性。
di-operator中实现了webhook校验方法,创建MutatingWebhook用于设置DIJob的默认值;创建ValidatingWebhook用于校验DIJob的正确性。比如对CleanPodPolicy
字段,我们在MutatingWebhook中设置其默认值为Running
,表示DIJob完成后将Running的pod都删除;我们在ValidatingWebhook中校验CleanPodPolicy
字段的值,如果用户设置的值不等于None
、ALL
、Running
中的任何一个,则拒绝提交该DIJob。
di-server是一个为DI-engine框架定制的http服务器,提供新增、删除和查询collector、learner、aggregator的功能。通过调用di-server的相关接口,di-server为DIJob提供了动态增删collector和learner的能力。下面将对di-server的设计进行简要介绍,包括存储AggregatorConfig、DIJob以及DIJob所有pod的本地cache;用于动态新增、删除和查询collector、learner和aggregator的http接口设计。
为了减少di-server与K8s api server之间查询的频率,从而减轻K8s api server的负担,我们利用client-go提供的informer机制将AggregatorConfig、DIJob和DIJob的所有pod存储在本地cache,如下图所示
上图中我们只关注上半部分:reflector通过list & watch接受到新的资源实例存在的通知,就将新资源实例放到Delta Fifo queue中,informer从Delta Fifo queue中获取新资源实例并通过indexer存放到本地cache中。查询操作都可以通过查询本地cache来完成,减少向K8s api server的请求次数。如下命令:
genericInformer.Informer().GetIndexer().GetByKey(key)
当资源对象有变更时,reflector同样会接受到通知并更新本地cache;另外,informer也会定期向api server同步本地cache,与K8s集群中的资源对象保持一致。
为了支持DIJob动态增删collector/learner的需求,di-server提供http接口用于对collector/learner进行新增、删除和查询的功能,如下图所示:
提供如下接口:
method | path | description |
---|---|---|
GET | /v1alpha1/replicas | list all collectors and learners |
GET | /v1alpha1/replicas?namespace=xxx | list all collectors and learners in namespace |
GET | /v1alpha1/replicas?namespace=xxx&coordinator=xxx | list all replicas belongs to coordinator |
GET | /v1alpha1/replicas?namespace=xxx&aggregator=xxx | get learners belongs to aggregator |
DELETE | /v1alpha1/replicas | delete some replicas. put data in request body |
POST | /v1alpha1/replicas | create replicas. put data in request body |
POST | /v1alpha1/replicas/failed | post failed replicas and request for recreation. put data in request body |
DI Orchestrator为DI-engine框架提供了分布式场景下基于K8s的容器运行方案。对于用户提交的DIJob,di-operator负责对DI-engine的各个模块进行编排,使得各个模块可以正常运行并执行训练任务。通过调用di-server的接口,赋予coordinator新增、删除和查询其所有的collector、learner和aggregator的功能,提升DI-engine框架资源动态分配的能力。总结DI Orchestrator提供了以下优势:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。