Skip to content

Commit

Permalink
fix: enhance PITR function (apecloud#2744)
Browse files Browse the repository at this point in the history
  • Loading branch information
dengshaojiang authored Apr 20, 2023
1 parent 9435866 commit 321fdc3
Show file tree
Hide file tree
Showing 18 changed files with 322 additions and 177 deletions.
31 changes: 14 additions & 17 deletions apis/dataprotection/v1alpha1/backup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,20 @@ func (r *BackupSpec) Validate(backupPolicy *BackupPolicy) error {
// GetRecoverableTimeRange returns the array of recoverable time ranges.
func GetRecoverableTimeRange(backups []Backup) []BackupLogStatus {
// filter backups with backupLog
backupsWithLog := make([]Backup, 0)
baseBackups := make([]Backup, 0)
var incrementalBackup *Backup
for _, b := range backups {
if b.Status.Phase == BackupCompleted &&
b.Status.Manifests != nil && b.Status.Manifests.BackupLog != nil {
backupsWithLog = append(backupsWithLog, b)
if b.Status.Manifests == nil || b.Status.Manifests.BackupLog == nil ||
b.Status.Manifests.BackupLog.StopTime == nil {
continue
}
if b.Spec.BackupType == BackupTypeIncremental {
incrementalBackup = &b
} else if b.Spec.BackupType != BackupTypeIncremental && b.Status.Phase == BackupCompleted {
baseBackups = append(baseBackups, b)
}
}
if len(backupsWithLog) == 0 {
if len(baseBackups) == 0 {
return nil
}
sort.Slice(backups, func(i, j int) bool {
Expand All @@ -230,18 +236,9 @@ func GetRecoverableTimeRange(backups []Backup) []BackupLogStatus {
return backups[i].Status.StartTimestamp.Before(backups[j].Status.StartTimestamp)
})
result := make([]BackupLogStatus, 0)
start, end := backupsWithLog[0].Status.Manifests.BackupLog.StopTime, backupsWithLog[0].Status.Manifests.BackupLog.StopTime

for i := 1; i < len(backupsWithLog); i++ {
b := backupsWithLog[i].Status.Manifests.BackupLog
if b.StartTime.Before(end) || b.StartTime.Equal(end) {
if b.StopTime.After(end.Time) {
end = b.StopTime
}
} else {
result = append(result, BackupLogStatus{StartTime: start, StopTime: end})
start, end = b.StopTime, b.StopTime
}
start, end := baseBackups[0].Status.Manifests.BackupLog.StopTime, baseBackups[0].Status.Manifests.BackupLog.StopTime
if incrementalBackup != nil && start.Before(incrementalBackup.Status.Manifests.BackupLog.StopTime) {
end = incrementalBackup.Status.Manifests.BackupLog.StopTime
}
return append(result, BackupLogStatus{StartTime: start, StopTime: end})
}
46 changes: 34 additions & 12 deletions controllers/dataprotection/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func (r *BackupReconciler) doNewPhaseAction(
Time: backup.Status.StartTimestamp.Add(dataprotectionv1alpha1.ToDuration(backupPolicy.Spec.TTL)),
}
}

if err = r.Client.Status().Patch(reqCtx.Ctx, backup, patch); err != nil {
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}
Expand Down Expand Up @@ -344,7 +345,7 @@ func (r *BackupReconciler) doInProgressPhaseAction(
if !isOK {
return intctrlutil.RequeueAfter(reconcileInterval, reqCtx.Log, "")
}
if err = r.createUpdatesJobs(reqCtx, backup, backupPolicy.Spec.Snapshot, dataprotectionv1alpha1.PRE); err != nil {
if err = r.createUpdatesJobs(reqCtx, backup, &backupPolicy.Spec.Snapshot.BasePolicy, dataprotectionv1alpha1.PRE); err != nil {
r.Recorder.Event(backup, corev1.EventTypeNormal, "CreatedPreUpdatesJob", err.Error())
}
if err = r.createVolumeSnapshot(reqCtx, backup, backupPolicy.Spec.Snapshot); err != nil {
Expand All @@ -371,7 +372,7 @@ func (r *BackupReconciler) doInProgressPhaseAction(
}

// A failed MetadataCollectionJob does not affect the backup status.
if err = r.createUpdatesJobs(reqCtx, backup, backupPolicy.Spec.Snapshot, dataprotectionv1alpha1.POST); err != nil {
if err = r.createUpdatesJobs(reqCtx, backup, &backupPolicy.Spec.Snapshot.BasePolicy, dataprotectionv1alpha1.POST); err != nil {
r.Recorder.Event(backup, corev1.EventTypeNormal, "CreatedPostUpdatesJob", err.Error())
}

Expand All @@ -390,6 +391,10 @@ func (r *BackupReconciler) doInProgressPhaseAction(
// TODO: add error type
return r.updateStatusIfFailed(reqCtx, backup, fmt.Errorf("not found the %s policy", backup.Spec.BackupType))
}
// createUpdatesJobs must not affect the backup status; if it fails, only an event is recorded.
if err = r.createUpdatesJobs(reqCtx, backup, &commonPolicy.BasePolicy, dataprotectionv1alpha1.PRE); err != nil {
r.Recorder.Event(backup, corev1.EventTypeNormal, "CreatedPreUpdatesJob", err.Error())
}
pathPrefix := r.getBackupPathPrefix(backup.Namespace, backupPolicy.Annotations[constant.BackupDataPathPrefixAnnotationKey])
err = r.createBackupToolJob(reqCtx, backup, commonPolicy, pathPrefix)
if err != nil {
Expand All @@ -407,20 +412,33 @@ func (r *BackupReconciler) doInProgressPhaseAction(
if err != nil {
return r.updateStatusIfFailed(reqCtx, backup, err)
}
// createUpdatesJobs must not affect the backup status; if it fails, only an event is recorded.
if err = r.createUpdatesJobs(reqCtx, backup, &commonPolicy.BasePolicy, dataprotectionv1alpha1.POST); err != nil {
r.Recorder.Event(backup, corev1.EventTypeNormal, "CreatedPostUpdatesJob", err.Error())
}
jobStatusConditions := job.Status.Conditions
if jobStatusConditions[0].Type == batchv1.JobComplete {
// update Phase to Completed
backup.Status.Phase = dataprotectionv1alpha1.BackupCompleted
backup.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now().UTC()}
backup.Status.Manifests = &dataprotectionv1alpha1.ManifestsStatus{
BackupTool: &dataprotectionv1alpha1.BackupToolManifestsStatus{
FilePath: pathPrefix,
},
if backup.Status.Manifests == nil {
backup.Status.Manifests = &dataprotectionv1alpha1.ManifestsStatus{}
}
if backup.Status.Manifests.BackupTool == nil {
backup.Status.Manifests.BackupTool = &dataprotectionv1alpha1.BackupToolManifestsStatus{}
}
backup.Status.Manifests.BackupTool.FilePath = pathPrefix
} else if jobStatusConditions[0].Type == batchv1.JobFailed {
backup.Status.Phase = dataprotectionv1alpha1.BackupFailed
backup.Status.FailureReason = job.Status.Conditions[0].Reason
}
if backup.Spec.BackupType == dataprotectionv1alpha1.BackupTypeIncremental {
if backup.Status.Manifests != nil &&
backup.Status.Manifests.BackupLog != nil &&
backup.Status.Manifests.BackupLog.StartTime == nil {
backup.Status.Manifests.BackupLog.StartTime = backup.Status.Manifests.BackupLog.StopTime
}
}
}

// finally, update backup status
Expand Down Expand Up @@ -643,7 +661,7 @@ func (r *BackupReconciler) ensureVolumeSnapshotReady(reqCtx intctrlutil.RequestC

func (r *BackupReconciler) createUpdatesJobs(reqCtx intctrlutil.RequestCtx,
backup *dataprotectionv1alpha1.Backup,
snapshotPolicy *dataprotectionv1alpha1.SnapshotPolicy,
basePolicy *dataprotectionv1alpha1.BasePolicy,
stage dataprotectionv1alpha1.BackupStatusUpdateStage) error {
// get backup policy
backupPolicy := &dataprotectionv1alpha1.BackupPolicy{}
Expand All @@ -655,11 +673,11 @@ func (r *BackupReconciler) createUpdatesJobs(reqCtx intctrlutil.RequestCtx,
reqCtx.Log.V(1).Error(err, "Unable to get backupPolicy for backup.", "backupPolicy", backupPolicyNameSpaceName)
return err
}
for _, update := range snapshotPolicy.BackupStatusUpdates {
for _, update := range basePolicy.BackupStatusUpdates {
if update.UpdateStage != stage {
continue
}
if err := r.createMetadataCollectionJob(reqCtx, backup, snapshotPolicy.BasePolicy, update); err != nil {
if err := r.createMetadataCollectionJob(reqCtx, backup, basePolicy, update); err != nil {
return err
}
}
Expand All @@ -668,10 +686,14 @@ func (r *BackupReconciler) createUpdatesJobs(reqCtx intctrlutil.RequestCtx,

func (r *BackupReconciler) createMetadataCollectionJob(reqCtx intctrlutil.RequestCtx,
backup *dataprotectionv1alpha1.Backup,
basePolicy dataprotectionv1alpha1.BasePolicy,
basePolicy *dataprotectionv1alpha1.BasePolicy,
updateInfo dataprotectionv1alpha1.BackupStatusUpdate) error {
mgrNS := viper.GetString(constant.CfgKeyCtrlrMgrNS)
key := types.NamespacedName{Namespace: mgrNS, Name: backup.Name + "-" + strings.ToLower(updateInfo.Path)}
jobName := backup.Name
if len(backup.Name) > 30 {
jobName = backup.Name[:30]
}
key := types.NamespacedName{Namespace: mgrNS, Name: jobName + "-" + strings.ToLower(updateInfo.Path)}
job := &batchv1.Job{}
// check if job is created
if exists, err := intctrlutil.CheckResourceExists(reqCtx.Ctx, r.Client, key, job); err != nil {
Expand Down Expand Up @@ -1147,7 +1169,7 @@ func addTolerations(podSpec *corev1.PodSpec) (err error) {
func (r *BackupReconciler) buildMetadataCollectionPodSpec(
reqCtx intctrlutil.RequestCtx,
backup *dataprotectionv1alpha1.Backup,
basePolicy dataprotectionv1alpha1.BasePolicy,
basePolicy *dataprotectionv1alpha1.BasePolicy,
updateInfo dataprotectionv1alpha1.BackupStatusUpdate) (corev1.PodSpec, error) {
podSpec := corev1.PodSpec{}
targetPod, err := r.getTargetPod(reqCtx, backup, basePolicy.Target.LabelsSelector.MatchLabels)
Expand Down
15 changes: 11 additions & 4 deletions controllers/dataprotection/backuppolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,11 @@ func (r *BackupPolicyReconciler) removeOldestBackups(reqCtx intctrlutil.RequestC
}

// getCronJobName builds the name of the CronJob that schedules backups of the
// given type for a backup policy.
//
// The name has the form "<prefix>-<backupType>", where prefix is
// "<backupPolicyName>-<backupPolicyNamespace>" capped at 30 bytes. Capping the
// prefix keeps the result bounded no matter how long the policy name or
// namespace is.
//
// NOTE(review): two policies whose "<name>-<namespace>" strings share the same
// first 30 bytes produce the same CronJob name for a given backup type —
// confirm callers can tolerate that.
func (r *BackupPolicyReconciler) getCronJobName(backupPolicyName, backupPolicyNamespace string, backupType dataprotectionv1alpha1.BackupType) string {
	// Maximum number of bytes kept from the "<name>-<namespace>" prefix.
	const maxPrefixLen = 30

	name := fmt.Sprintf("%s-%s", backupPolicyName, backupPolicyNamespace)
	if len(name) > maxPrefixLen {
		name = name[:maxPrefixLen]
	}
	return fmt.Sprintf("%s-%s", name, string(backupType))
}

// buildCronJob builds cronjob from backup policy.
Expand Down Expand Up @@ -287,6 +291,7 @@ func (r *BackupPolicyReconciler) buildCronJob(
BackupType: string(backType),
ServiceAccount: viper.GetString("KUBEBLOCKS_SERVICEACCOUNT_NAME"),
MgrNamespace: viper.GetString(constant.CfgKeyCtrlrMgrNS),
Image: viper.GetString(constant.KBToolsImage),
}
backupPolicyOptionsByte, err := json.Marshal(options)
if err != nil {
Expand All @@ -295,8 +300,11 @@ func (r *BackupPolicyReconciler) buildCronJob(
if err = cueValue.Fill("options", backupPolicyOptionsByte); err != nil {
return nil, err
}

cronjobByte, err := cueValue.Lookup("cronjob")
cuePath := "cronjob"
if backType == dataprotectionv1alpha1.BackupTypeIncremental {
cuePath = "cronjob_incremental"
}
cronjobByte, err := cueValue.Lookup(cuePath)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -420,7 +428,6 @@ func (r *BackupPolicyReconciler) handleIncrementalPolicy(
reqCtx intctrlutil.RequestCtx,
backupPolicy *dataprotectionv1alpha1.BackupPolicy) error {
if backupPolicy.Spec.Incremental == nil {
// TODO delete cronjob if exists
return nil
}
var cronExpression string
Expand Down
56 changes: 54 additions & 2 deletions controllers/dataprotection/cue/cronjob.cue
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ options: {
backupType: string
ttl: string
serviceAccount: string
image: string
}

cronjob: {
Expand All @@ -37,15 +38,15 @@ cronjob: {
}
spec: {
schedule: options.schedule
successfulJobsHistoryLimit: 1
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 1
concurrencyPolicy: "Forbid"
jobTemplate: spec: template: spec: {
restartPolicy: "Never"
serviceAccountName: options.serviceAccount
containers: [{
name: "backup-policy"
image: "appscode/kubectl:1.25"
image: options.image
imagePullPolicy: "IfNotPresent"
command: [
"sh",
Expand Down Expand Up @@ -73,3 +74,54 @@ EOF
}
}
}

// cronjob_incremental is the batch/v1 CronJob manifest rendered for
// incremental backups. All variable parts come from the `options` struct.
//
// The CronJob object itself is created in options.mgrNamespace, while the
// Backup it manages is applied into options.namespace (also recorded in the
// "kubeblocks.io/backup-namespace" annotation).
cronjob_incremental: {
apiVersion: "batch/v1"
kind: "CronJob"
metadata: {
name: options.name
namespace: options.mgrNamespace
annotations:
"kubeblocks.io/backup-namespace": options.namespace
labels:
"app.kubernetes.io/managed-by": "kubeblocks"
}
spec: {
schedule: options.schedule
// History limits: no successful Jobs are retained, one failed Job is.
successfulJobsHistoryLimit: 0
failedJobsHistoryLimit: 1
concurrencyPolicy: "Forbid"
jobTemplate: spec: template: spec: {
restartPolicy: "Never"
serviceAccountName: options.serviceAccount
containers: [{
name: "backup-policy"
image: options.image
imagePullPolicy: "IfNotPresent"
command: [
"sh",
"-c",
]
// The single argument is a shell script with two steps:
//   1. `kubectl apply` a Backup named options.name in options.namespace
//      (the name is fixed, so every run targets the same Backup object);
//   2. `kubectl patch` that Backup's status subresource, setting
//      status.phase to "New".
// NOTE(review): step 2 presumably makes the controller process the Backup
// again on each schedule tick — confirm against the backup controller.
args: [
"""
kubectl apply -f - <<EOF
apiVersion: dataprotection.kubeblocks.io/v1alpha1
kind: Backup
metadata:
labels:
app.kubernetes.io/instance: \(options.cluster)
dataprotection.kubeblocks.io/backup-type: \(options.backupType)
kubeblocks.io/backup-protection: retain
name: \(options.name)
namespace: \(options.namespace)
spec:
backupPolicyName: \(options.backupPolicyName)
backupType: \(options.backupType)
EOF
kubectl patch backup/\(options.name) --subresource=status --type=merge --patch '{"status": {"phase": "New"}}';
""",
]
}]
}
}
}
1 change: 1 addition & 0 deletions controllers/dataprotection/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ type backupPolicyOptions struct {
BackupType string `json:"backupType"`
TTL metav1.Duration `json:"ttl,omitempty"`
ServiceAccount string `json:"serviceAccount"`
Image string `json:"image"`
}
4 changes: 2 additions & 2 deletions deploy/postgresql/config/pg14-config.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

listen_addresses = '*'
port = '5432'
#archive_command = 'wal_dir=/pg/arcwal; [[ $(date +%H%M) == 1200 ]] && rm -rf ${wal_dir}/$(date -d"yesterday" +%Y%m%d); /bin/mkdir -p ${wal_dir}/$(date +%Y%m%d) && /usr/bin/lz4 -q -z %p > ${wal_dir}/$(date +%Y%m%d)/%f.lz4'
#archive_mode = 'True'
archive_command = 'wal_dir=/home/postgres/pgdata/pgroot/arcwal; wal_dir_today=${wal_dir}/$(date +%Y%m%d); [[ $(date +%H%M) == 1200 ]] && rm -rf ${wal_dir}/$(date -d"yesterday" +%Y%m%d); mkdir -p ${wal_dir_today} && gzip -kqc %p > ${wal_dir_today}/%f.gz'
archive_mode = 'on'
auto_explain.log_analyze = 'True'
auto_explain.log_min_duration = '1s'
auto_explain.log_nested_statements = 'True'
Expand Down
16 changes: 13 additions & 3 deletions deploy/postgresql/templates/backuppolicytemplate.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ spec:
baseBackup:
type: snapshot
enable: false
cronExpression: "0 18 * * 0"
cronExpression: "0 18 * * *"
incremental:
enable: false
cronExpression: "*/5 * * * *"
snapshot:
target:
connectionCredentialKey:
Expand All @@ -27,7 +30,14 @@ spec:
backupStatusUpdates:
- path: manifests.backupLog
containerName: postgresql
script: /kb-scripts/backup-log-collector.sh
updateStage: pre
script: /kb-scripts/backup-log-collector.sh true
updateStage: post
full:
backupToolName: postgres-basebackup
incremental:
backupToolName: postgres-pitr
backupStatusUpdates:
- path: manifests.backupLog
containerName: postgresql
script: /kb-scripts/backup-log-collector.sh false
updateStage: pre
Loading

0 comments on commit 321fdc3

Please sign in to comment.