Skip to content

Commit

Permalink
Introduced interoperability between Java and Go
Browse files Browse the repository at this point in the history
  • Loading branch information
riferrei committed Jul 7, 2020
1 parent 2cc1673 commit a97f521
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 23 deletions.
21 changes: 7 additions & 14 deletions getting-started-with-ccloud-golang/ClientApp.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,43 +123,38 @@ func producer(props map[string]string, topic string) {
}
deviceSelected := devices[choosen-1]

// Create key and value
key := deviceSelected.DeviceID
sensorReading := SensorReading{
Device: deviceSelected,
DateTime: time.Now().UnixNano(),
Reading: rand.Float64(),
}
valueBytes, _ := proto.Marshal(&sensorReading)

recordValue := []byte{}
recordValue = append(recordValue, byte(0))

// Technically this is not necessary because in
// Go consumers don't need to know the schema to
// be able to deserialize records. However, if this
// client wants to produce records that could be
// deserialized using Java (KafkaProtobufDeserializer)
// then it is important to arrange the bytes according
// to the format expected there.
// to the following format:
// [Magic Byte] + [Schema ID] + [Message Index] + [Value]
recordValue = append(recordValue, byte(0))
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))
recordValue = append(recordValue, schemaIDBytes...)
recordValue = append(recordValue, byte(0))

// [Pending] insert the message index list here
// before the actual value since it is required
// for the Java deserializer. Meanwhile this code
// will produce records that can only be read by
// Go consumers.
// Now write the actual value into the record...
valueBytes, _ := proto.Marshal(&sensorReading)
recordValue = append(recordValue, valueBytes...)

// Produce the record to the topic
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(key), Value: recordValue}, nil)

// Sleep for one second...
time.Sleep(1000 * time.Millisecond)

}
Expand Down Expand Up @@ -204,13 +199,11 @@ func consumer(props map[string]string, topic string) {
for {
record, err := consumer.ReadMessage(-1)
if err == nil {
// Deserialize the record value using Protobuf encoded bytes
sensorReading := &SensorReading{}
err = proto.Unmarshal(record.Value[5:], sensorReading)
err = proto.Unmarshal(record.Value[6:], sensorReading)
if err != nil {
panic(fmt.Sprintf("Error deserializing the record: %s", err))
}
// Print the record value
fmt.Printf("SensorReading[device=%s, dateTime=%d, reading=%f]\n",
sensorReading.Device.GetDeviceID(),
sensorReading.GetDateTime(),
Expand Down
3 changes: 1 addition & 2 deletions getting-started-with-ccloud-golang/KafkaUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// TopicName holds the name of the topic
const TopicName string = "SensorReading-Golang"
const TopicName string = "SensorReading"

// PropsFile holds the filename with config
const PropsFile string = "ccloud.properties"
Expand Down Expand Up @@ -55,7 +55,6 @@ func CreateTopic(props map[string]string) {
os.Exit(1)
}

// Check for specific topic errors
for _, result := range results {
if result.Error.Code() != kafka.ErrNoError &&
result.Error.Code() != kafka.ErrTopicAlreadyExists {
Expand Down
16 changes: 10 additions & 6 deletions getting-started-with-ccloud-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,17 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

public class KafkaUtils {

public static final String TOPIC = "SensorReading-Java";
public static final String TOPIC = "SensorReading";

public static void createTopic(String topic, int numPartitions,
short replicationFactor) {
Expand Down

0 comments on commit a97f521

Please sign in to comment.