Skip to content

Commit

Permalink
Merge pull request aiven#85 from aiven/ivans-new-kafka-topic
Browse files Browse the repository at this point in the history
add new kafka topic endpoint functionality

aiven#85
  • Loading branch information
rikonen authored Oct 13, 2020
2 parents 43d2abd + 8f3e0ab commit 21d29d6
Show file tree
Hide file tree
Showing 2 changed files with 291 additions and 20 deletions.
139 changes: 119 additions & 20 deletions kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,113 @@
package aiven

type (
// KafkaTopicConfig represents a Kafka Topic Config on Aiven.
KafkaTopicConfig struct {
CleanupPolicy string `json:"cleanup_policy,omitempty"`
CompressionType string `json:"compression_type,omitempty"`
DeleteRetentionMs *int `json:"delete_retention_ms,omitempty"`
FileDeleteDelayMs *int `json:"file_delete_delay_ms,omitempty"`
FlushMessages *int `json:"flush_messages,omitempty"`
FlushMs *int `json:"flush_ms,omitempty"`
IndexIntervalBytes *int `json:"index_interval_bytes,omitempty"`
MaxCompactionLagMs *int `json:"max_compaction_lag_ms,omitempty"`
MaxMessageBytes *int `json:"max_message_bytes,omitempty"`
MessageDownconversionEnable *bool `json:"message_downconversion_enable,omitempty"`
MessageFormatVersion string `json:"message_format_version,omitempty"`
MessageTimestampDifferenceMaxMs *int `json:"message_timestamp_difference_max_ms,omitempty"`
MessageTimestampType string `json:"message_timestamp_type,omitempty"`
MinCleanableDirtyRatio *float32 `json:"min_cleanable_dirty_ratio,omitempty"`
MinCompactionLagMs *int `json:"min_compaction_lag_ms,omitempty"`
MinInsyncReplicas *int `json:"min_insync_replicas,omitempty"`
Preallocate *bool `json:"preallocate,omitempty"`
RetentionBytes *int `json:"retention_bytes,omitempty"`
RetentionMs *int `json:"retention_ms,omitempty"`
SegmentBytes *int `json:"segment_bytes,omitempty"`
SegmentIndexBytes *int `json:"segment_index_bytes,omitempty"`
SegmentJitterMs *int `json:"segment_jitter_ms,omitempty"`
SegmentMs *int `json:"segment_ms,omitempty"`
UncleanLeaderElectionEnable *bool `json:"unclean_leader_election_enable,omitempty"`
}

// KafkaTopicConfigResponse represents a Kafka Topic Config on Aiven.
KafkaTopicConfigResponse struct {
CleanupPolicy KafkaTopicConfigResponseString `json:"cleanup_policy,omitempty"`
CompressionType KafkaTopicConfigResponseString `json:"compression_type,omitempty"`
DeleteRetentionMs KafkaTopicConfigResponseInt `json:"delete_retention_ms,omitempty"`
FileDeleteDelayMs KafkaTopicConfigResponseInt `json:"file_delete_delay_ms,omitempty"`
FlushMessages KafkaTopicConfigResponseInt `json:"flush_messages,omitempty"`
FlushMs KafkaTopicConfigResponseInt `json:"flush_ms,omitempty"`
IndexIntervalBytes KafkaTopicConfigResponseInt `json:"index_interval_bytes,omitempty"`
MaxCompactionLagMs KafkaTopicConfigResponseInt `json:"max_compaction_lag_ms,omitempty"`
MaxMessageBytes KafkaTopicConfigResponseInt `json:"max_message_bytes,omitempty"`
MessageDownconversionEnable KafkaTopicConfigResponseBool `json:"message_downconversion_enable,omitempty"`
MessageFormatVersion KafkaTopicConfigResponseString `json:"message_format_version,omitempty"`
MessageTimestampDifferenceMaxMs KafkaTopicConfigResponseInt `json:"message_timestamp_difference_max_ms,omitempty"`
MessageTimestampType KafkaTopicConfigResponseString `json:"message_timestamp_type,omitempty"`
MinCleanableDirtyRatio KafkaTopicConfigResponseFloat `json:"min_cleanable_dirty_ratio,omitempty"`
MinCompactionLagMs KafkaTopicConfigResponseInt `json:"min_compaction_lag_ms,omitempty"`
MinInsyncReplicas KafkaTopicConfigResponseInt `json:"min_insync_replicas,omitempty"`
Preallocate KafkaTopicConfigResponseBool `json:"preallocate,omitempty"`
RetentionBytes KafkaTopicConfigResponseInt `json:"retention_bytes,omitempty"`
RetentionMs KafkaTopicConfigResponseInt `json:"retention_ms,omitempty"`
SegmentBytes KafkaTopicConfigResponseInt `json:"segment_bytes,omitempty"`
SegmentIndexBytes KafkaTopicConfigResponseInt `json:"segment_index_bytes,omitempty"`
SegmentJitterMs KafkaTopicConfigResponseInt `json:"segment_jitter_ms,omitempty"`
SegmentMs KafkaTopicConfigResponseInt `json:"segment_ms,omitempty"`
UncleanLeaderElectionEnable KafkaTopicConfigResponseBool `json:"unclean_leader_election_enable,omitempty"`
}

KafkaTopicConfigResponseString struct {
Source string `json:"source"`
Value string `json:"value"`
Synonyms []struct {
Source string `json:"source"`
Value string `json:"value"`
Name string `json:"name"`
} `json:"synonyms"`
}

KafkaTopicConfigResponseInt struct {
Source string `json:"source"`
Value int `json:"value"`
Synonyms []struct {
Source string `json:"source"`
Value int `json:"value"`
Name string `json:"name"`
} `json:"synonyms"`
}

KafkaTopicConfigResponseBool struct {
Source string `json:"source"`
Value bool `json:"value"`
Synonyms []struct {
Source string `json:"source"`
Value bool `json:"value"`
Name string `json:"name"`
} `json:"synonyms"`
}

KafkaTopicConfigResponseFloat struct {
Source string `json:"source"`
Value float32 `json:"value"`
Synonyms []struct {
Source string `json:"source"`
Value float32 `json:"value"`
Name string `json:"name"`
} `json:"synonyms"`
}

// KafkaTopic represents a Kafka Topic on Aiven.
KafkaTopic struct {
CleanupPolicy string `json:"cleanup_policy"`
MinimumInSyncReplicas int `json:"min_insync_replicas"`
Partitions []*Partition `json:"partitions"`
Replication int `json:"replication"`
RetentionBytes int `json:"retention_bytes"`
RetentionHours *int `json:"retention_hours,omitempty"`
State string `json:"state"`
TopicName string `json:"topic_name"`
CleanupPolicy string `json:"cleanup_policy"`
MinimumInSyncReplicas int `json:"min_insync_replicas"`
Partitions []*Partition `json:"partitions"`
Replication int `json:"replication"`
RetentionBytes int `json:"retention_bytes"`
RetentionHours *int `json:"retention_hours,omitempty"`
State string `json:"state"`
TopicName string `json:"topic_name"`
Config KafkaTopicConfigResponse `json:"config"`
}

// KafkaListTopic represents kafka list topic model on Aiven.
Expand Down Expand Up @@ -52,22 +149,24 @@ type (

// CreateKafkaTopicRequest are the parameters used to create a kafka topic.
CreateKafkaTopicRequest struct {
CleanupPolicy *string `json:"cleanup_policy,omitempty"`
MinimumInSyncReplicas *int `json:"min_insync_replicas,omitempty"`
Partitions *int `json:"partitions,omitempty"`
Replication *int `json:"replication,omitempty"`
RetentionBytes *int `json:"retention_bytes,omitempty"`
RetentionHours *int `json:"retention_hours,omitempty"`
TopicName string `json:"topic_name"`
CleanupPolicy *string `json:"cleanup_policy,omitempty"`
MinimumInSyncReplicas *int `json:"min_insync_replicas,omitempty"`
Partitions *int `json:"partitions,omitempty"`
Replication *int `json:"replication,omitempty"`
RetentionBytes *int `json:"retention_bytes,omitempty"`
RetentionHours *int `json:"retention_hours,omitempty"`
TopicName string `json:"topic_name"`
Config KafkaTopicConfig `json:"config"`
}

// UpdateKafkaTopicRequest are the parameters used to update a kafka topic.
UpdateKafkaTopicRequest struct {
MinimumInSyncReplicas *int `json:"min_insync_replicas,omitempty"`
Partitions *int `json:"partitions,omitempty"`
Replication *int `json:"replication,omitempty"`
RetentionBytes *int `json:"retention_bytes,omitempty"`
RetentionHours *int `json:"retention_hours,omitempty"`
MinimumInSyncReplicas *int `json:"min_insync_replicas,omitempty"`
Partitions *int `json:"partitions,omitempty"`
Replication *int `json:"replication,omitempty"`
RetentionBytes *int `json:"retention_bytes,omitempty"`
RetentionHours *int `json:"retention_hours,omitempty"`
Config KafkaTopicConfig `json:"config"`
}

// KafkaTopicResponse is the response for Kafka Topic requests.
Expand Down
172 changes: 172 additions & 0 deletions kafka_topic_acc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package aiven

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"math/rand"
"os"
"strconv"
"time"
)

var _ = Describe("Kafka Topic", func() {
var (
projectName string
project *Project
err error
)

Context("Kafka Topic CRUD", func() {
It("should not error", func() {
projectName = os.Getenv("AIVEN_PROJECT_NAME")
project, err = client.Projects.Get(projectName)

Expect(err).NotTo(HaveOccurred())
})

It("should populate fields properly", func() {
Expect(project).NotTo(BeNil())

if project != nil {
Expect(project.Name).NotTo(BeEmpty())
Expect(project.AccountId).To(BeEmpty())
}
})

// kafka service
var (
serviceName string
service *Service
errS error
)

It("creating service", func() {
serviceName = "test-acc-kafka-topic-sr-" + strconv.Itoa(rand.Int())
service, errS = client.Services.Create(projectName, CreateServiceRequest{
Cloud: "google-europe-west1",
Plan: "business-4",
ProjectVPCID: nil,
ServiceName: serviceName,
ServiceType: "kafka",
})
})

It("should not error", func() {
Expect(errS).NotTo(HaveOccurred())
})

It("should populate fields properly", func() {
Expect(service).NotTo(BeNil())

if service != nil {
Expect(service.Name).NotTo(BeEmpty())
Expect(service.Plan).NotTo(BeEmpty())
Expect(service.Type).Should(Equal("kafka"))

Eventually(func() string {
service, _ = client.Services.Get(projectName, serviceName)
return service.State
}, 25*time.Minute, 1*time.Minute).Should(Equal("RUNNING"))
}
})

// kafka topic
var (
errC error
topicName string
segmentJitterMs int
)
segmentJitterMs = 10
topicName = "test1"

It("create kafka topic", func() {
time.Sleep(10 * time.Second)
errC = client.KafkaTopics.Create(projectName, serviceName, CreateKafkaTopicRequest{
CleanupPolicy: nil,
MinimumInSyncReplicas: nil,
Partitions: nil,
Replication: nil,
RetentionBytes: nil,
RetentionHours: nil,
TopicName: topicName,
Config: KafkaTopicConfig{
CleanupPolicy: "compact",
CompressionType: "",
DeleteRetentionMs: nil,
FileDeleteDelayMs: nil,
FlushMessages: nil,
FlushMs: nil,
IndexIntervalBytes: nil,
MaxCompactionLagMs: nil,
MaxMessageBytes: nil,
MessageDownconversionEnable: nil,
MessageFormatVersion: "",
MessageTimestampDifferenceMaxMs: nil,
MessageTimestampType: "",
MinCleanableDirtyRatio: nil,
MinCompactionLagMs: nil,
MinInsyncReplicas: nil,
Preallocate: nil,
RetentionBytes: nil,
RetentionMs: nil,
SegmentBytes: nil,
SegmentIndexBytes: nil,
SegmentJitterMs: &segmentJitterMs,
SegmentMs: nil,
UncleanLeaderElectionEnable: nil,
},
})

Eventually(func() string {
topic, _ := client.KafkaTopics.Get(projectName, serviceName, topicName)

if topic != nil {
return topic.State
}

return ""
}, 25*time.Minute, 1*time.Minute).Should(Equal("ACTIVE"))
})

It("should not error kafka topic with config", func() {
Expect(errC).NotTo(HaveOccurred())
})

It("should populate fields properly", func() {
t, errT := client.KafkaTopics.Get(projectName, serviceName, topicName)
Expect(errT).NotTo(HaveOccurred())

if t != nil {
Expect(t.Config.CleanupPolicy.Value).NotTo(BeEmpty())
Expect(t.Config.SegmentJitterMs.Value).To(Equal(segmentJitterMs))
}
})

It("should update topic config", func() {
var uncleanLeaderElectionEnable = true

errU := client.KafkaTopics.Update(projectName, serviceName, topicName, UpdateKafkaTopicRequest{Config: KafkaTopicConfig{
UncleanLeaderElectionEnable: &uncleanLeaderElectionEnable,
}})
Expect(errU).NotTo(HaveOccurred())

t2, errG := client.KafkaTopics.Get(projectName, serviceName, topicName)
Expect(errG).NotTo(HaveOccurred())
Expect(t2).NotTo(BeNil())

if t2 != nil {
Expect(t2.Config.UncleanLeaderElectionEnable.Value).Should(Equal(true))
}
})

It("delete Kafka Topic and Kafka service", func() {
if errD := client.KafkaTopics.Delete(projectName, serviceName, topicName); errD != nil {
Fail("cannot delete kafka topic:" + errD.Error())
}

if errD := client.Services.Delete(projectName, serviceName); errD != nil {
Fail("cannot delete service:" + errD.Error())
}
})
})
})

0 comments on commit 21d29d6

Please sign in to comment.