Skip to content

Commit

Permalink
Add pvc for local provider
Browse files Browse the repository at this point in the history
Add PVC for local provider to persist data in case of non AWS usage.
Change the tests to cover PVC usage in local mode
  • Loading branch information
Pavel Gonchukov committed Oct 4, 2019
1 parent ccff74f commit 328275f
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 19 deletions.
164 changes: 161 additions & 3 deletions local/local_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ package local
import (
"fmt"
"log"
"time"

e "github.com/pkg/errors"
"github.com/sorenmat/k8s-rds/crd"
"github.com/sorenmat/k8s-rds/provider"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

type Local struct {
ServiceProvider provider.ServiceProvider
kc kubernetes.Interface
SkipWaiting bool
}

func New(db *crd.Database, kc kubernetes.Interface) (*Local, error) {
Expand All @@ -26,6 +30,11 @@ func New(db *crd.Database, kc kubernetes.Interface) (*Local, error) {
// CreateDatabase creates a database from the CRD database object, is also ensures that the correct
// subnets are created for the database so we can access it
func (r *Local) CreateDatabase(db *crd.Database) (string, error) {

if err := r.createPVC(db.Name, db.Namespace, db.Spec.Size); err != nil {
return "", err
}

new := false
d, err := r.kc.AppsV1().Deployments(db.Namespace).Get(db.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
Expand Down Expand Up @@ -63,10 +72,129 @@ func (r *Local) CreateDatabase(db *crd.Database) (string, error) {
return db.Name, nil
}

const (
defaultLocalRDSPVSizeUnit = "Gi"
maxAmountOfWaitIterations = 100
iterationWaitPeriodSec = 5 * time.Second
)

func (r *Local) createPVC(name, namespace string, size int64) error {
newPVC := false

pvc, err := r.kc.CoreV1().PersistentVolumeClaims(namespace).Get(name,
metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
// we got an error and it's not the NotFound, let's crash
return err
}
if errors.IsNotFound(err) {
// Deployment seems to be empty, let's assume it means we need to create it
pvc = &corev1.PersistentVolumeClaim{}
newPVC = true
}

pvc.Name = name
pvc.ObjectMeta = metav1.ObjectMeta{
Name: name,
Labels: map[string]string{
"app": name,
},
}

pvc.Annotations = map[string]string{
"repository": "https://github.com/sorenmat/k8s-rds",
}

pvc.Spec = corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
"ReadWriteOnce",
},

Resources: corev1.ResourceRequirements{

Requests: corev1.ResourceList{
"storage": resource.MustParse(fmt.Sprintf("%d%s",
size, defaultLocalRDSPVSizeUnit)),
},
},
}

if newPVC {
log.Printf("creating pvc %v", name)
_, err = r.kc.CoreV1().PersistentVolumeClaims(namespace).Create(pvc)
if err != nil {
return err
}
} else {
log.Printf("updating pvc %v", name)
_, err = r.kc.CoreV1().PersistentVolumeClaims(namespace).Update(pvc)
if err != nil {
return err
}
}

if !r.SkipWaiting {
pvcIsReady := false
for i := 0; i < maxAmountOfWaitIterations; i++ {

pvc, err := r.kc.CoreV1().PersistentVolumeClaims(namespace).Get(name,
metav1.GetOptions{})

if err != nil {
return e.Wrap(err, "problem of getting pvcs")
}
if pvc.Status.Phase == "Bound" {
pv, err := r.kc.CoreV1().PersistentVolumes().Get(pvc.Spec.VolumeName,
metav1.GetOptions{})
if err != nil {
return e.Wrap(err, "problem of getting pv")
}
if pv.Status.Phase == "Bound" {
pvcIsReady = true
break
}
}
time.Sleep(iterationWaitPeriodSec)
}

if pvcIsReady {
log.Printf("pvc %v is ready (bound)\n", name)
return nil
}

return fmt.Errorf("Max amount of wait iterations for pvc %s being bound is expired",
name)
}

return nil
}

const (
nDeleteAttempts = 20
)

// DeleteDatabase deletes the db pod and pvc
func (r *Local) DeleteDatabase(db *crd.Database) error {
// delete the database instance
err := r.kc.AppsV1().Deployments(db.Namespace).Delete(db.Name, &metav1.DeleteOptions{})
return err

for i := 0; i < nDeleteAttempts; i++ {
if err := r.kc.AppsV1().Deployments(db.Namespace).Delete(db.Name,
&metav1.DeleteOptions{}); err != nil {
fmt.Printf("ERROR: error while deleting the deployment: %v\n", err)
continue
}

if err := r.kc.CoreV1().PersistentVolumeClaims(db.Namespace).Delete(db.Name,
&metav1.DeleteOptions{}); err != nil {
fmt.Printf("ERROR: error while deleting the pvc: %v\n", err)
continue
}

return nil
}

return fmt.Errorf("The number of attempts to delete db %s has exceeded",
db.ObjectMeta.Name)
}

func int32Ptr(i int32) *int32 { return &i }
Expand All @@ -92,10 +220,28 @@ func toSpec(db *crd.Database) v1.DeploymentSpec {
Name: db.Name,
Image: db.Spec.Engine, // TODO is this correct
Env: []corev1.EnvVar{corev1.EnvVar{
Name: "POSTGRES_PASSWORD", ValueFrom: &corev1.EnvVarSource{SecretKeyRef: &corev1.SecretKeySelector{LocalObjectReference: corev1.LocalObjectReference{Name: db.Spec.Password.Name}, Key: db.Spec.Password.Key}}},
Name: "POSTGRES_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: db.Spec.Password.Name,
},
Key: db.Spec.Password.Key,
},
},
},
corev1.EnvVar{Name: "POSTGRES_USER", Value: db.Spec.Username},
corev1.EnvVar{Name: "POSTGRES_DB", Value: db.Spec.DBName},
corev1.EnvVar{Name: "PGDATA",
Value: "/var/lib/postgresql/data/pgdata"},
},
VolumeMounts: []corev1.VolumeMount{
corev1.VolumeMount{
Name: fmt.Sprintf("%s-data", db.Name),
MountPath: "/var/lib/postgresql/data",
},
},

Ports: []corev1.ContainerPort{
{
Name: "pgsql",
Expand All @@ -104,7 +250,19 @@ func toSpec(db *crd.Database) v1.DeploymentSpec {
},
}},
},

Volumes: []corev1.Volume{
corev1.Volume{
Name: fmt.Sprintf("%s-data", db.Name),
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: db.Name,
},
},
},
},
},
},
}

}
108 changes: 92 additions & 16 deletions local/local_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,45 @@ func TestCreateDatabase(t *testing.T) {
kc := testclient.NewSimpleClientset()
l, err := New(db, kc)
assert.NoError(t, err)
// we need it to not wait for status
l.SkipWaiting = true
host, err := l.CreateDatabase(db)
assert.NoError(t, err)
assert.NotEmpty(t, host)

assert.Equal(t, "get", kc.Fake.Actions()[0].GetVerb())
assert.Equal(t, "apps", kc.Fake.Actions()[0].GetResource().GroupResource().Group)
assert.Equal(t, "deployments", kc.Fake.Actions()[0].GetResource().GroupResource().Resource)
// create it
assert.Equal(t, "create", kc.Fake.Actions()[1].GetVerb())
assert.Equal(t, "apps", kc.Fake.Actions()[1].GetResource().GroupResource().Group)
assert.Equal(t, "deployments", kc.Fake.Actions()[1].GetResource().GroupResource().Resource)
sequence := []struct {
Action string
Group string
Resource string
}{
{
Action: "get",
Group: "",
Resource: "persistentvolumeclaims",
},
{
Action: "create",
Group: "",
Resource: "persistentvolumeclaims",
},
{
Action: "get",
Group: "apps",
Resource: "deployments",
},
{
Action: "create",
Group: "apps",
Resource: "deployments",
},
}

for i, action := range kc.Fake.Actions() {
assert.Equal(t, sequence[i].Action, action.GetVerb())
assert.Equal(t, sequence[i].Group, action.GetResource().GroupResource().Group)
assert.Equal(t, sequence[i].Resource, action.GetResource().GroupResource().Resource)
}

}

func TestUpdateDatabase(t *testing.T) {
Expand All @@ -85,18 +113,66 @@ func TestUpdateDatabase(t *testing.T) {
kc := testclient.NewSimpleClientset()
l, err := New(db, kc)
assert.NoError(t, err)
// we need it to not wait for status
l.SkipWaiting = true
host, err := l.CreateDatabase(db)
assert.NoError(t, err)
assert.NotEmpty(t, host)
assert.Equal(t, 2, len(kc.Fake.Actions()))
assert.Equal(t, 4, len(kc.Fake.Actions()))
host, err = l.CreateDatabase(db)
assert.Equal(t, 8, len(kc.Fake.Actions()))

assert.Equal(t, 4, len(kc.Fake.Actions()))
assert.Equal(t, "get", kc.Fake.Actions()[2].GetVerb())
assert.Equal(t, "apps", kc.Fake.Actions()[2].GetResource().GroupResource().Group)
assert.Equal(t, "deployments", kc.Fake.Actions()[2].GetResource().GroupResource().Resource)
// create it
assert.Equal(t, "update", kc.Fake.Actions()[3].GetVerb())
assert.Equal(t, "apps", kc.Fake.Actions()[3].GetResource().GroupResource().Group)
assert.Equal(t, "deployments", kc.Fake.Actions()[3].GetResource().GroupResource().Resource)
sequence := []struct {
Action string
Group string
Resource string
}{
{
Action: "get",
Group: "",
Resource: "persistentvolumeclaims",
},
{
Action: "create",
Group: "",
Resource: "persistentvolumeclaims",
},
{
Action: "get",
Group: "apps",
Resource: "deployments",
},
{
Action: "create",
Group: "apps",
Resource: "deployments",
},

{
Action: "get",
Group: "",
Resource: "persistentvolumeclaims",
},
{
Action: "update",
Group: "",
Resource: "persistentvolumeclaims",
},
{
Action: "get",
Group: "apps",
Resource: "deployments",
},
{
Action: "update",
Group: "apps",
Resource: "deployments",
},
}

for i, action := range kc.Fake.Actions() {
assert.Equal(t, sequence[i].Action, action.GetVerb())
assert.Equal(t, sequence[i].Group, action.GetResource().GroupResource().Group)
assert.Equal(t, sequence[i].Resource, action.GetResource().GroupResource().Resource)
}
}

0 comments on commit 328275f

Please sign in to comment.