
[BEAM-7923] Streaming support and pipeline pruning when instrumenting a pipeline with interactivity (apache#11100)

1. Updated pipeline_fragment to not prune newly added TestStream
transforms.
2. Updated pipeline_instrument to support streaming pipelines and prune
transforms that don't contribute to the desired outputs.
3. Updated the ipython version upper bound to <8. We are targeting
ipython > 7.3 for the introduction of the cell magic `%pip`. Note that
`%pip` behaves differently from `!pip`: the former guarantees
installation into the kernel's environment, not whatever arbitrary
environment the notebook server is running in.
4. Updated the README for Interactive Beam. The main changes are
recommending JupyterLab over the classic Jupyter Notebook, and updated
setup instructions: creating a virtual environment through the Python 3
venv module, installing labextensions instead of nbextensions, and
installing extra dependencies.
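
For example, in a notebook cell (a minimal sketch; the package spec is
illustrative, not part of this change):

```
# %pip runs pip inside the environment of the kernel executing the notebook,
# so the installed package is importable from subsequent cells.
%pip install apache-beam[interactive]

# !pip shells out to whichever pip is first on the notebook server's PATH,
# which may belong to a different Python environment than the kernel's.
!pip install apache-beam[interactive]
```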
Ning Kang authored Mar 13, 2020
1 parent 08ac97f commit 64c2b41
Showing 12 changed files with 1,095 additions and 151 deletions.
104 changes: 67 additions & 37 deletions sdks/python/apache_beam/runners/interactive/README.md
@@ -68,7 +68,7 @@ exploration much faster and easier. It provides nice features including
| | Caching locally | Caching on GCS |
| ------------------------ | --------------- | -------------- |
| Running on local machine | supported | supported |
-| Running on Flink | / | supported |
+| Running on Flink | supported | supported |

## Getting Started

@@ -84,23 +84,24 @@ a quick reference). For a more general and complete getting started guide, see
* Install [GraphViz](https://www.graphviz.org/download/) with your favorite
system package manager.

-- Install [Jupyter](https://jupyter.org/). You can either use the one that's
-  included in [Anaconda](https://www.anaconda.com/download/) or
+- Install [JupyterLab](https://jupyter.org/install.html). You can use
+  either **conda** or **pip**.

-```bash
-$ pip2 install --upgrade jupyter
-```
+* conda
+  ```bash
+  conda install -c conda-forge jupyterlab
+  ```
+* pip
+  ```bash
+  pip install jupyterlab
+  ```

-Make sure you have **Python2 Jupyter** since Apache Beam only supports
-Python 2 at the time being.

-- Install, create and activate your [virtualenv](https://virtualenv.pypa.io/).
+- Install, create and activate your [venv](https://docs.python.org/3/library/venv.html).
(optional but recommended)

```bash
-$ pip2 install --upgrade virtualenv
-$ virtualenv -p python2 beam_venv_dir
-$ source beam_venv_dir/bin/activate
+python3 -m venv /path/to/beam_venv_dir
+source /path/to/beam_venv_dir/bin/activate
```

If you are using shells other than bash (e.g. fish, csh), check
@@ -110,55 +111,89 @@ a quick reference). For a more general and complete getting started guide, see
**CHECK** that the virtual environment is activated by running

```bash
-$ echo $VIRTUAL_ENV # This should point to beam_venv_dir
-# or
-$ which python # This sould point to beam_venv_dir/bin/python
+which python # This should point to beam_venv_dir/bin/python
```

* Set up Apache Beam Python. **Make sure the virtual environment is activated
when you run `setup.py`**

```bash
-$ git clone https://github.com/apache/beam
-$ cd beam/sdks/python
-$ python setup.py install
+git clone https://github.com/apache/beam
+cd beam/sdks/python
+python setup.py install
```

-- Install a IPython kernel for the virtual environment you've just created.
+- Install an IPython kernel for the virtual environment you've just created.
  **Make sure the virtual environment is activated when you do this.** You can
-  skip this step if not using virtualenv.
+  skip this step if not using venv.

```bash
-$ python -m pip install ipykernel
-$ python -m ipykernel install --user --name beam_venv_kernel --display-name "Python (beam_venv)"
+pip install ipykernel
+python -m ipykernel install --user --name beam_venv_kernel --display-name "Python3 (beam_venv)"
```

**CHECK** that IPython kernel `beam_venv_kernel` is available for Jupyter to
use.

```bash
-$ jupyter kernelspec list
+jupyter kernelspec list
```

+- Extend JupyterLab through labextension. **Note**: labextension is different
+  from the nbextension mechanism of pre-lab Jupyter notebooks.

+  All Jupyter labextensions need nodejs:

+  ```bash
+  # Homebrew users do
+  brew install node
+  # Or Conda users do
+  conda install -c conda-forge nodejs
+  ```

+  Enable ipywidgets:

+  ```bash
+  pip install ipywidgets
+  jupyter labextension install @jupyter-widgets/jupyterlab-manager
+  ```

### Start the notebook

To start the notebook, simply run

```bash
-$ jupyter notebook
+jupyter lab
```

+Optionally, increase the iopub broadcast data rate limit of JupyterLab:

+```bash
+jupyter lab --NotebookApp.iopub_data_rate_limit=10000000
+```


This automatically opens your default web browser pointing to
-http://localhost:8888.
+http://localhost:8888/lab.

-You can create a new notebook file by clicking `New` > `Notebook: Python
-(beam_venv)`.
+You can create a new notebook file by clicking `Python3 (beam_venv)` on the
+launcher page of JupyterLab.

-Or after you've already opend a notebook, change the kernel by clicking
-`Kernel` > `Change Kernel` > `Python (beam_venv)`.
+Or after you've already opened a notebook, change the kernel by clicking
+`Kernel` > `Change Kernel` > `Python3 (beam_venv)`.

Voila! You can now run Beam pipelines interactively in your Jupyter notebook!

+In the notebook, you can use the `tab` key for auto-completion.
+To turn on greedy auto-completion, run the following IPython magic:

+```
+%config IPCompleter.greedy=True
+```

+You can also use `shift` + `tab` for a popup of docstrings at the
+current cursor position.

**See [Interactive Beam Example.ipynb](examples/Interactive%20Beam%20Example.ipynb)
for more examples.**

@@ -231,13 +266,8 @@ You can choose to run Interactive Beam on Flink with the following settings.
[Interactive Beam Running on Flink.ipynb](examples/Interactive%20Beam%20Running%20on%20Flink.ipynb)
capture the status of the world when it's last updated.

-## TL;DR;

-You can now interactively run Beam Python pipeline! Check out the Youtube demo

-[![IMAGE ALT TEXT HERE](https://img.youtube.com/vi/c5CjA1e3Cqw/0.jpg)](https://www.youtube.com/watch?v=c5CjA1e3Cqw)

## More Information

* [Apache Beam Python SDK Quickstart](https://beam.apache.org/get-started/quickstart-py/)
-* [Interactive Beam Design Doc](https://docs.google.com/document/d/10bTc97GN5Wk-nhwncqNq9_XkJFVVy0WLT4gPFqP6Kmw/edit?usp=sharing)
+* [Interactive Beam Design Doc V2](https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit?usp=sharing)
+* [Interactive Beam Design Doc V1](https://docs.google.com/document/d/10bTc97GN5Wk-nhwncqNq9_XkJFVVy0WLT4gPFqP6Kmw/edit?usp=sharing)
sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -166,6 +166,12 @@ def is_background_caching_job_needed(user_pipeline):
  # If this is True, we can invalidate a previous done/running job if there is
  # one.
  cache_changed = is_source_to_cache_changed(user_pipeline)
+  # When capture replay is disabled, cache is always needed for capturable
+  # sources (if any).
+  if need_cache and not ie.current_env().options.enable_capture_replay:
+    from apache_beam.runners.interactive.options import capture_control
+    capture_control.evict_captured_data()
+    return True
  return (
      need_cache and
      # Checks if it's the first time running a job from the pipeline.
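Put differently, with capture replay disabled every rerun evicts the
previously captured source data and recaptures from the live sources. A
minimal sketch of the toggle (assuming the `enable_capture_replay` option
referenced above is settable on the current interactive environment):

```python
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive.options import capture_control

# With replay off, is_background_caching_job_needed() evicts any previously
# captured data and reports that a fresh background caching job is needed.
ie.current_env().options.enable_capture_replay = False

# The eviction step itself, as called by the new code above.
capture_control.evict_captured_data()
```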
sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
@@ -31,16 +31,16 @@
from datetime import timedelta

from dateutil import tz
-from pandas.io.json import json_normalize

from apache_beam import pvalue
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import pipeline_instrument as instr
+from apache_beam.runners.interactive.utils import elements_to_df
+from apache_beam.runners.interactive.utils import to_element_list
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import IntervalWindow

try:
-  import jsons  # pylint: disable=import-error
  from IPython import get_ipython  # pylint: disable=import-error
  from IPython.core.display import HTML  # pylint: disable=import-error
  from IPython.core.display import Javascript  # pylint: disable=import-error
@@ -59,9 +59,6 @@

_LOGGER = logging.getLogger(__name__)

-# 1-d types that need additional normalization to be compatible with DataFrame.
-_one_dimension_types = (int, float, str, bool, list, tuple)

_CSS = """
<style>
.p-Widget.jp-OutputPrompt.jp-OutputArea-prompt:empty {{
@@ -243,6 +240,11 @@ def __init__(self, pcoll, include_window_info=False, display_facets=False):
    # With only the constructor of PipelineInstrument, any interactivity related
    # pre-process or instrument is not triggered for performance concerns.
    self._pin = instr.PipelineInstrument(pcoll.pipeline)
+    # Variable name as the title for element value in the rendered data table.
+    self._pcoll_var = self._pin.cacheable_var_by_pcoll_id(
+        self._pin.pcolls_to_pcoll_id.get(str(pcoll), None))
+    if not self._pcoll_var:
+      self._pcoll_var = 'Value'
    self._cache_key = self._pin.cache_key(self._pcoll)
    self._dive_display_id = 'facets_dive_{}_{}'.format(
        self._cache_key, id(self))
@@ -286,6 +288,12 @@ def display(self, updating_pv=None):
    # Ensures that dive, overview and table render the same data because the
    # materialized PCollection data might being updated continuously.
    data = self._to_dataframe()
+    # Give the numbered column names when visualizing.
+    data.columns = [
+        self._pcoll_var + '.' +
+        str(column) if isinstance(column, int) else column
+        for column in data.columns
+    ]
    # String-ify the dictionaries for display because elements of type dict
    # cannot be ordered.
    data = data.applymap(lambda x: str(x) if isinstance(x, dict) else x)
@@ -326,6 +334,9 @@ def _display_overview(self, data, update=None):
            for column in ('event_time', 'windows', 'pane_info'))):
      data = data.drop(['event_time', 'windows', 'pane_info'], axis=1)

+    # GFSG expects all column names to be strings.
+    data.columns = data.columns.astype(str)
+
    gfsg = GenericFeatureStatisticsGenerator()
    proto = gfsg.ProtoFromDataFrames([{'name': 'data', 'table': data}])
    protostr = base64.b64encode(proto.SerializeToString()).decode('utf-8')
@@ -383,34 +394,15 @@ def _display_dataframe(self, data, update=None):
    if not data.empty:
      self._is_datatable_empty = False

-  def _to_element_list(self):
-    pcoll_list = []
-    if ie.current_env().cache_manager().exists('full', self._cache_key):
-      pcoll_list, _ = ie.current_env().cache_manager().read('full',
-                                                            self._cache_key)
-    return pcoll_list
-
  # TODO(BEAM-7926): Refactor to new non-flatten dataframe conversion logic.
  def _to_dataframe(self):
-    normalized_list = []
-    # Column name for _one_dimension_types if presents.
-    normalized_column = str(self._pcoll)
-    # Normalization needs to be done for each element because they might be of
-    # different types. The check is only done on the root level, pandas json
-    # normalization I/O would take care of the nested levels.
-    for el in self._to_element_list():
-      if self._is_one_dimension_type(el):
-        # Makes such data structured.
-        normalized_list.append({normalized_column: el})
-      else:
-        normalized_list.append(jsons.load(jsons.dump(el)))
-    # Creates a dataframe that str() 1-d iterable elements after
-    # normalization so that facets_overview can treat such data as categorical.
-    return json_normalize(normalized_list).applymap(
-        lambda x: str(x) if type(x) in (list, tuple) else x)
-
-  def _is_one_dimension_type(self, val):
-    return type(val) in _one_dimension_types
+    results = []
+    cache_manager = ie.current_env().cache_manager()
+    if cache_manager.exists('full', self._cache_key):
+      coder = cache_manager.load_pcoder('full', self._cache_key)
+      reader, _ = cache_manager.read('full', self._cache_key)
+      results = list(to_element_list(reader, coder, include_window_info=True))
+
+    return elements_to_df(results, self._include_window_info)


def format_window_info_in_dataframe(data):
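The deleted json-normalization path gives way to the shared helpers
`to_element_list` and `elements_to_df` from
`apache_beam.runners.interactive.utils`. A rough, self-contained sketch of the
DataFrame conversion (assuming `elements_to_df` accepts a list of
`WindowedValue`s, as the cache-read path above suggests):

```python
from apache_beam.runners.interactive.utils import elements_to_df
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.windowed_value import WindowedValue

# Hypothetical elements, shaped like what to_element_list() yields from cache.
elements = [
    WindowedValue(value=i, timestamp=0, windows=(GlobalWindow(), ))
    for i in range(3)
]

# include_window_info=True keeps the event_time, windows and pane_info
# columns; with False, only the element values remain.
df = elements_to_df(elements, include_window_info=True)
print(df)
```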
sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
@@ -63,13 +63,13 @@ def setUp(self):
    # Generally test the logic where notebook is connected to the assumed
    # ipython kernel by forcefully setting notebook check to True.
    ie.current_env()._is_in_notebook = True
+    ib.options.display_timezone = pytz.timezone('US/Pacific')

    self._p = beam.Pipeline(ir.InteractiveRunner())
    # pylint: disable=range-builtin-not-iterating
    self._pcoll = self._p | 'Create' >> beam.Create(range(5))
    ib.watch(self)
    self._p.run()
-    ib.options.display_timezone = pytz.timezone('US/Pacific')

def test_raise_error_for_non_pcoll_input(self):
class Foo(object):
@@ -177,9 +177,16 @@ def test_windows_formatter_interval(self):
        '2020-03-02 15:14:54.000000-0800 (2h 31m 46s)',
        pv.windows_formatter([iw]))

-  def pane_info_formatter(self):
-    PaneInfo(is_last=True, timing=PaneInfoTiming.EARLY)
-    self.assertEqual('Pane Final EARLY')
+  def test_pane_info_formatter(self):
+    self.assertEqual(
+        'Pane 0: Final Early',
+        pv.pane_info_formatter(
+            PaneInfo(
+                is_first=False,
+                is_last=True,
+                timing=PaneInfoTiming.EARLY,
+                index=0,
+                nonspeculative_index=0)))


if __name__ == '__main__':
Expand Down
22 changes: 18 additions & 4 deletions sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -38,6 +38,7 @@
from apache_beam.runners.interactive import background_caching_job
from apache_beam.runners.interactive.display import pipeline_graph
from apache_beam.runners.interactive.options import capture_control
+from apache_beam.runners.interactive.utils import to_element_list

# size of PCollection samples cached.
SAMPLE_SIZE = 8
@@ -213,11 +214,24 @@ def state(self):
  def wait_until_finish(self):
    self._underlying_result.wait_until_finish()

-  def get(self, pcoll):
+  def get(self, pcoll, include_window_info=False):
+    """Materializes the PCollection into a list.
+    If include_window_info is True, then returns the elements as
+    WindowedValues. Otherwise, return the element as itself.
+    """
+    return list(self.read(pcoll, include_window_info))
+
+  def read(self, pcoll, include_window_info=False):
+    """Reads the PCollection one element at a time from cache.
+    If include_window_info is True, then returns the elements as
+    WindowedValues. Otherwise, return the element as itself.
+    """
    key = self._pipeline_instrument.cache_key(pcoll)
-    if ie.current_env().cache_manager().exists('full', key):
-      pcoll_list, _ = ie.current_env().cache_manager().read('full', key)
-      return pcoll_list
+    cache_manager = ie.current_env().cache_manager()
+    if cache_manager.exists('full', key):
+      coder = cache_manager.load_pcoder('full', key)
+      reader, _ = cache_manager.read('full', key)
+      return to_element_list(reader, coder, include_window_info)
    else:
      raise ValueError('PCollection not available, please run the pipeline.')

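A short usage sketch of the resulting API (pipeline and variable names are
illustrative):

```python
import apache_beam as beam
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

p = beam.Pipeline(InteractiveRunner())
pcoll = p | beam.Create([1, 2, 3])
ib.watch(locals())  # let the instrumentation find `pcoll` by variable name

result = p.run()
result.wait_until_finish()

print(result.get(pcoll))  # materializes the cached PCollection into a list

# read() yields one element at a time; include_window_info=True wraps each
# element in its WindowedValue with timestamp, windows and pane info.
for wv in result.read(pcoll, include_window_info=True):
  print(wv)
```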
sdks/python/apache_beam/runners/interactive/pipeline_fragment.py
@@ -23,6 +23,7 @@

import apache_beam as beam
from apache_beam.pipeline import PipelineVisitor
+from apache_beam.testing.test_stream import TestStream


class PipelineFragment(object):
@@ -201,6 +202,8 @@ def _prune_runner_pipeline_to_fragment(
      self, runner_pipeline, necessary_transforms):
    class PruneVisitor(PipelineVisitor):
      def enter_composite_transform(self, transform_node):
+        if isinstance(transform_node.transform, TestStream):
+          return
        pruned_parts = list(transform_node.parts)
        for part in transform_node.parts:
          if part not in necessary_transforms:
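The early return matters because instrumenting a pipeline can splice a
`TestStream` into the runner pipeline to replay captured source data; that
transform never appears among the user-derived `necessary_transforms`, so
without the guard the visitor would prune it away. A hedged sketch of the
fragment deduction this protects (hypothetical pipeline, assuming the
`PipelineFragment` API shown in this file can be used standalone):

```python
import apache_beam as beam
from apache_beam.runners.interactive.pipeline_fragment import PipelineFragment

p = beam.Pipeline()
numbers = p | 'Create' >> beam.Create([1, 2, 3])
squares = numbers | 'Square' >> beam.Map(lambda x: x * x)
cubes = numbers | 'Cube' >> beam.Map(lambda x: x ** 3)

# Only `squares` is requested, so 'Cube' contributes nothing to the desired
# output and can be pruned, whereas an injected TestStream composite would be
# kept intact by the PruneVisitor above.
fragment_pipeline = PipelineFragment([squares]).deduce_fragment()
```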