
Commit

Adding getting-started-with-ccloud to demo-scene
riferrei committed Jun 29, 2020
1 parent 0174cfc commit 3948281
Showing 20 changed files with 2,854 additions and 0 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -58,9 +58,12 @@ You need to allocate Docker 8GB when running these. Avoid allocating all your ma

- Confluent Cloud

- [Getting Started with Confluent Cloud using Java](getting-started-with-ccloud-java)
- [Getting Started with Confluent Cloud using Go](getting-started-with-ccloud-golang)
- [Streaming Pac-Man](streaming-pacman)
- ["The Cube" Demo](ccloud-cube-demo)
- [Using Replicator with Confluent Cloud](ccloud-replicator)

- Misc
- [Hacky export/import between Kafka clusters](export-import-with-kafkacat) using `kafkacat`
18 changes: 18 additions & 0 deletions getting-started-with-ccloud-golang/.gitignore
@@ -0,0 +1,18 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
.vscode
ClientApp
go.sum

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/
223 changes: 223 additions & 0 deletions getting-started-with-ccloud-golang/ClientApp.go
@@ -0,0 +1,223 @@
package main

import (
"encoding/binary"
"fmt"
"io/ioutil"
"math/rand"
"os"
"strings"
"time"

"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/riferrei/srclient"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

const producerMode string = "producer"
const consumerMode string = "consumer"
const schemaFile string = "SensorReading.proto"

func main() {

if len(os.Args) < 2 {
fmt.Println("Usage: ClientApp <producer|consumer>")
os.Exit(1)
}
clientMode := os.Args[1]
props := LoadProperties()
topic := TopicName

if clientMode == producerMode {
producer(props, topic)
} else if clientMode == consumerMode {
consumer(props, topic)
} else {
fmt.Println("Invalid option. Valid options are 'producer' and 'consumer'.")
}

}

/**************************************************/
/******************** Producer ********************/
/**************************************************/

func producer(props map[string]string, topic string) {

CreateTopic(props)

schemaRegistryClient := srclient.CreateSchemaRegistryClient(props["schema.registry.url"])
schemaRegistryClient.CodecCreationEnabled(false)
srBasicAuthUserInfo := props["schema.registry.basic.auth.user.info"]
credentials := strings.Split(srBasicAuthUserInfo, ":")
schemaRegistryClient.SetCredentials(credentials[0], credentials[1])

producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": props["bootstrap.servers"],
"sasl.mechanisms": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.username": props["sasl.username"],
"sasl.password": props["sasl.password"]})
if err != nil {
panic(fmt.Sprintf("Failed to create producer %s", err))
}
defer producer.Close()

go func() {
for event := range producer.Events() {
switch ev := event.(type) {
case *kafka.Message:
message := ev
if ev.TopicPartition.Error != nil {
fmt.Printf("Error delivering the order '%s'\n", message.Key)
} else {
fmt.Printf("Reading sent to the partition %d with offset %d. \n",
message.TopicPartition.Partition, message.TopicPartition.Offset)
}
}
}
}()

// Fetch the latest schema registered for this topic's value;
// if there is none yet, register the local .proto file.
schema, err := schemaRegistryClient.GetLatestSchema(topic, false)
if schema == nil {
schemaBytes, err := ioutil.ReadFile(schemaFile)
if err != nil {
panic(fmt.Sprintf("Error reading the schema file %s", err))
}
schema, err = schemaRegistryClient.CreateSchema(topic, string(schemaBytes), "PROTOBUF", false)
if err != nil {
panic(fmt.Sprintf("Error creating the schema %s", err))
}
}

// Create five devices, each with a unique ID.
devices := make([]*SensorReading_Device, 5)
for i := range devices {
deviceID, _ := uuid.NewUUID()
devices[i] = &SensorReading_Device{
DeviceID: deviceID.String(),
Enabled:  true,
}
}

for {

// Randomly select one of the devices for this reading.
deviceSelected := devices[rand.Intn(len(devices))]

// Create key and value
key := deviceSelected.DeviceID
sensorReading := SensorReading{
Device: deviceSelected,
DateTime: time.Now().UnixNano(),
Reading: rand.Float64(),
}
valueBytes, _ := proto.Marshal(&sensorReading)

// Confluent wire format: a magic byte (0) followed by the
// schema ID as a 4-byte big-endian integer, then the payload.
recordValue := []byte{}
recordValue = append(recordValue, byte(0))

// Strictly speaking, the Go consumer below doesn't need this
// header to deserialize the record. However, for records that
// should also be readable by Java's KafkaProtobufDeserializer,
// the bytes must be arranged in the format that deserializer
// expects.
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))
recordValue = append(recordValue, schemaIDBytes...)

// [Pending] Insert the message-index list here, before the
// actual value, since the Java deserializer requires it.
// Until that is done, this code produces records that only
// Go consumers can read.
recordValue = append(recordValue, valueBytes...)

// Produce the record to the topic
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(key), Value: recordValue}, nil)

// Sleep for one second...
time.Sleep(1000 * time.Millisecond)

}

}

/**************************************************/
/******************** Consumer ********************/
/**************************************************/

func consumer(props map[string]string, topic string) {

CreateTopic(props)

// The code below is commented out because the Go consumer
// currently doesn't need the schema to deserialize records.
// Keeping it here for future use ¯\_(ツ)_/¯

// schemaRegistryClient := srclient.CreateSchemaRegistryClient(props["schema.registry.url"])
// schemaRegistryClient.CodecCreationEnabled(false)
// srBasicAuthUserInfo := props["schema.registry.basic.auth.user.info"]
// credentials := strings.Split(srBasicAuthUserInfo, ":")
// schemaRegistryClient.SetCredentials(credentials[0], credentials[1])

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": props["bootstrap.servers"],
"sasl.mechanisms": props["sasl.mechanisms"],
"security.protocol": props["security.protocol"],
"sasl.username": props["sasl.username"],
"sasl.password": props["sasl.password"],
"session.timeout.ms": 6000,
"group.id": "golang-consumer",
"auto.offset.reset": "latest"})
if err != nil {
panic(fmt.Sprintf("Failed to create consumer %s", err))
}
defer consumer.Close()

consumer.SubscribeTopics([]string{topic}, nil)

for {
record, err := consumer.ReadMessage(-1)
if err == nil {
// Deserialize the record value, skipping the 5-byte header
// (magic byte plus 4-byte schema ID) written by the producer
sensorReading := &SensorReading{}
err = proto.Unmarshal(record.Value[5:], sensorReading)
if err != nil {
panic(fmt.Sprintf("Error deserializing the record: %s", err))
}
// Print the record value
fmt.Printf("SensorReading[device=%s, dateTime=%d, reading=%f]\n",
sensorReading.Device.GetDeviceID(),
sensorReading.GetDateTime(),
sensorReading.GetReading())
} else {
fmt.Println(err)
}
}

}
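
For reference, below is a minimal sketch of the record framing described in the producer comments above, including the still-pending message-index list. It is illustrative only, not part of the commit: frameValue and the example schema ID are made up, and encoding the index list [0] as a single 0 byte is an assumption about what Java's KafkaProtobufSerializer emits for the first message type in a .proto file.

package main

import (
"encoding/binary"
"fmt"
)

// frameValue builds a record value the way ClientApp.go does,
// plus the pending message-index list: magic byte 0, schema ID
// as a 4-byte big-endian integer, index list, Protobuf payload.
func frameValue(schemaID uint32, payload []byte) []byte {
value := []byte{0} // magic byte
idBytes := make([]byte, 4)
binary.BigEndian.PutUint32(idBytes, schemaID)
value = append(value, idBytes...)
// Assumption: the index list [0] collapses to a single 0 byte.
value = append(value, 0)
return append(value, payload...)
}

func main() {
framed := frameValue(100001, []byte("protobuf-bytes"))
schemaID := binary.BigEndian.Uint32(framed[1:5])
// With the index byte present, the payload starts at
// offset 6 rather than the offset 5 the consumer uses.
fmt.Printf("schemaID=%d payload=%q\n", schemaID, framed[6:])
}

Note that once the producer writes the message-index list, the consumer's record.Value[5:] would have to become record.Value[6:] for this single-byte case.
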
96 changes: 96 additions & 0 deletions getting-started-with-ccloud-golang/KafkaUtils.go
@@ -0,0 +1,96 @@
package main

import (
"bufio"
"context"
"fmt"
"os"
"strings"
"time"

"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

// TopicName holds the name of the topic
const TopicName string = "SensorReading-Golang"

// PropsFile holds the filename with config
const PropsFile string = "ccloud.properties"

// CreateTopic is a utility function that
// creates the topic if it doesn't exist.
func CreateTopic(props map[string]string) {

adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": props["bootstrap.servers"],
"broker.version.fallback": "0.10.0.0",
"api.version.fallback.ms": 0,
"sasl.mechanisms": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.username": props["sasl.username"],
"sasl.password": props["sasl.password"]})

if err != nil {
fmt.Printf("Failed to create Admin client: %s\n", err)
os.Exit(1)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

maxDuration, err := time.ParseDuration("60s")
if err != nil {
panic("time.ParseDuration(60s)")
}

results, err := adminClient.CreateTopics(ctx,
[]kafka.TopicSpecification{{
Topic: TopicName,
NumPartitions: 4,
ReplicationFactor: 3}},
kafka.SetAdminOperationTimeout(maxDuration))

if err != nil {
fmt.Printf("Problem during the topic creation: %v\n", err)
os.Exit(1)
}

// Check for specific topic errors
for _, result := range results {
if result.Error.Code() != kafka.ErrNoError &&
result.Error.Code() != kafka.ErrTopicAlreadyExists {
fmt.Printf("Topic creation failed for %s: %v",
result.Topic, result.Error.String())
os.Exit(1)
}
}

adminClient.Close()

}

// LoadProperties reads the properties file
// containing the Confluent Cloud config so
// the apps can connect to the service.
func LoadProperties() map[string]string {
props := make(map[string]string)
file, err := os.Open(PropsFile)
if err != nil {
panic(fmt.Sprintf("Failed to load the '%s' file", PropsFile))
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if len(line) > 0 {
if !strings.HasPrefix(line, "//") &&
!strings.HasPrefix(line, "#") {
// Split on the first '=' only, since values
// (e.g., passwords) may themselves contain '='.
parts := strings.SplitN(line, "=", 2)
if len(parts) == 2 {
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
props[key] = value
}
}
}
}
return props
}
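
LoadProperties expects a flat key=value file named ccloud.properties in the directory the app runs from. A plausible skeleton, inferred from the keys the two Go files actually read (all values are placeholders for your own Confluent Cloud credentials; both # and // comment lines are skipped by the parser):

# Kafka cluster
bootstrap.servers=<CLUSTER-ENDPOINT>:9092
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=<CLUSTER-API-KEY>
sasl.password=<CLUSTER-API-SECRET>

# Schema Registry
schema.registry.url=https://<SR-ENDPOINT>
schema.registry.basic.auth.user.info=<SR-API-KEY>:<SR-API-SECRET>

Assuming the code is built into the ClientApp binary that the new .gitignore lists, the apps would then be started with ./ClientApp producer or ./ClientApp consumer.
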
