Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions pkg/controller/restore_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,13 @@ func (c *StashController) ensureRestoreJob(restoreSession *api_v1beta1.RestoreSe

if restoreSession.Spec.Task.Name != "" {
// Restore process follows Function-Task model. So, resolve Function and Task to get desired job definition.
jobTemplate, err = c.resolveRestoreTask(restoreSession, repository, jobMeta, ref, serviceAccountName)
jobTemplate, err = c.resolveRestoreTask(restoreSession, repository, ref, serviceAccountName)
if err != nil {
return err
}
} else {
// Restore process does not follow Function-Task model. So, generate simple volume restorer job definition.
jobTemplate, err = util.NewPVCRestorerJob(restoreSession, repository, image, jobMeta)
jobTemplate, err = util.NewPVCRestorerJob(restoreSession, repository, image)
if err != nil {
return err
}
Expand All @@ -267,7 +267,7 @@ func (c *StashController) ensureRestoreJob(restoreSession *api_v1beta1.RestoreSe
// If volumeClaimTemplate is not specified then we don't need any further processing. Just, create the job
if restoreSession.Spec.Target == nil ||
(restoreSession.Spec.Target != nil && len(restoreSession.Spec.Target.VolumeClaimTemplates) == 0) {
return c.createRestoreJob(jobTemplate, jobTemplate.ObjectMeta, ref, serviceAccountName)
return c.createRestoreJob(jobTemplate, jobMeta, ref, serviceAccountName)
}

// volumeClaimTemplate has been specified. Now, we have to do the following for each replica:
Expand Down Expand Up @@ -295,13 +295,27 @@ func (c *StashController) ensureRestoreJob(restoreSession *api_v1beta1.RestoreSe

// add PVCs as volume to the job
volumes := util.PVCListToVolumes(pvcList, ordinal)
jobTemplate.Spec.Volumes = core_util.UpsertVolume(jobTemplate.Spec.Volumes, volumes...)

// use copy of the original job template. otherwise, each iteration will append volumes in the same template
restoreJobTemplate := jobTemplate.DeepCopy()
restoreJobMeta := jobMeta.DeepCopy()
// add ordinal suffix to the job name so that multiple restore job can run concurrently
jobTemplate.Name = fmt.Sprintf("%s-%d", jobMeta.Name, ordinal)
restoreJobMeta.Name = fmt.Sprintf("%s-%d", jobMeta.Name, ordinal)

restoreJobTemplate.Spec.Volumes = core_util.UpsertVolume(restoreJobTemplate.Spec.Volumes, volumes...)

ordinalEnv := core.EnvVar{
Name: util.KeyPodOrdinal,
Value: fmt.Sprintf("%d", ordinal),
}

// insert POD_ORDINAL env in all containers.
for i, c := range restoreJobTemplate.Spec.Containers {
restoreJobTemplate.Spec.Containers[i].Env = core_util.UpsertEnvVars(c.Env, ordinalEnv)
}

// create restore job
err = c.createRestoreJob(jobTemplate, jobTemplate.ObjectMeta, ref, serviceAccountName)
err = c.createRestoreJob(restoreJobTemplate, *restoreJobMeta, ref, serviceAccountName)
if err != nil {
return err
}
Expand Down Expand Up @@ -329,7 +343,7 @@ func (c *StashController) createRestoreJob(jobTemplate *core.PodTemplateSpec, me

// resolveRestoreTask resolves Functions and Tasks then returns a job definition to restore the target.
func (c *StashController) resolveRestoreTask(restoreSession *api_v1beta1.RestoreSession,
repository *api_v1alpha1.Repository, meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string) (*core.PodTemplateSpec, error) {
repository *api_v1alpha1.Repository, ref *core.ObjectReference, serviceAccountName string) (*core.PodTemplateSpec, error) {

// resolve task template
explicitInputs := make(map[string]string)
Expand Down Expand Up @@ -384,8 +398,7 @@ func (c *StashController) resolveRestoreTask(restoreSession *api_v1beta1.Restore
}

podTemplate := &core.PodTemplateSpec{
ObjectMeta: meta,
Spec: podSpec,
Spec: podSpec,
}
return podTemplate, nil
}
Expand Down Expand Up @@ -602,7 +615,7 @@ func getPVCFromVolumeClaimTemplates(ordinal int32, claimTemplates []core.Persist
pvcList := make([]core.PersistentVolumeClaim, 0)
for _, claim := range claimTemplates {
inputs := make(map[string]string)
inputs["POD_ORDINAL"] = strconv.Itoa(int(ordinal))
inputs[util.KeyPodOrdinal] = strconv.Itoa(int(ordinal))
err := resolve.ResolvePVCSpec(&claim, inputs)
if err != nil {
return pvcList, err
Expand Down
16 changes: 11 additions & 5 deletions pkg/controller/stash.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,21 +198,27 @@ func (c *StashController) getTotalHosts(target interface{}, namespace string, dr
return types.Int32P(1), nil
}
targetRef = t.Ref

case *api_v1beta1.RestoreTarget:
t := target.(*api_v1beta1.RestoreTarget)
if t == nil {
return types.Int32P(1), nil
}
targetRef = t.Ref

// for VolumeSnapshot, we consider each PVC as a separate host.
// hence, number of host = replica * number of PVC in each replica
if driver == api_v1beta1.VolumeSnapshotter {
def := int32(1)
replica := int32(1)
if t.Replicas != nil {
def = types.Int32(t.Replicas)
replica = types.Int32(t.Replicas)
}
return types.Int32P(def * int32(len(t.VolumeClaimTemplates))), nil
return types.Int32P(replica * int32(len(t.VolumeClaimTemplates))), nil
}
// if volumeClaimTemplates is specified when using Restic driver, we can calculate total host from it
if driver != api_v1beta1.VolumeSnapshotter && t.VolumeClaimTemplates != nil {

// if volumeClaimTemplates is specified when using Restic driver, restore is done through job.
// stash creates restore job for each replica. hence, number of total host is the number of replicas.
if len(t.VolumeClaimTemplates) != 0 || t.Replicas != nil {
if t.Replicas == nil {
return types.Int32P(1), nil
} else {
Expand Down
7 changes: 1 addition & 6 deletions pkg/util/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func NewRecoveryJob(stashClient cs.Interface, recovery *api_v1alpha1.Recovery, i
}

// NewPVCRestorerJob return a job definition to restore pvc.
func NewPVCRestorerJob(rs *api_v1beta1.RestoreSession, repository *api_v1alpha1.Repository, image docker.Docker, meta metav1.ObjectMeta) (*core.PodTemplateSpec, error) {
func NewPVCRestorerJob(rs *api_v1beta1.RestoreSession, repository *api_v1alpha1.Repository, image docker.Docker) (*core.PodTemplateSpec, error) {
container := core.Container{
Name: StashContainer,
Image: image.ToContainerImage(),
Expand All @@ -217,10 +217,6 @@ func NewPVCRestorerJob(rs *api_v1beta1.RestoreSession, repository *api_v1alpha1.
fmt.Sprintf("--enable-analytics=%v", cli.EnableAnalytics),
}, cli.LoggerOptions.ToFlags()...),
Env: []core.EnvVar{
{
Name: KeyPodName,
Value: meta.Name,
},
{
Name: KeyNodeName,
ValueFrom: &core.EnvVarSource{
Expand Down Expand Up @@ -276,7 +272,6 @@ func NewPVCRestorerJob(rs *api_v1beta1.RestoreSession, repository *api_v1alpha1.
}

jobTemplate := &core.PodTemplateSpec{
ObjectMeta: meta,
Spec: core.PodSpec{
Containers: []core.Container{container},
RestartPolicy: core.RestartPolicyNever,
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ const (
StashSecretVolume = "stash-secret-volume"
StashSecretMountDir = "/etc/stash/repository/secret"

KeyPodName = "POD_NAME"
KeyNodeName = "NODE_NAME"
KeyPodName = "POD_NAME"
KeyNodeName = "NODE_NAME"
KeyPodOrdinal = "POD_ORDINAL"

RetryInterval = 50 * time.Millisecond
ReadinessTimeout = 2 * time.Minute
Expand Down
30 changes: 19 additions & 11 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,15 @@ type RepoLabelData struct {
NodeName string
}

// GetHostName returns hostname for a target
func GetHostName(target interface{}) (string, error) {
// target nil for cluster backup
var targetRef api_v1beta1.TargetRef
if target == nil {
return "host-0", nil
}

// read targetRef field from BackupTarget or RestoreTarget
switch target.(type) {
case *api_v1beta1.BackupTarget:
t := target.(*api_v1beta1.BackupTarget)
Expand All @@ -60,33 +63,38 @@ func GetHostName(target interface{}) (string, error) {
if t == nil {
return "host-0", nil
}
// replicas is specified when restore StatefulSet volumes using job.
// so we have to handle this case too.
if t.Replicas != nil { // StatefulSet volumes.
podName := os.Getenv(KeyPodName)
if podName == "" {
return "", fmt.Errorf("missing podName for %s", apis.KindStatefulSet)

// if replicas or volumeClaimTemplate is specified then restore is done via job.
// in this case, we need to know the ordinal to use as host suffix.
// stash operator sets desired ordinal as 'POD_ORDINAL' env while creating the job.
if t.Replicas != nil || len(t.VolumeClaimTemplates) != 0 {
if os.Getenv(KeyPodOrdinal) != "" {
return "host-" + os.Getenv(KeyPodOrdinal), nil
}
podInfo := strings.Split(podName, "-")
podOrdinal := podInfo[len(podInfo)-1]
return "host-" + podOrdinal, nil
return "", fmt.Errorf("'target.replicas' or 'target.volumeClaimTemplate' has been specified in RestoreSession" +
" but 'POD_ORDINAL' env not found")
}
targetRef = t.Ref
}

// backup/restore is running through sidecar/init-container. identify hostname for them.
switch targetRef.Kind {
case apis.KindStatefulSet:
// for StatefulSet, host name is 'host-<pod ordinal>'. stash operator set pod's name as 'POD_NAME' env
// in the sidecar/init-container through downward api. we have to parse the pod name to get ordinal.
podName := os.Getenv(KeyPodName)
if podName == "" {
return "", fmt.Errorf("missing podName for %s", apis.KindStatefulSet)
return "", fmt.Errorf("missing 'POD_NAME' env in StatefulSet: %s", apis.KindStatefulSet)
}
podInfo := strings.Split(podName, "-")
podOrdinal := podInfo[len(podInfo)-1]
return "host-" + podOrdinal, nil
case apis.KindDaemonSet:
// for DaemonSet, host name is the node name. stash operator set the respective node name as 'NODE_NAME' env
// in the sidecar/init-container through downward api.
nodeName := os.Getenv(KeyNodeName)
if nodeName == "" {
return "", fmt.Errorf("missing nodeName for %s", apis.KindDaemonSet)
return "", fmt.Errorf("missing 'NODE_NAME' env for DaemonSet: %s", apis.KindDaemonSet)
}
return nodeName, nil
default:
Expand Down