Replace Python MapReduce example with Spark example. (spotify#2668)
Lars Albertsson authored and Tarrasch committed Mar 11, 2019
1 parent 3701c68 commit a3ea435
Showing 3 changed files with 123 additions and 60 deletions.
89 changes: 65 additions & 24 deletions doc/example_top_artists.rst
@@ -2,11 +2,13 @@ Example – Top Artists
---------------------

This is a very simplified case of something we do at Spotify a lot.
All user actions are logged to HDFS where
we run a bunch of Hadoop jobs to transform the data.
All user actions are logged to Google Cloud Storage (previously HDFS) where
we run a bunch of processing jobs to transform the data. The processing code itself is implemented
in a scalable data processing framework, such as Scio, Scalding, or Spark, but the jobs
are orchestrated with Luigi.
At some point we might end up with
a smaller data set that we can bulk ingest into Cassandra, Postgres, or
some other format.
other storage suitable for serving or exploration.

For the purpose of this exercise, we want to aggregate all streams,
find the top 10 artists and then put the results into Postgres.
@@ -83,7 +85,7 @@ Note that *top_artists* needs to be in your PYTHONPATH, or else this can produc
$ PYTHONPATH='.' luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06
You can also try to view the manual using `--help` which will give you an
You can also try to view the manual using ``--help`` which will give you an
overview of the options.

Running the command again will do nothing because the output file is
@@ -95,39 +97,78 @@ the input files is modified.
You need to delete the output file
manually.

The `--local-scheduler` flag tells Luigi not to connect to a scheduler
The ``--local-scheduler`` flag tells Luigi not to connect to a scheduler
server. This is not recommended for any purpose other than testing things.
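
The same run can also be started from Python code. Below is a minimal sketch (not part of the
tutorial files) that assumes ``top_artists.py`` is importable; ``luigi.build`` with
``local_scheduler=True`` is the programmatic equivalent of passing ``--local-scheduler`` on the
command line.

.. code:: python

    import luigi
    from luigi import date_interval

    from top_artists import AggregateArtists

    if __name__ == '__main__':
        # Equivalent to: luigi --module top_artists AggregateArtists --local-scheduler ...
        luigi.build(
            [AggregateArtists(date_interval=date_interval.Month(2012, 6))],
            local_scheduler=True,
        )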

Step 1b - Running this in Hadoop
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Step 1b - Aggregate artists with Spark
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Luigi comes with native Python Hadoop mapreduce support built in, and
here is how this could look like, instead of the class above.
While Luigi can process data inline, it is normally used to orchestrate external programs that
perform the actual processing. In this example, we will demonstrate how the streams can instead be
read from HDFS and the artist counts aggregated with Spark, orchestrated by Luigi.

.. code:: python

    class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
    class AggregateArtistsSpark(luigi.contrib.spark.SparkSubmitTask):
        date_interval = luigi.DateIntervalParameter()

        app = 'top_artists_spark.py'
        master = 'local[*]'

        def output(self):
            return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval)

        def requires(self):
            return [StreamsHdfs(date) for date in self.date_interval]

        def mapper(self, line):
            timestamp, artist, track = line.strip().split()
            yield artist, 1

        def app_options(self):
            # :func:`~luigi.task.Task.input` returns the targets produced by the tasks in
            # `~luigi.task.Task.requires`.
            return [','.join([p.path for p in self.input()]),
                    self.output().path]
:class:`luigi.contrib.spark.SparkSubmitTask` doesn't require you to implement a
:func:`~luigi.task.Task.run` method. Instead, you specify the command line parameters to send
to ``spark-submit``, as well as any other configuration specific to Spark.
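
The example above only sets ``app`` and ``master``, but :class:`~luigi.contrib.spark.SparkSubmitTask`
exposes most other ``spark-submit`` settings as class attributes as well. As a rough sketch (the
subclass name and resource values below are illustrative, not part of the tutorial), a cluster
variant could look like this:

.. code:: python

    class AggregateArtistsSparkOnCluster(AggregateArtistsSpark):
        # Illustrative values only -- tune these for your own cluster.
        master = 'yarn'
        deploy_mode = 'cluster'
        driver_memory = '2g'
        executor_memory = '4g'

Either variant is run like any other Luigi task, for example
``PYTHONPATH='.' luigi --module top_artists AggregateArtistsSpark --local-scheduler --date-interval 2012-06``.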

Python code for the Spark job is found below.

.. code:: python

    import operator
    import sys

    from pyspark.sql import SparkSession


    def main(argv):
        input_paths = argv[1].split(',')
        output_path = argv[2]

        spark = SparkSession.builder.getOrCreate()

        # Read every input path and union them into a single DataFrame.
        streams = spark.read.option('sep', '\t').csv(input_paths[0])
        for stream_path in input_paths[1:]:
            streams = streams.union(spark.read.option('sep', '\t').csv(stream_path))

        # The second field is the artist; count one stream per row.
        counts = streams.rdd \
            .map(lambda row: (row[1], 1)) \
            .reduceByKey(operator.add)

        # Convert the (artist, count) pairs back to a DataFrame before writing.
        counts.toDF(['artist', 'count']).write.option('sep', '\t').csv(output_path)


    if __name__ == '__main__':
        sys.exit(main(sys.argv))
def reducer(self, key, values):
    yield key, sum(values)
In a typical deployment scenario, the Luigi orchestration definition above, as well as the
PySpark processing code, would be packaged into a deployment artifact such as a container image. The
processing code does not have to be implemented in Python; any program can be packaged in the
image and run from Luigi.
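
To illustrate that last point, the sketch below (hypothetical, not part of the example code) wraps a
made-up aggregation binary with :class:`luigi.contrib.external_program.ExternalProgramTask`, which
simply runs a command line while Luigi tracks the output target:

.. code:: python

    import luigi
    from luigi.contrib.external_program import ExternalProgramTask


    class AggregateArtistsNative(ExternalProgramTask):
        """Wraps a (hypothetical) compiled aggregation program."""

        date_interval = luigi.DateIntervalParameter()

        def output(self):
            return luigi.LocalTarget('data/artist_streams_%s.tsv' % self.date_interval)

        def program_args(self):
            # Placeholder binary name and flags -- any executable in the image would do.
            return ['aggregate-artists',
                    '--date-interval', str(self.date_interval),
                    '--output', self.output().path]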

Note that :class:`luigi.contrib.hadoop.JobTask` doesn't require you to implement a
:func:`~luigi.task.Task.run` method. Instead, you typically implement a
:func:`~luigi.contrib.hadoop.JobTask.mapper` and
:func:`~luigi.contrib.hadoop.JobTask.reducer` method. *mapper* and *combiner* require
yielding a tuple of only two elements: key and value. Both the key and the value may themselves be tuples.

Step 2 – Find the Top Artists
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -148,7 +189,7 @@ we choose to do this not as a Hadoop job, but just as a plain old for-loop in Py
        def requires(self):
            if self.use_hadoop:
                return AggregateArtistsHadoop(self.date_interval)
                return AggregateArtistsSpark(self.date_interval)
            else:
                return AggregateArtists(self.date_interval)
@@ -213,15 +254,15 @@ building all its upstream dependencies.
Using the Central Planner
~~~~~~~~~~~~~~~~~~~~~~~~~

The `--local-scheduler` flag tells Luigi not to connect to a central scheduler.
The ``--local-scheduler`` flag tells Luigi not to connect to a central scheduler.
This is recommended in order to get started or for development purposes.
At the point where you start putting things in production
we strongly recommend running the central scheduler server.
In addition to providing locking
so that the same task is not run by multiple processes at the same time,
this server also provides a pretty nice visualization of your current workflow.

If you drop the `--local-scheduler` flag,
If you drop the ``--local-scheduler`` flag,
your script will try to connect to the central planner,
by default at localhost port 8082.
If you run
@@ -234,7 +275,7 @@ in the background and then run your task without the ``--local-scheduler`` flag,
then your script will now schedule through a centralized server.
You need `Tornado <http://www.tornadoweb.org/>`__ for this to work.

Launching `http://localhost:8082` should show something like this:
Launching http://localhost:8082 should show something like this:

.. figure:: web_server.png
:alt: Web server screenshot
66 changes: 30 additions & 36 deletions examples/top_artists.py
@@ -22,9 +22,9 @@
from luigi import six

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs
import luigi.contrib.postgres
import luigi.contrib.spark


class ExternalStreams(luigi.ExternalTask):
@@ -136,17 +136,28 @@ def run(self):
                out_file.write('{}\t{}\n'.format(artist, count))


class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
class AggregateArtistsSpark(luigi.contrib.spark.SparkSubmitTask):
    """
    This task runs a :py:class:`luigi.contrib.hadoop.JobTask` task
    This task runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
    over each target data returned by :py:meth:`~/.StreamsHdfs.output` and
    writes the result into its :py:meth:`~.AggregateArtistsHadoop.output` target (a file in HDFS).
    This class uses :py:meth:`luigi.contrib.spark.SparkJob.run`.
    writes the result into its :py:meth:`~.AggregateArtistsSpark.output` target (a file in HDFS).
    """

    date_interval = luigi.DateIntervalParameter()

    """
    The Pyspark script to run.

    For Spark applications written in Java or Scala, the name of a jar file should be supplied instead.
    """
    app = 'top_artists_spark.py'

    """
    Address of the Spark cluster master. In this case, we are not using a cluster, but running
    Spark in local mode.
    """
    master = 'local[*]'

    def output(self):
        """
        Returns the target output for this task.
@@ -155,10 +166,7 @@ def output(self):
:return: the target output for this task.
:rtype: object (:py:class:`luigi.target.Target`)
"""
return luigi.contrib.hdfs.HdfsTarget(
"data/artist_streams_%s.tsv" % self.date_interval,
format=luigi.contrib.hdfs.PlainDir
)
return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval)

def requires(self):
"""
@@ -170,48 +178,34 @@ def requires(self):
"""
return [StreamsHdfs(date) for date in self.date_interval]

def mapper(self, line):
"""
The implementation of the map phase of the Hadoop job.
:param line: the input.
:return: tuple ((key, value) or, in this case, (artist, 1 stream count))
"""
_, artist, _ = line.strip().split()
yield artist, 1

def reducer(self, key, values):
"""
The implementation of the reducer phase of the Hadoop job.
:param key: the artist.
:param values: the stream count.
:return: tuple (artist, count of streams)
"""
yield key, sum(values)
def app_options(self):
# :func:`~luigi.task.Task.input` returns the targets produced by the tasks in
# `~luigi.task.Task.requires`.
return [','.join([p.path for p in self.input()]),
self.output().path]


class Top10Artists(luigi.Task):
    """
    This task runs over the target data returned by :py:meth:`~/.AggregateArtists.output` or
    :py:meth:`~/.AggregateArtistsHadoop.output` in case :py:attr:`~/.Top10Artists.use_hadoop` is set and
    :py:meth:`~/.AggregateArtistsSpark.output` in case :py:attr:`~/.Top10Artists.use_spark` is set and
    writes the result into its :py:meth:`~.Top10Artists.output` target (a file in local filesystem).
    """

    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()
    use_spark = luigi.BoolParameter()

    def requires(self):
        """
        This task's dependencies:

        * :py:class:`~.AggregateArtists` or
        * :py:class:`~.AggregateArtistsHadoop` if :py:attr:`~/.Top10Artists.use_hadoop` is set.
        * :py:class:`~.AggregateArtistsSpark` if :py:attr:`~/.Top10Artists.use_spark` is set.

        :return: object (:py:class:`luigi.task.Task`)
        """
        if self.use_hadoop:
            return AggregateArtistsHadoop(self.date_interval)
        if self.use_spark:
            return AggregateArtistsSpark(self.date_interval)
        else:
            return AggregateArtists(self.date_interval)

@@ -256,7 +250,7 @@ class ArtistToplistToDatabase(luigi.contrib.postgres.CopyToTable):
"""

date_interval = luigi.DateIntervalParameter()
use_hadoop = luigi.BoolParameter()
use_spark = luigi.BoolParameter()

host = "localhost"
database = "toplists"
@@ -277,7 +271,7 @@ def requires(self):
        :return: list of object (:py:class:`luigi.task.Task`)
        """
        return Top10Artists(self.date_interval, self.use_hadoop)
        return Top10Artists(self.date_interval, self.use_spark)


if __name__ == "__main__":
28 changes: 28 additions & 0 deletions examples/top_artists_spark.py
@@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-

import operator
import sys

from pyspark.sql import SparkSession


def main(argv):
    input_paths = argv[1].split(',')
    output_path = argv[2]

    spark = SparkSession.builder.getOrCreate()

    # Read every input path and union them into a single DataFrame.
    streams = spark.read.option('sep', '\t').csv(input_paths[0])
    for stream_path in input_paths[1:]:
        streams = streams.union(spark.read.option('sep', '\t').csv(stream_path))

    # The second field is the artist; count one stream per row.
    counts = streams.rdd \
        .map(lambda row: (row[1], 1)) \
        .reduceByKey(operator.add)

    # Convert the (artist, count) pairs back to a DataFrame before writing.
    counts.toDF(['artist', 'count']).write.option('sep', '\t').csv(output_path)


if __name__ == '__main__':
    sys.exit(main(sys.argv))
