Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions go/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ type TrainingJob struct {
// +k8s:deepcopy-gen=true
type TrainingJobSpec struct {
// General job attributes.
Image string `json:"image,omitempty"`
Port int `json:"port,omitempty"`
PortsNum int `json:"ports_num,omitempty"`
PortsNumForSparse int `json:"ports_num_for_sparse,omitempty"`
FaultTolerant bool `json:"fault_tolerant,omitempty"`
Passes int `json:"passes,omitempty"`
Image string `json:"image,omitempty"`
Port int `json:"port,omitempty"`
PortsNum int `json:"ports_num,omitempty"`
PortsNumForSparse int `json:"ports_num_for_sparse,omitempty"`
FaultTolerant bool `json:"fault_tolerant,omitempty"`
Passes int `json:"passes,omitempty"`
Volumes []v1.Volume `json:volumes`
VolumeMounts []v1.VolumeMount `json:VolumeMounts`
// Job components.
Trainer TrainerSpec `json:"trainer"`
Pserver PserverSpec `json:"pserver"`
Expand Down
58 changes: 58 additions & 0 deletions go/controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
batchv1 "k8s.io/client-go/pkg/apis/batch/v1"
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/api"
)

Expand All @@ -49,6 +50,14 @@ func (c Cluster) GetTrainerJob(job *paddlejob.TrainingJob) (*batchv1.Job, error)
Get(fmt.Sprintf("%s-trainer", jobname), metav1.GetOptions{})
}

// GetTrainerJobByName returns the trainer Job object with the given name in
// the given namespace.
//
// NOTE(review): unlike GetTrainerJob, the caller must pass the full job name
// (including any "-trainer" suffix) — confirm callers do this.
func (c Cluster) GetTrainerJobByName(namespace, name string) (*batchv1.Job, error) {
	return c.clientset.
		BatchV1().
		Jobs(namespace).
		// Redundant parentheses around name removed (gofmt/lint noise).
		Get(name, metav1.GetOptions{})
}

// UpdateTrainerJob updates the trainer job spec
// this will do the actual scale up/down.
func (c Cluster) UpdateTrainerJob(job *batchv1.Job) error {
Expand Down Expand Up @@ -184,3 +193,52 @@ func (c *Cluster) SyncResource() (res ClusterResource, err error) {
}
return
}

// CreateJob submits the given batch Job to the API server in the Job's own
// namespace and returns the created object.
func (c *Cluster) CreateJob(j *batchv1.Job) (*batchv1.Job, error) {
	jobs := c.clientset.BatchV1().Jobs(j.ObjectMeta.Namespace)
	return jobs.Create(j)
}

// CreateReplicaSet submits the given ReplicaSet to the API server in the
// ReplicaSet's own namespace and returns the created object.
func (c *Cluster) CreateReplicaSet(r *v1beta1.ReplicaSet) (*v1beta1.ReplicaSet, error) {
	replicaSets := c.clientset.ExtensionsV1beta1().ReplicaSets(r.ObjectMeta.Namespace)
	return replicaSets.Create(r)
}

// GetReplicaSet fetches the ReplicaSet with the given name from the given
// namespace.
func (c *Cluster) GetReplicaSet(namespace, name string) (*v1beta1.ReplicaSet, error) {
	replicaSets := c.clientset.ExtensionsV1beta1().ReplicaSets(namespace)
	return replicaSets.Get(name, metav1.GetOptions{})
}

// DeleteTrainerJob removes a trainer job and, via foreground cascading
// deletion, its dependent pods.
// See: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
func (c *Cluster) DeleteTrainerJob(namespace, name string) error {
	policy := metav1.DeletePropagationForeground
	return c.clientset.
		BatchV1().
		Jobs(namespace).
		Delete(name, &metav1.DeleteOptions{PropagationPolicy: &policy})
}

// DeleteReplicaSet removes a ReplicaSet and, via foreground cascading
// deletion, its dependent pods.
func (c *Cluster) DeleteReplicaSet(namespace, name string) error {
	policy := metav1.DeletePropagationForeground
	return c.clientset.
		ExtensionsV1beta1().
		ReplicaSets(namespace).
		Delete(name, &metav1.DeleteOptions{PropagationPolicy: &policy})
}
48 changes: 11 additions & 37 deletions go/controller/jobparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ func (p *DefaultJobParser) ParseToPserver(job *paddlejob.TrainingJob) *v1beta1.R
Labels: map[string]string{"paddle-job-pserver": job.ObjectMeta.Name},
},
Spec: v1.PodSpec{
// TODO: setup pserver volumes on cloud.
Volumes: podVolumes(job),
Volumes: job.Spec.Volumes,
Containers: []v1.Container{
v1.Container{
Name: "pserver",
Expand Down Expand Up @@ -138,14 +137,14 @@ func (p *DefaultJobParser) ParseToTrainer(job *paddlejob.TrainingJob) *batchv1.J
Labels: map[string]string{"paddle-job": job.ObjectMeta.Name},
},
Spec: v1.PodSpec{
Volumes: podVolumes(job),
Volumes: job.Spec.Volumes,
Containers: []v1.Container{
v1.Container{
Name: "trainer",
Image: job.Spec.Image,
ImagePullPolicy: imagePullPolicy,
Command: command,
VolumeMounts: podVolumeMounts(job),
VolumeMounts: job.Spec.VolumeMounts,
Ports: podPorts(job),
Env: podEnv(job),
Resources: job.Spec.Trainer.Resources,
Expand All @@ -162,13 +161,13 @@ func masterResource(job *paddlejob.TrainingJob) *v1.ResourceRequirements {
// TODO(gongwb): config master resource?
return &v1.ResourceRequirements{
Limits: v1.ResourceList{
"cpu": *apiresource.NewQuantity(int64(1), apiresource.DecimalSI),
"memory": apiresource.MustParse("500Mi"),
},
Requests: v1.ResourceList{
"cpu": *apiresource.NewQuantity(int64(2), apiresource.DecimalSI),
"memory": apiresource.MustParse("1Gi"),
},
Requests: v1.ResourceList{
"cpu": *apiresource.NewQuantity(int64(1), apiresource.DecimalSI),
"memory": apiresource.MustParse("500Mi"),
},
}
}

Expand Down Expand Up @@ -213,18 +212,16 @@ func (p *DefaultJobParser) ParseToMaster(job *paddlejob.TrainingJob) *v1beta1.Re
Labels: map[string]string{"paddle-job-master": job.ObjectMeta.Name},
},
Spec: v1.PodSpec{
// TODO: setup pserver volumes on cloud.
Volumes: podVolumes(job),
Volumes: job.Spec.Volumes,
Containers: []v1.Container{
v1.Container{
Name: "master",
Image: job.Spec.Image,
ImagePullPolicy: imagePullPolicy,
Ports: masterPorts(job),
// TODO(gongwb):master env
Command: command,
VolumeMounts: podVolumeMounts(job),
Resources: *masterResource(job),
Command: command,
VolumeMounts: job.Spec.VolumeMounts,
Resources: *masterResource(job),
},
*getEtcdPodSpec(job),
},
Expand Down Expand Up @@ -315,29 +312,6 @@ func podEnv(job *paddlejob.TrainingJob) []v1.EnvVar {
}
}

// podVolumes returns the single workspace volume for the given job, backed
// by a hostPath pointing at the trainer's workspace directory.
// TODO(gongwb): add support to ceph fs and mount public path.
func podVolumes(job *paddlejob.TrainingJob) []v1.Volume {
	hostPath := v1.HostPathVolumeSource{Path: job.Spec.Trainer.Workspace}
	workspace := v1.Volume{
		Name:         job.ObjectMeta.Name + "-workspace",
		VolumeSource: v1.VolumeSource{HostPath: &hostPath},
	}
	return []v1.Volume{workspace}
}

// podVolumeMounts returns the mount for the job's workspace volume, mounted
// at the trainer's workspace path.
func podVolumeMounts(job *paddlejob.TrainingJob) []v1.VolumeMount {
	mount := v1.VolumeMount{
		Name:      job.ObjectMeta.Name + "-workspace",
		MountPath: job.Spec.Trainer.Workspace,
	}
	return []v1.VolumeMount{mount}
}

// -----------------------------------------------------------------------
// general functions end
// -----------------------------------------------------------------------
Loading