The HDInsight Streaming Activity in a Data Factory pipeline executes Hadoop Streaming programs on your own or on-demand Windows/Linux-based HDInsight cluster. This article builds on the data transformation activities article, which presents a general overview of data transformation and the supported transformation activities. You can use the HDInsightStreaming activity to invoke a Hadoop Streaming job from an Azure Data Factory pipeline. The following JSON snippet shows the syntax for using the HDInsightStreaming activity in a pipeline JSON file.
    {
      "name": "HadoopStreamingPipeline",
      "properties": {
        "description": "Hadoop Streaming Demo",
        "activities": [
          {
            "type": "HDInsightStreaming",
            "typeProperties": {
              "mapper": "cat.exe",
              "reducer": "wc.exe",
              "input": "wasb://<nameofthecluster>@spestore.blob.core.windows.net/example/data/gutenberg/davinci.txt",
              "output": "wasb://<nameofthecluster>@spestore.blob.core.windows.net/example/data/StreamingOutput/wc.txt",
              "filePaths": [
                "<nameofthecluster>/example/apps/wc.exe",
                "<nameofthecluster>/example/apps/cat.exe"
              ],
              "fileLinkedService": "StorageLinkedService",
              "getDebugInfo": "Failure"
            },
            "outputs": [
              {
                "name": "StreamingOutputDataset"
              }
            ],
            "policy": {
              "timeout": "01:00:00",
              "concurrency": 1,
              "executionPriorityOrder": "NewestFirst",
              "retry": 1
            },
            "scheduler": {
              "frequency": "Day",
              "interval": 1
            },
            "name": "RunHadoopStreamingJob",
            "description": "Run a Hadoop streaming job",
            "linkedServiceName": "HDInsightLinkedService"
          }
        ],
        "start": "2014-01-04T00:00:00Z",
        "end": "2014-01-05T00:00:00Z"
      }
    }
Note the following:
- Set linkedServiceName to the name of the linked service that points to the HDInsight cluster on which the streaming MapReduce job will run.
- Set the type of the activity to HDInsightStreaming.
- For the mapper property, specify the name of the mapper executable. In the example, cat.exe is the mapper executable. (A minimal Python sketch of a streaming mapper/reducer pair follows the note below.)
- For the reducer property, specify the name of the reducer executable. In the example, wc.exe is the reducer executable.
- For the input type property, specify the input file (including the location) for the mapper. In the example, "wasb://<nameofthecluster>@spestore.blob.core.windows.net/example/data/gutenberg/davinci.txt": <nameofthecluster> is the blob container, example/data/gutenberg is the folder, and davinci.txt is the blob.
- For the output type property, specify the output file (including the location) for the reducer. The output of the Hadoop Streaming job will be written to the location specified for this property.
- In the filePaths section, specify the paths for the mapper and reducer executables. In the example, "<nameofthecluster>/example/apps/wc.exe": <nameofthecluster> is the blob container, example/apps is the folder, and wc.exe is the executable.
- For the fileLinkedService property, specify the Azure Storage linked service that represents the Azure storage that contains the files specified in the filePaths section.
- For the arguments property, specify any arguments for the streaming job. (The example above does not pass arguments, so the property is omitted.)
- The getDebugInfo property is an optional element. When it is set to Failure, the logs are downloaded only on failure. When it is set to All, logs are always downloaded irrespective of the execution status.
[AZURE.NOTE] As shown in the example, you will need to specify an output dataset for the Hadoop Streaming Activity for the outputs property. This is just a dummy dataset that is required to drive the pipeline schedule. You do not need to specify any input dataset for the activity for the inputs property.
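Because Hadoop Streaming communicates with the mapper and reducer over standard input and standard output, any executable that follows that contract can fill either role. Purely as an illustration of the contract (these scripts are not part of the sample cluster), here is a minimal, hypothetical Python sketch of an equivalent pair: an identity mapper like cat.exe and a counting reducer like wc.exe.

    #!/usr/bin/env python
    # Hypothetical streaming pair: an identity mapper (like cat.exe) and a
    # counting reducer (like wc.exe). Hadoop Streaming feeds input splits to
    # the mapper on stdin and pipes the sorted mapper output to the reducer.
    import sys

    def run_mapper():
        # Identity mapper: echo every input line to stdout unchanged.
        for line in sys.stdin:
            sys.stdout.write(line)

    def run_reducer():
        # Counting reducer: tally the lines and words received on stdin.
        lines = words = 0
        for line in sys.stdin:
            lines += 1
            words += len(line.split())
        print("lines=%d\twords=%d" % (lines, words))

    if __name__ == "__main__":
        # In practice these would be two separate scripts named in the
        # mapper and reducer properties; here a flag selects the role.
        run_reducer() if "reduce" in sys.argv[1:] else run_mapper()

On a Linux-based cluster, you would upload scripts like these to blob storage, list them in filePaths, and name them in the mapper and reducer properties in place of the .exe files.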
The pipeline in this walkthrough runs the Word Count streaming Map/Reduce program on your Azure HDInsight cluster.
First, you create a linked service to link the Azure Storage account that is used by the HDInsight cluster to the data factory. If you copy/paste the following code, do not forget to replace <account name> and <account key> with the name and key of your Azure Storage account.
    {
      "name": "StorageLinkedService",
      "properties": {
        "type": "AzureStorage",
        "typeProperties": {
          "connectionString": "DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>"
        }
      }
    }
Next, you create a linked service to link your HDInsight cluster to the data factory. If you copy/paste the following code, replace <HDInsight cluster name> with the name of your HDInsight cluster, and change the user name and password values.
    {
      "name": "HDInsightLinkedService",
      "properties": {
        "type": "HDInsight",
        "typeProperties": {
          "clusterUri": "https://<HDInsight cluster name>.azurehdinsight.net",
          "userName": "admin",
          "password": "**********",
          "linkedServiceName": "StorageLinkedService"
        }
      }
    }
The pipeline in this example does not take any inputs. You will need to specify an output dataset for the HDInsight Streaming Activity. This is just a dummy dataset that is required to drive the pipeline schedule.
    {
      "name": "StreamingOutputDataset",
      "properties": {
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "StorageLinkedService",
        "typeProperties": {
          "folderPath": "adftutorial/streamingdata/",
          "format": {
            "type": "TextFormat",
            "columnDelimiter": ","
          }
        },
        "availability": {
          "frequency": "Day",
          "interval": 1
        }
      }
    }
The pipeline in this example has only one activity, which is of type HDInsightStreaming.
The HDInsight cluster comes populated with the example programs (wc.exe and cat.exe) and the example data (davinci.txt). By default, the name of the container that the HDInsight cluster uses is the name of the cluster itself. For example, if your cluster name is myhdicluster, the associated blob container is also named myhdicluster.
    {
      "name": "HadoopStreamingPipeline",
      "properties": {
        "description": "Hadoop Streaming Demo",
        "activities": [
          {
            "type": "HDInsightStreaming",
            "typeProperties": {
              "mapper": "cat.exe",
              "reducer": "wc.exe",
              "input": "wasb://<blobcontainer>@spestore.blob.core.windows.net/example/data/gutenberg/davinci.txt",
              "output": "wasb://<blobcontainer>@spestore.blob.core.windows.net/example/data/StreamingOutput/wc.txt",
              "filePaths": [
                "<blobcontainer>/example/apps/wc.exe",
                "<blobcontainer>/example/apps/cat.exe"
              ],
              "fileLinkedService": "StorageLinkedService"
            },
            "outputs": [
              {
                "name": "StreamingOutputDataset"
              }
            ],
            "policy": {
              "timeout": "01:00:00",
              "concurrency": 1,
              "executionPriorityOrder": "NewestFirst",
              "retry": 1
            },
            "scheduler": {
              "frequency": "Day",
              "interval": 1
            },
            "name": "RunHadoopStreamingJob",
            "description": "Run a Hadoop streaming job",
            "linkedServiceName": "HDInsightLinkedService"
          }
        ],
        "start": "2014-01-04T00:00:00Z",
        "end": "2014-01-05T00:00:00Z"
      }
    }
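When the pipeline runs, the reducer output is written under the location given by the output property (example/data/StreamingOutput/ in this sample). To inspect that output, you can read the blob directly; the following is a minimal, hypothetical Python sketch that assumes the legacy azure-storage package (pip install azure-storage) and reuses the placeholder names from the JSON above.

    from azure.storage.blob import BlockBlobService  # legacy azure-storage SDK

    # Hypothetical values -- substitute your own storage account and key.
    blob_service = BlockBlobService(account_name="<account name>",
                                    account_key="<account key>")

    # Hadoop jobs typically write part-* files under the output path, so
    # list the folder first (see the "output" property of the pipeline).
    prefix = "example/data/StreamingOutput/"
    for blob in blob_service.list_blobs("<blobcontainer>", prefix=prefix):
        print(blob.name)

    # Download and print the reducer output as text.
    result = blob_service.get_blob_to_text("<blobcontainer>", prefix + "wc.txt")
    print(result.content)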