Skip to content

Commit

Permalink
Fixes #RHIROS-1214 - Sources integration (RedHatInsights#103)
Browse files Browse the repository at this point in the history
* Fixes #RHIROS-1214 - Sources integration
  • Loading branch information
patilsuraj767 authored Aug 2, 2023
1 parent 6b78c8e commit 89d5c97
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 0 deletions.
30 changes: 30 additions & 0 deletions clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ objects:
dependencies:
- ingress
- rbac
- sources-api
deployments:
- name: processor
replicas: ${{PROCESSOR_REPLICA_COUNT}}
Expand Down Expand Up @@ -130,6 +131,30 @@ objects:
value: "rosocp-api"
- name: LOG_LEVEL
value: ${LOG_LEVEL}
- name: housekeeper
replicas: ${{HOUSEKEEPER_REPLICA_COUNT}}
podSpec:
image: ${IMAGE}:${IMAGE_TAG}
command: ["sh"]
args: ["-c", "./rosocp db migrate up && ./rosocp start housekeeper"]
resources:
requests:
cpu: ${CPU_REQUEST}
memory: ${MEMORY_REQUEST}
limits:
cpu: ${CPU_LIMIT}
memory: ${MEMORY_LIMIT}
env:
- name: CLOWDER_ENABLED
value: ${CLOWDER_ENABLED}
- name: SSL_CERT_DIR
value: ${SSL_CERT_DIR}
- name: SERVICE_NAME
value: "rosocp-housekeeper"
- name: CW_LOG_STREAM_NAME
value: "rosocp-housekeeper"
- name: LOG_LEVEL
value: ${LOG_LEVEL}
database:
name: rosocp
version: 13
Expand All @@ -138,6 +163,8 @@ objects:
partitions: 1
- topicName: rosocp.kruize.experiments
partitions: 1
- topicName: platform.sources.event-stream
partitions: 1
testing:
iqePlugin: ros-ocp

Expand Down Expand Up @@ -194,6 +221,9 @@ parameters:
- description: Replica count for recommender pod
name: RECOMMENDER_REPLICA_COUNT
value: "1"
- description: Replica count for recommender pod
name: HOUSEKEEPER_REPLICA_COUNT
value: "1"
- description: Time to wait before hitting listRecommendation API
name: KRUIZE_WAIT_TIME
value: "120"
Expand Down
10 changes: 10 additions & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,19 @@ var apiCmd = &cobra.Command{
},
}

var houseKeeperCmd = &cobra.Command{
Use: "housekeeper",
Short: "starts ros-ocp housekeeper service",
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("starting ros-ocp housekeeper service")
services.StartHouseKeeperService()
},
}

func init() {
rootCmd.AddCommand(startCmd)
startCmd.AddCommand(processorCmd)
startCmd.AddCommand(recommenderCmd)
startCmd.AddCommand(apiCmd)
startCmd.AddCommand(houseKeeperCmd)
}
Binary file modified docs/images/ROS for Openshift Arch Diagram.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
14 changes: 14 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Config struct {
KafkaAutoCommit bool `mapstructure:"KAFKA_AUTO_COMMIT"`
UploadTopic string `mapstructure:"UPLOAD_TOPIC"`
ExperimentsTopic string `mapstructure:"EXPERIMENTS_TOPIC"`
SourcesEventTopic string `mapstructure:"SOURCES_EVENT_TOPIC"`
KafkaUsername string
KafkaPassword string
KafkaSASLMechanism string
Expand Down Expand Up @@ -59,6 +60,10 @@ type Config struct {

// Prometheus config
PrometheusPort string `mapstructure:"PROMETHEUS_PORT"`

// Sources-api-go config
SourceApiBaseUrl string `mapstructure:"SOURCES_API_BASE_URL"`
SourceApiPrefix string `mapstructure:"SOURCES_API_PREFIX"`
}

var cfg *Config = nil
Expand All @@ -73,6 +78,7 @@ func initConfig() {
viper.SetDefault("KAFKA_BOOTSTRAP_SERVERS", strings.Join(clowder.KafkaServers, ","))
viper.SetDefault("UPLOAD_TOPIC", clowder.KafkaTopics["hccm.ros.events"].Name)
viper.SetDefault("EXPERIMENTS_TOPIC", clowder.KafkaTopics["rosocp.kruize.experiments"].Name)
viper.SetDefault("SOURCES_EVENT_TOPIC", clowder.KafkaTopics["platform.sources.event-stream"].Name)

// Kafka SSL Config
if broker.Authtype != nil {
Expand Down Expand Up @@ -106,6 +112,8 @@ func initConfig() {
viper.SetDefault("RBACPort", endpoint.Port)
viper.SetDefault("RBACProtocol", "http")
viper.SetDefault("RBAC_ENABLE", true)
} else if endpoint.App == "sources-api" {
viper.SetDefault("SOURCES_API_BASE_URL", fmt.Sprintf("http://%v:%v", endpoint.Hostname, endpoint.Port))
}
}

Expand All @@ -125,6 +133,7 @@ func initConfig() {
viper.SetDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:29092")
viper.SetDefault("UPLOAD_TOPIC", "hccm.ros.events")
viper.SetDefault("EXPERIMENTS_TOPIC", "rosocp.kruize.experiments")
viper.SetDefault("SOURCES_EVENT_TOPIC", "platform.sources.event-stream")

// default DB Config
viper.SetDefault("DBName", "postgres")
Expand All @@ -144,7 +153,12 @@ func initConfig() {
// prometheus config
viper.SetDefault("PROMETHEUS_PORT", "5005")

// Sources-api-go
viper.SetDefault("SOURCES_API_BASE_URL", "http://127.0.0.1:8002")

}

viper.SetDefault("SOURCES_API_PREFIX", "/api/sources/v3.1")
viper.SetDefault("SERVICE_NAME", "rosocp")
viper.SetDefault("API_PORT", "8000")
viper.SetDefault("KRUIZE_WAIT_TIME", "30")
Expand Down
10 changes: 10 additions & 0 deletions internal/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,13 @@ func (c *Cluster) CreateCluster() error {
}
return nil
}

func (c *Cluster) DeleteCluster() error {
db := database.GetDB()
result := db.Where("source_id = ?", c.SourceId).Delete(c)
if result.Error != nil {
dbError.Inc()
return result.Error
}
return nil
}
62 changes: 62 additions & 0 deletions internal/services/houseKeeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package services

import (
"encoding/json"
"os"
"strconv"

k "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/labstack/gommon/log"

"github.com/redhatinsights/ros-ocp-backend/internal/config"
"github.com/redhatinsights/ros-ocp-backend/internal/kafka"
"github.com/redhatinsights/ros-ocp-backend/internal/logging"
"github.com/redhatinsights/ros-ocp-backend/internal/model"
"github.com/redhatinsights/ros-ocp-backend/internal/types"
"github.com/redhatinsights/ros-ocp-backend/internal/utils/sources"
)

var cost_app_id int

func StartHouseKeeperService() {
log := logging.GetLogger()
cfg := config.GetConfig()
var err error
cost_app_id, err = sources.GetCostApplicationID()
if err != nil {
log.Error("Unable to get cost application id")
os.Exit(1)
}

kafka.StartConsumer(cfg.SourcesEventTopic, sourcesListener)

}

func sourcesListener(msg *k.Message) {
headers := msg.Headers
for _, v := range headers {
if v.Key == "event_type" && string(v.Value) == "Application.destroy" {
var data types.SourcesEvent
if !json.Valid([]byte(msg.Value)) {
log.Errorf("Received message on kafka topic is not vaild JSON: %s", msg.Value)
return
}
if err := json.Unmarshal(msg.Value, &data); err != nil {
log.Errorf("Unable to decode kafka message: %s", msg.Value)
return
}
if data.Application_type_id == cost_app_id {
cluster := model.Cluster{
SourceId: strconv.Itoa(data.Source_id),
}
if err := cluster.DeleteCluster(); err != nil {
log.Errorf("unable to delete record from clusters table: %v. Error: %v", cluster, err)
} else {
log.Infof("Successfully deleted the cluster with Source_id: %v.", cluster.SourceId)
}
}

}
}

}
8 changes: 8 additions & 0 deletions internal/types/sources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package types

type SourcesEvent struct {
Id int `validate:"required"`
Source_id int `validate:"required"`
Application_type_id int `validate:"required"`
Tenant string `validate:"required"`
}
34 changes: 34 additions & 0 deletions internal/utils/sources/sources_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package sources

import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"

"github.com/redhatinsights/ros-ocp-backend/internal/config"
)

var cfg *config.Config = config.GetConfig()

func GetCostApplicationID() (int, error) {
url := cfg.SourceApiBaseUrl + cfg.SourceApiPrefix + "/application_types?filter[name][eq]=/insights/platform/cost-management"
res, err := http.Get(url)
if err != nil {
return 0, fmt.Errorf("error while calling sources API: %v", err)
}
defer res.Body.Close()
body, _ := io.ReadAll(res.Body)
if res.StatusCode != 200 {
return 0, fmt.Errorf("%v", res)
}
payload := map[string]interface{}{}
if err := json.Unmarshal(body, &payload); err != nil {
return 0, fmt.Errorf("unable to unmarshal response of sources /application_types API %v", err)
}
data := payload["data"].([]interface{})
app := data[0].(map[string]interface{})
cost_app_id, _ := strconv.Atoi(app["id"].(string))
return cost_app_id, nil
}
58 changes: 58 additions & 0 deletions scripts/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ services:
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka:29092 1 20 && \
kafka-topics --create --if-not-exists --topic hccm.ros.events --bootstrap-server kafka:29092 && \
kafka-topics --create --if-not-exists --topic platform.sources.event-stream --bootstrap-server kafka:29092 && \
kafka-topics --create --if-not-exists --topic rosocp.kruize.experiments --bootstrap-server kafka:29092'"
depends_on:
- kafka
Expand Down Expand Up @@ -87,3 +88,60 @@ services:
- ./samples:/usr/share/nginx/html
ports:
- 8888:80

db-sources:
image: postgres
restart: always
environment:
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
POSTGRES_DB: sources_api_development
ports:
- "15434:5432"

redis:
image: quay.io/cloudservices/redis-ephemeral:6
ports:
- "6379:6379"

sources-db-setup:
image: quay.io/cloudservices/sources-api-go
restart: on-failure
command: -setup
environment:
- DATABASE_USER=postgres
- DATABASE_PASSWORD=postgres
- DATABASE_HOST=db-sources
- DATABASE_PORT=5432
- DATABASE_NAME=sources_api_development
- LOG_LEVEL=DEBUG
- REDIS_CACHE_HOST=redis
- REDIS_CACHE_PORT=6379
- BYPASS_RBAC=true
- QUEUE_HOST=kafka
- QUEUE_PORT=29092
depends_on:
- db-sources

sources-api-go:
image: quay.io/cloudservices/sources-api-go
command: ""
restart: always
ports:
- 8002:8000
environment:
- DATABASE_USER=postgres
- DATABASE_PASSWORD=postgres
- DATABASE_HOST=db-sources
- DATABASE_PORT=5432
- DATABASE_NAME=sources_api_development
- LOG_LEVEL=DEBUG
- REDIS_CACHE_HOST=redis
- REDIS_CACHE_PORT=6379
- BYPASS_RBAC=true
- QUEUE_HOST=kafka
- QUEUE_PORT=29092
depends_on:
- db-sources
- kafka
- redis

0 comments on commit 89d5c97

Please sign in to comment.