---
title: Develop Python streaming MapReduce jobs with HDInsight - Azure
description: Learn how to use Python in streaming MapReduce jobs. Apache Hadoop provides a streaming API for MapReduce for writing in languages other than Java.
services: hdinsight
keyword: mapreduce python,python map reduce,python mapreduce
author: hrasheed-msft
ms.reviewer: jasonh
ms.service: hdinsight
ms.custom: hdinsightactive,hdiseo17may2017
ms.topic: conceptual
ms.date: 04/10/2018
ms.author: hrasheed
---
Learn how to use Python in streaming MapReduce operations. Apache Hadoop provides a streaming API for MapReduce that enables you to write map and reduce functions in languages other than Java. The steps in this document implement the Map and Reduce components in Python.
- A Linux-based Apache Hadoop on HDInsight cluster.

  > [!IMPORTANT]
  > The steps in this document require an HDInsight cluster that uses Linux. Linux is the only operating system used on HDInsight version 3.4 or greater. For more information, see HDInsight retirement on Windows.

- A text editor.

  > [!IMPORTANT]
  > The text editor must use LF as the line ending. Using a line ending of CRLF causes errors when running the MapReduce job on Linux-based HDInsight clusters.

- The `ssh` and `scp` commands, or Azure PowerShell.
This example is a basic word count implemented in a Python mapper and reducer. The mapper breaks sentences into individual words, and the reducer aggregates the words and counts to produce the output.
The following flowchart illustrates what happens during the map and reduce phases.
Hadoop allows you to specify a file that contains the map and reduce logic that is used by a job. The specific requirements for the map and reduce logic are:
- Input: The map and reduce components must read input data from STDIN.
- Output: The map and reduce components must write output data to STDOUT.
- Data format: The data consumed and produced must be a key/value pair, separated by a tab character.
Python can easily handle these requirements by using the `sys` module to read from STDIN and using `print` to write to STDOUT. The remaining task is simply formatting the data with a tab (`\t`) character between the key and value.
- Create a file named `mapper.py` and use the following code as the content:

  ```python
  #!/usr/bin/env python

  # Use the sys module
  import sys

  # 'file' in this case is STDIN
  def read_input(file):
      # Split each line into words
      for line in file:
          yield line.split()

  def main(separator='\t'):
      # Read the data using read_input
      data = read_input(sys.stdin)
      # Process each list of words returned from read_input
      for words in data:
          # Process each word
          for word in words:
              # Write to STDOUT
              print '%s%s%d' % (word, separator, 1)

  if __name__ == "__main__":
      main()
  ```
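  Before moving on, you can optionally test the mapper locally by piping a line of text through it. This quick check is not part of the original steps, and it assumes that `python` on your PATH is Python 2, matching the `print` syntax above:

  ```bash
  # Each word in the input should come back as 'word<TAB>1'
  echo "the quick brown fox" | python mapper.py
  ```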
- Create a file named `reducer.py` and use the following code as the content:

  ```python
  #!/usr/bin/env python

  # import modules
  from itertools import groupby
  from operator import itemgetter
  import sys

  # 'file' in this case is STDIN
  def read_mapper_output(file, separator='\t'):
      # Go through each line
      for line in file:
          # Strip out the separator character
          yield line.rstrip().split(separator, 1)

  def main(separator='\t'):
      # Read the data using read_mapper_output
      data = read_mapper_output(sys.stdin, separator=separator)
      # Group words and counts into 'group'
      # Since MapReduce is a distributed process, each word
      # may have multiple counts. 'group' will have all counts
      # which can be retrieved using the word as the key.
      for current_word, group in groupby(data, itemgetter(0)):
          try:
              # For each word, pull the count(s) for the word
              # from 'group' and create a total count
              total_count = sum(int(count) for current_word, count in group)
              # Write to stdout
              print "%s%s%d" % (current_word, separator, total_count)
          except ValueError:
              # Count was not a number, so do nothing
              pass

  if __name__ == "__main__":
      main()
  ```
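  Because the reducer uses `groupby`, it expects its input to be sorted by key; in a real job, Hadoop's shuffle phase guarantees this ordering between the map and reduce steps. To optionally simulate the whole pipeline locally (again assuming `python` is Python 2), insert `sort` between the two scripts:

  ```bash
  # 'sort' stands in for Hadoop's shuffle/sort phase
  echo "one fish two fish red fish blue fish" | python mapper.py | sort | python reducer.py
  ```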
To ensure that your files have the right line endings, use the following PowerShell script:
[!code-powershellmain]
Use the following PowerShell script to upload the files, run the job, and view the output:
[!code-powershellmain]
- From your development environment, in the same directory as the `mapper.py` and `reducer.py` files, use the following command:

  ```bash
  scp mapper.py reducer.py username@clustername-ssh.azurehdinsight.net:
  ```

  Replace `username` with the SSH user name for your cluster, and `clustername` with the name of your cluster. This command copies the files from the local system to the head node.

  > [!NOTE]
  > If you used a password to secure your SSH account, you are prompted for the password. If you used an SSH key, you may have to use the `-i` parameter and the path to the private key. For example, `scp -i /path/to/private/key mapper.py reducer.py username@clustername-ssh.azurehdinsight.net:`.
- Connect to the cluster by using SSH:

  ```bash
  ssh username@clustername-ssh.azurehdinsight.net
  ```

  For more information, see Use SSH with HDInsight.
- To ensure that `mapper.py` and `reducer.py` have the correct line endings, use the following commands:

  ```bash
  perl -pi -e 's/\r\n/\n/g' mapper.py
  perl -pi -e 's/\r\n/\n/g' reducer.py
  ```
- Use the following command to start the MapReduce job:

  ```bash
  yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input /example/data/gutenberg/davinci.txt -output /example/wordcountout
  ```
  This command has the following parts:

  - `hadoop-streaming.jar`: Used when performing streaming MapReduce operations. It interfaces Hadoop with the external MapReduce code you provide.
  - `-files`: Adds the specified files to the MapReduce job.
  - `-mapper`: Tells Hadoop which file to use as the mapper.
  - `-reducer`: Tells Hadoop which file to use as the reducer.
  - `-input`: The input file to count words from.
  - `-output`: The directory that the output is written to.

  As the MapReduce job runs, its progress is displayed as percentages:

  ```
  15/02/05 19:01:04 INFO mapreduce.Job:  map 0% reduce 0%
  15/02/05 19:01:16 INFO mapreduce.Job:  map 100% reduce 0%
  15/02/05 19:01:27 INFO mapreduce.Job:  map 100% reduce 100%
  ```
- To view the output, use the following command:

  ```bash
  hdfs dfs -text /example/wordcountout/part-00000
  ```

  This command displays a list of words and how many times each word occurred.
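  If you want to run the job again, first delete the existing output directory, because Hadoop fails a job whose output directory already exists. A minimal cleanup step, assuming the same output path used above:

  ```bash
  # Remove the previous job output so a new run can write to the same path
  hdfs dfs -rm -r /example/wordcountout
  ```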
Now that you have learned how to use streaming MapReduce jobs with HDInsight, use the following links to explore other ways to work with Azure HDInsight.