title | description | services | documentationcenter | author | manager | editor | ms.assetid | ms.service | ms.custom | ms.devlang | ms.topic | ms.tgt_pltfrm | ms.workload | ms.date | ms.author
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
Mirror Apache Kafka topics - Azure HDInsight \| Microsoft Docs | Learn how to use Apache Kafka's mirroring feature to maintain a replica of a Kafka on HDInsight cluster by mirroring topics to a secondary cluster. | hdinsight | | Blackmist | jhubbard | cgronlun | 015d276e-f678-4f2b-9572-75553c56625b | hdinsight | hdinsightactive | na | article | na | big-data | 11/07/2017 | larryfr
Learn how to use Apache Kafka's mirroring feature to replicate topics to a secondary cluster. Mirroring can be run as a continuous process, or used intermittently as a method of migrating data from one cluster to another.
In this example, mirroring is used to replicate topics between two HDInsight clusters. Both clusters are in an Azure Virtual Network in the same region.
> [!WARNING]
> Mirroring should not be considered as a means to achieve fault-tolerance. The offsets of items within a topic differ between the source and destination clusters, so clients cannot use the two interchangeably.
>
> If you are concerned about fault tolerance, you should set replication for the topics within your cluster. For more information, see Get started with Kafka on HDInsight.
Mirroring works by using the MirrorMaker tool (part of Apache Kafka) to consume records from topics on the source cluster and then create a local copy on the destination cluster. MirrorMaker uses one (or more) consumers that read from the source cluster, and a producer that writes to the local (destination) cluster.
The following diagram illustrates the mirroring process:
Apache Kafka on HDInsight does not provide access to the Kafka service over the public internet. Kafka producers or consumers must be in the same Azure virtual network as the nodes in the Kafka cluster. For this example, both the Kafka source and destination clusters are located in an Azure virtual network. The following diagram shows how communication flows between the clusters:
The source and destination clusters can have different numbers of nodes and partitions, and offsets within the topics also differ. Mirroring maintains the key value that is used for partitioning, so record order is preserved on a per-key basis.
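The per-key ordering guarantee follows from key-based partitioning: records with equal keys always map to the same partition. The following is a simplified sketch of that idea using `cksum` as a stand-in hash (Kafka's actual default partitioner uses murmur2, not `cksum`):

```shell
#!/bin/bash
# Simplified illustration only: any deterministic hash of the record key gives
# the same property - equal keys always land in the same partition, so record
# order per key survives mirroring even when partition counts differ.
partition_for() {
  local key="$1" num_partitions="$2"
  local hash
  # cksum produces a stable checksum of the key bytes
  hash=$(printf '%s' "$key" | cksum | cut -d' ' -f1)
  echo $(( hash % num_partitions ))
}

p1=$(partition_for "user-42" 8)
p2=$(partition_for "user-42" 8)
echo "user-42 -> partition $p1 (repeat lookup: $p2)"
```

Because the mapping depends only on the key, repeated lookups for the same key agree, which is what preserves per-key ordering.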
If you need to mirror between Kafka clusters in different networks, there are the following additional considerations:

- Gateways: The networks must be able to communicate at the TCP/IP level.

- Name resolution: The Kafka clusters in each network must be able to connect to each other by using hostnames. This may require a Domain Name System (DNS) server in each network that is configured to forward requests to the other network.
When creating an Azure Virtual Network, instead of using the automatic DNS provided with the network, you must specify a custom DNS server and the IP address for the server. After the Virtual Network has been created, you must then create an Azure Virtual Machine that uses that IP address, then install and configure DNS software on it.
> [!WARNING]
> Create and configure the custom DNS server before installing HDInsight into the Virtual Network. There is no additional configuration required for HDInsight to use the DNS server configured for the Virtual Network.
For more information on connecting two Azure Virtual Networks, see Configure a VNet-to-VNet connection.
While you can create an Azure virtual network and Kafka clusters manually, it's easier to use an Azure Resource Manager template. Use the following steps to deploy an Azure virtual network and two Kafka clusters to your Azure subscription.
- Use the following button to sign in to Azure and open the template in the Azure portal.

  The Azure Resource Manager template is located at https://hditutorialdata.blob.core.windows.net/armtemplates/create-linux-based-kafka-mirror-cluster-in-vnet-v2.1.json.

  > [!WARNING]
  > To guarantee availability of Kafka on HDInsight, your cluster must contain at least three worker nodes. This template creates a Kafka cluster that contains three worker nodes.

- Use the following information to populate the entries on the Custom deployment blade:

  - Resource group: Create a group or select an existing one. This group contains the HDInsight cluster.

  - Location: Select a location geographically close to you.

  - Base Cluster Name: This value is used as the base name for the Kafka clusters. For example, entering `hdi` creates clusters named `source-hdi` and `dest-hdi`.

  - Cluster Login User Name: The admin user name for the source and destination Kafka clusters.

  - Cluster Login Password: The admin user password for the source and destination Kafka clusters.

  - SSH User Name: The SSH user to create for the source and destination Kafka clusters.

  - SSH Password: The password for the SSH user for the source and destination Kafka clusters.

- Read the Terms and Conditions, and then select I agree to the terms and conditions stated above.

- Finally, check Pin to dashboard and then select Purchase. It takes about 20 minutes to create the clusters.
> [!IMPORTANT]
> The names of the HDInsight clusters are source-BASENAME and dest-BASENAME, where BASENAME is the name you provided to the template. You use these names in later steps when connecting to the clusters.
- Connect to the source cluster using SSH:

  Replace sshuser with the SSH user name used when creating the cluster. Replace BASENAME with the base name used when creating the cluster.

  For information, see Use SSH with HDInsight.
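The SSH command itself is not shown above. Assuming the standard HDInsight SSH endpoint naming (`CLUSTERNAME-ssh.azurehdinsight.net`), it can be constructed like this:

```shell
#!/bin/bash
# Assumption: HDInsight exposes SSH at <clustername>-ssh.azurehdinsight.net.
BASENAME="hdi"        # example: the base name given to the template
SSH_USER="sshuser"    # the SSH user created with the clusters
SOURCE_HOST="source-${BASENAME}-ssh.azurehdinsight.net"
echo "ssh ${SSH_USER}@${SOURCE_HOST}"
# Run the printed command to open the session. For the destination cluster,
# use dest-${BASENAME}-ssh.azurehdinsight.net instead.
```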
- Use the following commands to find the Zookeeper hosts for the source cluster:

  ```
  # Install jq if it is not installed
  sudo apt -y install jq

  # Get the Zookeeper hosts for the source cluster
  export SOURCE_ZKHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
  ```
Replace `$CLUSTERNAME` with the name of the source cluster. When prompted, enter the password for the cluster login (admin) account.

- To create a topic named `testtopic`, use the following command:

  ```
  /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 8 --topic testtopic --zookeeper $SOURCE_ZKHOSTS
  ```
- Use the following command to verify that the topic was created:

  ```
  /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $SOURCE_ZKHOSTS
  ```
The response contains `testtopic`.

- Use the following to view the Zookeeper host information for this (the source) cluster:

  ```
  echo $SOURCE_ZKHOSTS
  ```
This returns information similar to the following text:

```
zk0-source.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:2181,zk1-source.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:2181
```
Save this information. It is used in the next section.
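The `curl`/`jq`/`cut` pipeline used above keeps only the first two Zookeeper hosts from the Ambari response. A minimal sketch of that trimming step, using sample host names in place of the values returned by your cluster:

```shell
#!/bin/bash
# Sample host list in the shape the jq expression produces (example values,
# not real cluster hosts).
ALL_ZKHOSTS='zk0-source.internal:2181,zk1-source.internal:2181,zk2-source.internal:2181'

# cut -d',' -f1,2 keeps only the first two comma-separated entries; two hosts
# are enough for clients to reach the Zookeeper quorum.
SOURCE_ZKHOSTS=$(echo "$ALL_ZKHOSTS" | cut -d',' -f1,2)
echo "$SOURCE_ZKHOSTS"   # zk0-source.internal:2181,zk1-source.internal:2181
```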
- Connect to the destination cluster using a different SSH session:

  Replace sshuser with the SSH user name used when creating the cluster. Replace BASENAME with the base name used when creating the cluster.
For information, see Use SSH with HDInsight.
- A `consumer.properties` file is used to configure communication with the source cluster. To create the file, use the following command:

  ```
  nano consumer.properties
  ```

  Use the following text as the contents of the `consumer.properties` file:

  ```
  zookeeper.connect=SOURCE_ZKHOSTS
  group.id=mirrorgroup
  ```

  Replace SOURCE_ZKHOSTS with the Zookeeper hosts information from the source cluster.
This file describes the consumer information to use when reading from the source Kafka cluster. For more information on consumer configuration, see Consumer Configs at kafka.apache.org.
To save the file, use Ctrl + X, Y, and then Enter.
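If you prefer not to edit the file interactively, it can also be generated with a heredoc. This sketch assumes you first set `SOURCE_ZKHOSTS` in this session to the value you saved from the source cluster (an example value is shown so the snippet is self-contained):

```shell
#!/bin/bash
# Assumption: SOURCE_ZKHOSTS holds the Zookeeper hosts saved from the source
# cluster; replace the example value below with your own.
SOURCE_ZKHOSTS='zk0-source.internal:2181,zk1-source.internal:2181'

# Write consumer.properties non-interactively instead of using nano.
cat > consumer.properties <<EOF
zookeeper.connect=${SOURCE_ZKHOSTS}
group.id=mirrorgroup
EOF

cat consumer.properties
```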
- Before configuring the producer that communicates with the destination cluster, you must find the broker hosts for the destination cluster. Use the following commands to retrieve this information:

  ```
  sudo apt -y install jq
  DEST_BROKERHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
  echo $DEST_BROKERHOSTS
  ```
Replace `$CLUSTERNAME` with the name of the destination cluster. When prompted, enter the password for the cluster login (admin) account.

The `echo` command returns information similar to the following text:

```
wn0-dest.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:9092,wn1-dest.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:9092
```
- A `producer.properties` file is used to configure communication with the destination cluster. To create the file, use the following command:

  ```
  nano producer.properties
  ```

  Use the following text as the contents of the `producer.properties` file:

  ```
  bootstrap.servers=DEST_BROKERS
  compression.type=none
  ```

  Replace DEST_BROKERS with the broker information from the previous step.
For more information on producer configuration, see Producer Configs at kafka.apache.org.
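Because `$DEST_BROKERHOSTS` was captured earlier in this same SSH session, the file can also be generated directly instead of pasting the value by hand; a sketch (an example value is included so the snippet is self-contained):

```shell
#!/bin/bash
# Assumption: DEST_BROKERHOSTS is still set from the earlier curl/jq step;
# replace the example value below with your own.
DEST_BROKERHOSTS='wn0-dest.internal:9092,wn1-dest.internal:9092'

# Write producer.properties non-interactively instead of using nano.
cat > producer.properties <<EOF
bootstrap.servers=${DEST_BROKERHOSTS}
compression.type=none
EOF

cat producer.properties
```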
- From the SSH connection to the destination cluster, use the following command to start the MirrorMaker process:

  ```
  /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer.properties --producer.config producer.properties --whitelist testtopic --num.streams 4
  ```
The parameters used in this example are:
- `--consumer.config`: Specifies the file that contains consumer properties. These properties are used to create a consumer that reads from the source Kafka cluster.

- `--producer.config`: Specifies the file that contains producer properties. These properties are used to create a producer that writes to the destination Kafka cluster.

- `--whitelist`: A list of topics that MirrorMaker replicates from the source cluster to the destination.

- `--num.streams`: The number of consumer threads to create.
- On startup, MirrorMaker returns information similar to the following text:

  ```
  {metadata.broker.list=wn1-source.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:9092,wn0-source.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:9092, request.timeout.ms=30000, client.id=mirror-group-3, security.protocol=PLAINTEXT}
  {metadata.broker.list=wn1-source.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:9092,wn0-source.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:9092, request.timeout.ms=30000, client.id=mirror-group-0, security.protocol=PLAINTEXT}
  {metadata.broker.list=wn1-source.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:9092,wn0-kafka.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:9092, request.timeout.ms=30000, client.id=mirror-group-2, security.protocol=PLAINTEXT}
  {metadata.broker.list=wn1-source.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:9092,wn0-source.aazwc2onlofevkbof0cuixrp5h.gx.internal.cloudapp.net:9092, request.timeout.ms=30000, client.id=mirror-group-1, security.protocol=PLAINTEXT}
  ```
- From the SSH connection to the source cluster, use the following command to start a producer and send messages to the topic:

  ```
  SOURCE_BROKERHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
  /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $SOURCE_BROKERHOSTS --topic testtopic
  ```
Replace `$CLUSTERNAME` with the name of the source cluster. When prompted, enter the password for the cluster login (admin) account.

When you arrive at a blank line with a cursor, type in a few text messages. The messages are sent to the topic on the source cluster. When done, use Ctrl + C to end the producer process.
- From the SSH connection to the destination cluster, use Ctrl + C to end the MirrorMaker process. To verify that the topic and messages were replicated to the destination, use the following commands:

  ```
  DEST_ZKHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
  /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $DEST_ZKHOSTS
  /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper $DEST_ZKHOSTS --topic testtopic --from-beginning
  ```
Replace `$CLUSTERNAME` with the name of the destination cluster. When prompted, enter the password for the cluster login (admin) account.

The list of topics now includes `testtopic`, which is created when MirrorMaker mirrors the topic from the source cluster to the destination. The messages retrieved from the topic are the same as those entered on the source cluster.
[!INCLUDE delete-cluster-warning]
Since the steps in this document create both clusters in the same Azure resource group, you can delete the resource group in the Azure portal. Deleting the resource group removes all resources created by following this document, including the Azure Virtual Network and the storage account used by the clusters.
In this document, you learned how to use MirrorMaker to create a replica of a Kafka cluster. Use the following links to discover other ways to work with Kafka: