Go Kafka Connect Couchbase

This repository contains the Go implementation of the Couchbase Kafka Connector.

What is Couchbase Kafka Connector?

Official Couchbase documentation defines the Couchbase Kafka Connector as follows:

The Couchbase Kafka connector is a plug-in for the Kafka Connect framework. It provides source and sink components.

The source connector streams documents from Couchbase Database Change Protocol (DCP) and publishes each document to a Kafka topic in near real-time.

The sink connector subscribes to Kafka topics and writes the messages to Couchbase.

Go Kafka Connect Couchbase is a source connector: it publishes Couchbase mutations to Kafka as events.


Why?

  • Build a Couchbase Kafka connector in Go to reduce resource usage.
  • Expose more configuration options so that users need to change less code.
  • Ship the connector as a library so that users do not have to clone code they don't need.

Example

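package main

// The subpackage import paths are inferred from the identifiers used below;
// check them against the repository layout.
import (
	gokafkaconnectcouchbase "github.com/Trendyol/go-kafka-connect-couchbase"
	"github.com/Trendyol/go-kafka-connect-couchbase/couchbase"
	"github.com/Trendyol/go-kafka-connect-couchbase/kafka/message"
)

// mapper converts a Couchbase event into the Kafka message to produce.
func mapper(event couchbase.Event) *message.KafkaMessage {
	// return nil if you wish to filter the event
	return message.GetKafkaMessage(event.Key, event.Value, map[string]string{})
}

func main() {
	connector := gokafkaconnectcouchbase.NewConnector("./example/config.yml", mapper)

	// Close the connector when main returns.
	defer connector.Close()

	connector.Start()
}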
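The comment in the mapper above says that returning nil filters an event. The following is a minimal sketch of that pattern, not the connector's own code: `Event` and `KafkaMessage` are hypothetical stand-ins for `couchbase.Event` and `message.KafkaMessage` so that the example runs without the library, and the `_sync:` prefix is an arbitrary example.

```go
package main

import (
	"fmt"
	"strings"
)

// Event and KafkaMessage are hypothetical stand-ins for the connector's
// couchbase.Event and message.KafkaMessage; only the fields used here
// are modelled.
type Event struct {
	Key   []byte
	Value []byte
}

type KafkaMessage struct {
	Key     []byte
	Value   []byte
	Headers map[string]string
}

// filteringMapper returns a mapper that drops events whose key starts with
// skipPrefix (by returning nil) and forwards every other event unchanged.
func filteringMapper(skipPrefix string) func(Event) *KafkaMessage {
	return func(e Event) *KafkaMessage {
		if strings.HasPrefix(string(e.Key), skipPrefix) {
			return nil // filtered: no message is produced for this event
		}
		return &KafkaMessage{Key: e.Key, Value: e.Value, Headers: map[string]string{}}
	}
}

func main() {
	mapper := filteringMapper("_sync:")
	events := []Event{
		{Key: []byte("_sync:meta"), Value: []byte(`{}`)},
		{Key: []byte("order::1"), Value: []byte(`{"total":10}`)},
	}
	for _, e := range events {
		if msg := mapper(e); msg == nil {
			fmt.Printf("skipped %s\n", e.Key)
		} else {
			fmt.Printf("forwarded %s\n", msg.Key)
		}
	}
}
```

With the real library, the same prefix check would sit inside a mapper that takes a `couchbase.Event` and returns the result of `message.GetKafkaMessage`.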

A custom logger can be passed to the connector:

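package main

import (
	"fmt"

	gokafkaconnectcouchbase "github.com/Trendyol/go-kafka-connect-couchbase"
	"go.uber.org/zap"
)

// zapLogger stands in for an already configured *zap.Logger (assumption).
var zapLogger = zap.NewExample()

// ConnectorLogger adapts zapLogger to the Printf-style logger the connector accepts.
type ConnectorLogger struct{}

func (d *ConnectorLogger) Printf(msg string, args ...interface{}) {
	zapLogger.Info(fmt.Sprintf(msg, args...))
}

func main() {
	// mapper is the function defined in the previous example.
	l := &ConnectorLogger{}
	connector := gokafkaconnectcouchbase.NewConnectorWithLoggers("./example/config.yml", mapper, l, l)

	defer connector.Close()

	connector.Start()
}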
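The example above suggests that a logger only needs a `Printf(msg string, args ...interface{})` method. As a sketch under that assumption, the adapter below uses only the standard library. The `Logger` interface is a hypothetical stand-in for whatever interface the connector declares, and `levelLogger`, `newLevelLogger` and `render` are illustrative names.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
)

// Logger is a hypothetical stand-in for the logger interface the connector
// accepts; the README example only shows that Printf is required.
type Logger interface {
	Printf(msg string, args ...interface{})
}

// levelLogger prefixes each line with a level and delegates to a standard
// library *log.Logger.
type levelLogger struct {
	level string
	out   *log.Logger
}

func (l *levelLogger) Printf(msg string, args ...interface{}) {
	l.out.Printf("[%s] %s", l.level, fmt.Sprintf(msg, args...))
}

// newLevelLogger builds a levelLogger that writes to w without timestamps.
func newLevelLogger(level string, w io.Writer) *levelLogger {
	return &levelLogger{level: level, out: log.New(w, "", 0)}
}

// render logs a single line through the Logger interface and returns it.
func render(level, msg string, args ...interface{}) string {
	var buf bytes.Buffer
	var l Logger = newLevelLogger(level, &buf)
	l.Printf(msg, args...)
	return buf.String()
}

func main() {
	fmt.Print(render("INFO", "connector started with %d brokers", 2))
	fmt.Print(render("ERROR", "produce failed: %s", "timeout"))
}
```

Two such values with different levels could be passed as the two logger arguments of `NewConnectorWithLoggers`, assuming those are a regular logger and an error logger.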


Features

  • Batch Producer
  • Secure Kafka

Usage

$ go get github.com/Trendyol/go-kafka-connect-couchbase


DCP Configuration

See go-dcp-client for the DCP configuration options.

Kafka Specific Configuration

| Variable | Type | Is Required |
|---|---|---|
| kafka.collectionTopicMapping | map[string]string | yes |
| kafka.brokers | array | yes |
| kafka.readTimeout | integer | no |
| kafka.compression | integer | no |
| kafka.writeTimeout | integer | no |
| kafka.producerBatchSize | integer | yes |
| kafka.producerBatchTickerDuration | integer | yes |
| kafka.requiredAcks | integer | no |
| kafka.secureConnection | boolean (true/false) | no |
| kafka.rootCAPath | string | no |
| kafka.interCAPath | string | no |
| kafka.scramUsername | string | no |
| kafka.scramPassword | string | no |
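As an illustration of the "Is Required" column, here is a minimal sketch that checks the four required keys before a connector is started. `KafkaConfig` and `MissingRequired` are hypothetical and are not the connector's own configuration type. Treating an empty or zero value as "missing" is an assumption, and the values in `main` are placeholders.

```go
package main

import "fmt"

// KafkaConfig is a hypothetical struct mirroring the required keys of the
// table above; it is not the connector's own configuration type.
type KafkaConfig struct {
	CollectionTopicMapping      map[string]string
	Brokers                     []string
	ProducerBatchSize           int
	ProducerBatchTickerDuration int
}

// MissingRequired returns the names of the required keys that are empty or
// zero. Treating zero values as missing is an assumption of this sketch.
func (c KafkaConfig) MissingRequired() []string {
	var missing []string
	if len(c.CollectionTopicMapping) == 0 {
		missing = append(missing, "kafka.collectionTopicMapping")
	}
	if len(c.Brokers) == 0 {
		missing = append(missing, "kafka.brokers")
	}
	if c.ProducerBatchSize == 0 {
		missing = append(missing, "kafka.producerBatchSize")
	}
	if c.ProducerBatchTickerDuration == 0 {
		missing = append(missing, "kafka.producerBatchTickerDuration")
	}
	return missing
}

func main() {
	cfg := KafkaConfig{
		CollectionTopicMapping: map[string]string{"_default": "example-topic"},
		Brokers:                []string{"localhost:9092"},
		ProducerBatchSize:      100,
		// ProducerBatchTickerDuration is left unset on purpose.
	}
	if missing := cfg.MissingRequired(); len(missing) > 0 {
		fmt.Println("missing required keys:", missing)
	}
}
```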

Examples