How to Create a New Connector

If you want to create a new connector that streams mutations from Couchbase to Kafka, you can use the go-dcp-kafka library. This library provides a simple and flexible way to implement connectors that can be customized to your needs.

Step 1: Installing the Library

To use the go-dcp-kafka library, you first need to install it. You can do this using the go get command:

$ go get github.com/Trendyol/go-dcp-kafka

Step 2: Implementing the Mapper

The mapper is a function that takes a couchbase.Event as input and returns a slice of message.KafkaMessage. It is responsible for converting Couchbase mutations into the Kafka messages that will be published.

Here's an example mapper implementation:

package main

import (
	"github.com/Trendyol/go-dcp-kafka/couchbase"
	"github.com/Trendyol/go-dcp-kafka/kafka/message"
)

func mapper(event couchbase.Event) []message.KafkaMessage {
	// here you can filter and transform events
	return []message.KafkaMessage{
		{
			Headers: nil,
			Key:     event.Key,
			Value:   event.Value,
		},
	}
}
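
If you only want to forward a subset of mutations, you can filter inside the mapper before building any messages. The sketch below keeps only documents whose keys start with a hypothetical "order:" prefix; returning an empty slice produces no messages for that event, which effectively skips it. The function name and prefix are illustrative, not part of the library.

package main

import (
	"bytes"

	"github.com/Trendyol/go-dcp-kafka/couchbase"
	"github.com/Trendyol/go-dcp-kafka/kafka/message"
)

// filterMapper forwards only documents whose keys start with the
// hypothetical "order:" prefix; all other events produce no messages.
func filterMapper(event couchbase.Event) []message.KafkaMessage {
	if !bytes.HasPrefix(event.Key, []byte("order:")) {
		// No messages for this event, so nothing is sent to Kafka.
		return nil
	}

	return []message.KafkaMessage{
		{
			Headers: nil,
			Key:     event.Key,
			Value:   event.Value,
		},
	}
}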

Step 3: Configuring the Connector

The configuration for the connector is provided via a YAML file. Here's an example configuration:
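
A minimal configuration could look like the sketch below. The hosts, credentials, bucket, group name, and topic mapping are placeholder values, and the exact key names should be verified against the library's configuration reference.

hosts:
  - localhost:8091
username: user
password: password
bucketName: dcp-test
dcp:
  group:
    name: example-group
kafka:
  collectionTopicMapping:
    _default: example-topic
  brokers:
    - localhost:9092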

You can find an explanation of all configuration options in the library's documentation.

You can pass this configuration file to the connector by providing the path to the file when creating the connector:

connector, err := dcpkafka.NewConnector("path-to-config.yml", mapper)
if err != nil {
	panic(err)
}

Step 4: Starting and Closing the Connector

Once you have implemented the mapper and configured the connector, you can start and stop it:

defer connector.Close()
connector.Start()

This will start the connector, which will continuously listen for Couchbase mutations and stream events to Kafka according to the configured mapper.
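
Putting the steps together, a minimal program could look like the following sketch; "config.yml" is a placeholder path, and the mapper is the one from Step 2.

package main

import (
	dcpkafka "github.com/Trendyol/go-dcp-kafka"
	"github.com/Trendyol/go-dcp-kafka/couchbase"
	"github.com/Trendyol/go-dcp-kafka/kafka/message"
)

// mapper converts each Couchbase event into a single Kafka message.
func mapper(event couchbase.Event) []message.KafkaMessage {
	return []message.KafkaMessage{
		{
			Headers: nil,
			Key:     event.Key,
			Value:   event.Value,
		},
	}
}

func main() {
	// "config.yml" is a placeholder; point this at your own configuration file.
	connector, err := dcpkafka.NewConnector("config.yml", mapper)
	if err != nil {
		panic(err)
	}

	defer connector.Close()

	// Start begins listening for Couchbase mutations and streaming them to Kafka.
	connector.Start()
}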

Monitoring

The connector exposes an API for management and observation, along with several metrics to support these tasks.

You can find an explanation in the library's documentation.