diff --git a/local/local_provider.go b/local/local_provider.go index 0e665287..4aafc348 100644 --- a/local/local_provider.go +++ b/local/local_provider.go @@ -3,12 +3,15 @@ package local import ( "fmt" "log" + "time" + e "github.com/pkg/errors" "github.com/sorenmat/k8s-rds/crd" "github.com/sorenmat/k8s-rds/provider" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) @@ -16,6 +19,7 @@ import ( type Local struct { ServiceProvider provider.ServiceProvider kc kubernetes.Interface + SkipWaiting bool } func New(db *crd.Database, kc kubernetes.Interface) (*Local, error) { @@ -26,6 +30,11 @@ func New(db *crd.Database, kc kubernetes.Interface) (*Local, error) { // CreateDatabase creates a database from the CRD database object, is also ensures that the correct // subnets are created for the database so we can access it func (r *Local) CreateDatabase(db *crd.Database) (string, error) { + + if err := r.createPVC(db.Name, db.Namespace, db.Spec.Size); err != nil { + return "", err + } + new := false d, err := r.kc.AppsV1().Deployments(db.Namespace).Get(db.Name, metav1.GetOptions{}) if err != nil && !errors.IsNotFound(err) { @@ -63,10 +72,129 @@ func (r *Local) CreateDatabase(db *crd.Database) (string, error) { return db.Name, nil } +const ( + defaultLocalRDSPVSizeUnit = "Gi" + maxAmountOfWaitIterations = 100 + iterationWaitPeriodSec = 5 * time.Second +) + +func (r *Local) createPVC(name, namespace string, size int64) error { + newPVC := false + + pvc, err := r.kc.CoreV1().PersistentVolumeClaims(namespace).Get(name, + metav1.GetOptions{}) + if err != nil && !errors.IsNotFound(err) { + // we got an error and it's not the NotFound, let's crash + return err + } + if errors.IsNotFound(err) { + // Deployment seems to be empty, let's assume it means we need to create it + pvc = &corev1.PersistentVolumeClaim{} + newPVC = true + } + + pvc.Name = name + pvc.ObjectMeta = metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "app": name, + }, + } + + pvc.Annotations = map[string]string{ + "repository": "https://github.com/sorenmat/k8s-rds", + } + + pvc.Spec = corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{ + "ReadWriteOnce", + }, + + Resources: corev1.ResourceRequirements{ + + Requests: corev1.ResourceList{ + "storage": resource.MustParse(fmt.Sprintf("%d%s", + size, defaultLocalRDSPVSizeUnit)), + }, + }, + } + + if newPVC { + log.Printf("creating pvc %v", name) + _, err = r.kc.CoreV1().PersistentVolumeClaims(namespace).Create(pvc) + if err != nil { + return err + } + } else { + log.Printf("updating pvc %v", name) + _, err = r.kc.CoreV1().PersistentVolumeClaims(namespace).Update(pvc) + if err != nil { + return err + } + } + + if !r.SkipWaiting { + pvcIsReady := false + for i := 0; i < maxAmountOfWaitIterations; i++ { + + pvc, err := r.kc.CoreV1().PersistentVolumeClaims(namespace).Get(name, + metav1.GetOptions{}) + + if err != nil { + return e.Wrap(err, "problem of getting pvcs") + } + if pvc.Status.Phase == "Bound" { + pv, err := r.kc.CoreV1().PersistentVolumes().Get(pvc.Spec.VolumeName, + metav1.GetOptions{}) + if err != nil { + return e.Wrap(err, "problem of getting pv") + } + if pv.Status.Phase == "Bound" { + pvcIsReady = true + break + } + } + time.Sleep(iterationWaitPeriodSec) + } + + if pvcIsReady { + log.Printf("pvc %v is ready (bound)\n", name) + return nil + } + + return fmt.Errorf("Max amount of wait iterations for pvc %s being bound is expired", + name) + } + + return nil +} + +const ( + nDeleteAttempts = 20 +) + +// DeleteDatabase deletes the db pod and pvc func (r *Local) DeleteDatabase(db *crd.Database) error { // delete the database instance - err := r.kc.AppsV1().Deployments(db.Namespace).Delete(db.Name, &metav1.DeleteOptions{}) - return err + + for i := 0; i < nDeleteAttempts; i++ { + if err := r.kc.AppsV1().Deployments(db.Namespace).Delete(db.Name, + &metav1.DeleteOptions{}); err != nil { + fmt.Printf("ERROR: error while deleting the deployment: %v\n", err) + continue + } + + if err := r.kc.CoreV1().PersistentVolumeClaims(db.Namespace).Delete(db.Name, + &metav1.DeleteOptions{}); err != nil { + fmt.Printf("ERROR: error while deleting the pvc: %v\n", err) + continue + } + + return nil + } + + return fmt.Errorf("The number of attempts to delete db %s has exceeded", + db.ObjectMeta.Name) } func int32Ptr(i int32) *int32 { return &i } @@ -92,10 +220,28 @@ func toSpec(db *crd.Database) v1.DeploymentSpec { Name: db.Name, Image: db.Spec.Engine, // TODO is this correct Env: []corev1.EnvVar{corev1.EnvVar{ - Name: "POSTGRES_PASSWORD", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: db.Spec.Password.Name}, Key: db.Spec.Password.Key}}}, + Name: "POSTGRES_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: db.Spec.Password.Name, + }, + Key: db.Spec.Password.Key, + }, + }, + }, corev1.EnvVar{Name: "POSTGRES_USER", Value: db.Spec.Username}, corev1.EnvVar{Name: "POSTGRES_DB", Value: db.Spec.DBName}, + corev1.EnvVar{Name: "PGDATA", + Value: "/var/lib/postgresql/data/pgdata"}, }, + VolumeMounts: []corev1.VolumeMount{ + corev1.VolumeMount{ + Name: fmt.Sprintf("%s-data", db.Name), + MountPath: "/var/lib/postgresql/data", + }, + }, + Ports: []corev1.ContainerPort{ { Name: "pgsql", @@ -104,7 +250,19 @@ func toSpec(db *crd.Database) v1.DeploymentSpec { }, }}, }, + + Volumes: []corev1.Volume{ + corev1.Volume{ + Name: fmt.Sprintf("%s-data", db.Name), + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: db.Name, + }, + }, + }, + }, }, }, } + } diff --git a/local/local_provider_test.go b/local/local_provider_test.go index 2450cb55..426edd04 100644 --- a/local/local_provider_test.go +++ b/local/local_provider_test.go @@ -52,17 +52,45 @@ func TestCreateDatabase(t *testing.T) { kc := testclient.NewSimpleClientset() l, err := New(db, kc) assert.NoError(t, err) + // we need it to not wait for status + l.SkipWaiting = true host, err := l.CreateDatabase(db) assert.NoError(t, err) assert.NotEmpty(t, host) - assert.Equal(t, "get", kc.Fake.Actions()[0].GetVerb()) - assert.Equal(t, "apps", kc.Fake.Actions()[0].GetResource().GroupResource().Group) - assert.Equal(t, "deployments", kc.Fake.Actions()[0].GetResource().GroupResource().Resource) - // create it - assert.Equal(t, "create", kc.Fake.Actions()[1].GetVerb()) - assert.Equal(t, "apps", kc.Fake.Actions()[1].GetResource().GroupResource().Group) - assert.Equal(t, "deployments", kc.Fake.Actions()[1].GetResource().GroupResource().Resource) + sequence := []struct { + Action string + Group string + Resource string + }{ + { + Action: "get", + Group: "", + Resource: "persistentvolumeclaims", + }, + { + Action: "create", + Group: "", + Resource: "persistentvolumeclaims", + }, + { + Action: "get", + Group: "apps", + Resource: "deployments", + }, + { + Action: "create", + Group: "apps", + Resource: "deployments", + }, + } + + for i, action := range kc.Fake.Actions() { + assert.Equal(t, sequence[i].Action, action.GetVerb()) + assert.Equal(t, sequence[i].Group, action.GetResource().GroupResource().Group) + assert.Equal(t, sequence[i].Resource, action.GetResource().GroupResource().Resource) + } + } func TestUpdateDatabase(t *testing.T) { @@ -85,18 +113,66 @@ func TestUpdateDatabase(t *testing.T) { kc := testclient.NewSimpleClientset() l, err := New(db, kc) assert.NoError(t, err) + // we need it to not wait for status + l.SkipWaiting = true host, err := l.CreateDatabase(db) assert.NoError(t, err) assert.NotEmpty(t, host) - assert.Equal(t, 2, len(kc.Fake.Actions())) + assert.Equal(t, 4, len(kc.Fake.Actions())) host, err = l.CreateDatabase(db) + assert.Equal(t, 8, len(kc.Fake.Actions())) - assert.Equal(t, 4, len(kc.Fake.Actions())) - assert.Equal(t, "get", kc.Fake.Actions()[2].GetVerb()) - assert.Equal(t, "apps", kc.Fake.Actions()[2].GetResource().GroupResource().Group) - assert.Equal(t, "deployments", kc.Fake.Actions()[2].GetResource().GroupResource().Resource) - // create it - assert.Equal(t, "update", kc.Fake.Actions()[3].GetVerb()) - assert.Equal(t, "apps", kc.Fake.Actions()[3].GetResource().GroupResource().Group) - assert.Equal(t, "deployments", kc.Fake.Actions()[3].GetResource().GroupResource().Resource) + sequence := []struct { + Action string + Group string + Resource string + }{ + { + Action: "get", + Group: "", + Resource: "persistentvolumeclaims", + }, + { + Action: "create", + Group: "", + Resource: "persistentvolumeclaims", + }, + { + Action: "get", + Group: "apps", + Resource: "deployments", + }, + { + Action: "create", + Group: "apps", + Resource: "deployments", + }, + + { + Action: "get", + Group: "", + Resource: "persistentvolumeclaims", + }, + { + Action: "update", + Group: "", + Resource: "persistentvolumeclaims", + }, + { + Action: "get", + Group: "apps", + Resource: "deployments", + }, + { + Action: "update", + Group: "apps", + Resource: "deployments", + }, + } + + for i, action := range kc.Fake.Actions() { + assert.Equal(t, sequence[i].Action, action.GetVerb()) + assert.Equal(t, sequence[i].Group, action.GetResource().GroupResource().Group) + assert.Equal(t, sequence[i].Resource, action.GetResource().GroupResource().Resource) + } }