Skip to content

Commit

Permalink
DB schema changes (#20)
Browse files Browse the repository at this point in the history
* API update

* Added changes to openapi.json
  • Loading branch information
patilsuraj767 authored Mar 24, 2023
1 parent 2370315 commit 6e4cbe3
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 52 deletions.
3 changes: 3 additions & 0 deletions docs/database/db-schema
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ Table workloads {
}

Table recommendation_sets {
id uuid [increment]
workload_id bigint [ref: > workloads.id]
container_name text
monitoring_start_time datetime
monitoring_end_time datetime
recommendations jsonb
created_at datetime
Indexes {
id [pk]
workload_id [name: "workloads.id_fkey", type: btree]
}
}
Binary file modified docs/database/db-schema.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
12 changes: 6 additions & 6 deletions internal/api/docs/ros_ocp_backend.postman_collection.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
}
],
"url": {
"raw": "localhost:8088/api/cost-management/v1/recommendations/openshift?start_date=2023-03-12&end_date=2023-03-21&project=proj_rxu&cluster=6678&workload_type=replicaset&workload=deployment_proj_&container=postgres&limit=1",
"raw": "localhost:8000/api/cost-management/v1/recommendations/openshift?start_date=2023-03-12&end_date=2023-03-21&project=proj_rxu&cluster=6678&workload_type=replicaset&workload=deployment_proj_&container=postgres&limit=1",
"host": [
"localhost"
],
"port": "8088",
"port": "8000",
"path": [
"api",
"cost-management",
Expand Down Expand Up @@ -95,11 +95,11 @@
}
],
"url": {
"raw": "localhost:8088/api/cost-management/v1/recommendations/openshift/{recommendation-id}",
"raw": "localhost:8000/api/cost-management/v1/recommendations/openshift/{recommendation-id}",
"host": [
"localhost"
],
"port": "8088",
"port": "8000",
"path": [
"api",
"cost-management",
Expand All @@ -118,11 +118,11 @@
"method": "GET",
"header": [],
"url": {
"raw": "localhost:8088/api/cost-management/v1/recommendations/openshift/openapi.json",
"raw": "localhost:8000/api/cost-management/v1/recommendations/openshift/openapi.json",
"host": [
"localhost"
],
"port": "8088",
"port": "8000",
"path": [
"api",
"cost-management",
Expand Down
13 changes: 3 additions & 10 deletions internal/api/docs/v1/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,9 @@
"type": "string",
"example": "replicaset"
},
"containers": {
"type": "array",
"items": {
"type": "string",
"example": [
"node",
"postgres",
"apache"
]
}
"container": {
"type": "string",
"example": "container-1"
},
"values": {
"type": "object",
Expand Down
4 changes: 2 additions & 2 deletions internal/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func GetRecommendationSetList(c echo.Context) error {
recommendationData["project"] = recommendation.Workload.Namespace
recommendationData["workload_type"] = recommendation.Workload.WorkloadType
recommendationData["workload"] = recommendation.Workload.WorkloadName
recommendationData["containers"] = recommendation.Workload.Containers
recommendationData["container"] = recommendation.ContainerName
recommendationData["last_report"] = recommendation.Workload.Cluster.LastReportedAtStr
recommendationData["values"] = recommendation.Recommendations
allRecommendations = append(allRecommendations, recommendationData)
Expand Down Expand Up @@ -102,7 +102,7 @@ func GetRecommendationSet(c echo.Context) error {
recommendationSlice["project"] = recommendationSet.Workload.Namespace
recommendationSlice["workload_type"] = recommendationSet.Workload.WorkloadType
recommendationSlice["workload"] = recommendationSet.Workload.WorkloadName
recommendationSlice["containers"] = recommendationSet.Workload.Containers
recommendationSlice["container"] = recommendationSet.ContainerName
recommendationSlice["last_report"] = recommendationSet.Workload.Cluster.LastReportedAtStr
recommendationSlice["values"] = recommendationSet.Recommendations
}
Expand Down
13 changes: 12 additions & 1 deletion internal/model/recommendation_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
type RecommendationSet struct {
ID string `gorm:"primaryKey;not null;autoIncrement"`
WorkloadID uint
Workload Workload `gorm:"foreignKey:WorkloadID"`
Workload Workload `gorm:"foreignKey:WorkloadID"`
ContainerName string
MonitoringStartTime time.Time `gorm:"type:timestamp"`
MonitoringEndTime time.Time `gorm:"type:timestamp"`
Recommendations datatypes.JSON
Expand Down Expand Up @@ -96,3 +97,13 @@ func (r *RecommendationSet) CreateRecommendationSet() error {

return nil
}

func DeleteStaleRecommendationSet(workload_id uint, containers []string) error {
db := database.GetDB()
result := db.Where("workload_id = ? AND container_name NOT IN ?", workload_id, containers).Delete(&RecommendationSet{})
if result.Error != nil {
return result.Error
}

return nil
}
2 changes: 1 addition & 1 deletion internal/processor/kruize_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func List_recommendations(experiment types.ExperimentEvent) ([]kruizePayload.Lis
}
q := req.URL.Query()
q.Add("experiment_name", experiment.Experiment_name)
q.Add("experiment_name", experiment.Monitoring_start_time)
q.Add("monitoring_end_time", experiment.Monitoring_end_time)
req.URL.RawQuery = q.Encode()
res, err := client.Do(req)
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions internal/processor/report_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func ProcessReport(msg *kafka.Message) {
namespace := k8s_object[0]["namespace"].(string)
k8s_object_type := k8s_object[0]["k8s_object_type"].(string)
k8s_object_name := k8s_object[0]["k8s_object_name"].(string)
monitoring_start_time := k8s_object[0]["k8s_object_name"].(string)
monitoring_end_time := k8s_object[0]["interval_end"].(string)

experiment_name := generateExperimentName(
kafkaMsg.Metadata.Org_id,
Expand Down Expand Up @@ -119,14 +119,14 @@ func ProcessReport(msg *kafka.Message) {
}
// Sending list_of_experiments to rosocp.kruize.experiments topic.
experimentEventMsg := types.ExperimentEvent{
WorkloadID: workload.ID,
Experiment_name: experiment_name,
K8s_object_name: k8s_object[0]["k8s_object_name"].(string),
K8s_object_type: k8s_object[0]["k8s_object_type"].(string),
Namespace: k8s_object[0]["namespace"].(string),
Fetch_time: time.Now().Add(time.Second * time.Duration(waittime)),
Monitoring_start_time: monitoring_start_time,
K8s_object: k8s_object,
WorkloadID: workload.ID,
Experiment_name: experiment_name,
K8s_object_name: k8s_object[0]["k8s_object_name"].(string),
K8s_object_type: k8s_object[0]["k8s_object_type"].(string),
Namespace: k8s_object[0]["namespace"].(string),
Fetch_time: time.Now().Add(time.Second * time.Duration(waittime)),
Monitoring_end_time: monitoring_end_time,
K8s_object: k8s_object,
}

msgBytes, err := json.Marshal(experimentEventMsg)
Expand Down
42 changes: 27 additions & 15 deletions internal/services/recommender.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,36 @@ func ProcessEvent(msg *kafka.Message) {
}

if is_valid_recommendation(data) {
for _, v := range data[0].Kubernetes_objects[0].Containers[0].Recommendations {
marshalData, err := json.Marshal(v)
if err != nil {
log.Errorf("Unable to list recommendation for: %v", err)
}
// Create RecommendationSet entry into the table.
recommendationSet := model.RecommendationSet{
WorkloadID: kafkaMsg.WorkloadID,
MonitoringStartTime: v.Duration_based.Short_term.Monitoring_start_time,
MonitoringEndTime: v.Duration_based.Short_term.Monitoring_end_time,
Recommendations: marshalData,
}
if err := recommendationSet.CreateRecommendationSet(); err != nil {
log.Errorf("unable to get or add record to recommendation set table: %v. Error: %v", recommendationSet, err)
return
containers := data[0].Kubernetes_objects[0].Containers
container_names := make([]string, 0, len(containers))
for _, container := range containers {
container_names = append(container_names, container.Container_name)
for _, v := range container.Recommendations {
marshalData, err := json.Marshal(v)
if err != nil {
log.Errorf("Unable to list recommendation for: %v", err)
}

// Create RecommendationSet entry into the table.
recommendationSet := model.RecommendationSet{
WorkloadID: kafkaMsg.WorkloadID,
ContainerName: container.Container_name,
MonitoringStartTime: v.Duration_based.Short_term.Monitoring_start_time,
MonitoringEndTime: v.Duration_based.Short_term.Monitoring_end_time,
Recommendations: marshalData,
}
if err := recommendationSet.CreateRecommendationSet(); err != nil {
log.Errorf("unable to get or add record to recommendation set table: %v. Error: %v", recommendationSet, err)
return
}
}
}

// Delete stale container of current workload.
if err := model.DeleteStaleRecommendationSet(kafkaMsg.WorkloadID, container_names); err != nil {
log.Errorf("unable remove stale containers, Error: %v", err)
return
}
} else {
if err := processor.Update_results(kafkaMsg.Experiment_name, kafkaMsg.K8s_object); err != nil {
log.Error(err)
Expand Down
16 changes: 8 additions & 8 deletions internal/types/experimentEvent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package types
import "time"

type ExperimentEvent struct {
WorkloadID uint `validate:"required"`
Experiment_name string `validate:"required"`
K8s_object_name string `validate:"required"`
K8s_object_type string `validate:"required"`
Namespace string `validate:"required"`
Fetch_time time.Time `validate:"required"`
Monitoring_start_time string `validate:"required"`
K8s_object []map[string]interface{} `validate:"required"`
WorkloadID uint `validate:"required"`
Experiment_name string `validate:"required"`
K8s_object_name string `validate:"required"`
K8s_object_type string `validate:"required"`
Namespace string `validate:"required"`
Fetch_time time.Time `validate:"required"`
Monitoring_end_time string `validate:"required"`
K8s_object []map[string]interface{} `validate:"required"`
}
1 change: 1 addition & 0 deletions migrations/000004_create_recommendation_sets_table.up.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS recommendation_sets(
id uuid DEFAULT gen_random_uuid() PRIMARY KEY,
workload_id BIGINT,
container_name TEXT NOT NULL,
monitoring_start_time TIMESTAMP WITH TIME ZONE NOT NULL,
monitoring_end_time TIMESTAMP WITH TIME ZONE NOT NULL,
recommendations jsonb NOT NULL,
Expand Down

0 comments on commit 6e4cbe3

Please sign in to comment.