Skip to content

Commit

Permalink
Add backward compatibility with old versions of Apache Beam (apache#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj authored Nov 11, 2022
1 parent f919abc commit e8ab8cc
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 28 deletions.
57 changes: 35 additions & 22 deletions airflow/providers/apache/beam/hooks/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""This module contains a Apache Beam Hook."""
from __future__ import annotations

import contextlib
import json
import os
import select
Expand All @@ -28,6 +29,8 @@
from tempfile import TemporaryDirectory
from typing import Callable

from packaging.version import Version

from airflow.exceptions import AirflowConfigException, AirflowException
from airflow.hooks.base import BaseHook
from airflow.providers.google.go_module_utils import init_module, install_dependencies
Expand Down Expand Up @@ -226,37 +229,47 @@ def start_python_pipeline(
if "labels" in variables:
variables["labels"] = [f"{key}={value}" for key, value in variables["labels"].items()]

if py_requirements is not None:
if not py_requirements and not py_system_site_packages:
warning_invalid_environment = textwrap.dedent(
"""\
Invalid method invocation. You have disabled inclusion of system packages and empty list
required for installation, so it is not possible to create a valid virtual environment.
In the virtual environment, apache-beam package must be installed for your job to be \
executed. To fix this problem:
* install apache-beam on the system, then set parameter py_system_site_packages to True,
* add apache-beam to the list of required packages in parameter py_requirements.
"""
)
raise AirflowException(warning_invalid_environment)

with TemporaryDirectory(prefix="apache-beam-venv") as tmp_dir:
with contextlib.ExitStack() as exit_stack:
if py_requirements is not None:
if not py_requirements and not py_system_site_packages:
warning_invalid_environment = textwrap.dedent(
"""\
Invalid method invocation. You have disabled inclusion of system packages and empty
list required for installation, so it is not possible to create a valid virtual
environment. In the virtual environment, apache-beam package must be installed for
your job to be executed.
To fix this problem:
* install apache-beam on the system, then set parameter py_system_site_packages
to True,
* add apache-beam to the list of required packages in parameter py_requirements.
"""
)
raise AirflowException(warning_invalid_environment)
tmp_dir = exit_stack.enter_context(TemporaryDirectory(prefix="apache-beam-venv"))
py_interpreter = prepare_virtualenv(
venv_directory=tmp_dir,
python_bin=py_interpreter,
system_site_packages=py_system_site_packages,
requirements=py_requirements,
)
command_prefix = [py_interpreter] + py_options + [py_file]

self._start_pipeline(
variables=variables,
command_prefix=command_prefix,
process_line_callback=process_line_callback,
)
else:
command_prefix = [py_interpreter] + py_options + [py_file]

beam_version = (
subprocess.check_output(
[py_interpreter, "-c", "import apache_beam; print(apache_beam.__version__)"]
)
.decode()
.strip()
)
self.log.info("Beam version: %s", beam_version)
impersonate_service_account = variables.get("impersonate_service_account")
if impersonate_service_account:
if Version(beam_version) < Version("2.39.0") or True:
raise AirflowException(
"The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer."
)
self._start_pipeline(
variables=variables,
command_prefix=command_prefix,
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/apache/beam/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ versions:

dependencies:
- apache-airflow>=2.3.0
- apache-beam>=2.39.0
- apache-beam>=2.33.0

integrations:
- integration-name: Apache Beam
Expand Down
5 changes: 5 additions & 0 deletions airflow/providers/google/cloud/operators/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class DataflowConfiguration:
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
.. warning::
This option requires Apache Beam 2.39.0 or newer.
:param drain_pipeline: Optional, set to True if want to stop streaming job by draining it
instead of canceling during killing task instance. See:
https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline
Expand Down
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"apache.beam": {
"deps": [
"apache-airflow>=2.3.0",
"apache-beam>=2.39.0"
"apache-beam>=2.33.0"
],
"cross-providers-deps": [
"google"
Expand Down
43 changes: 39 additions & 4 deletions tests/providers/apache/beam/hooks/test_beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import copy
import os
import re
import subprocess
import unittest
from unittest import mock
from unittest.mock import MagicMock

import pytest
from parameterized import parameterized

from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -58,7 +60,8 @@

class TestBeamHook(unittest.TestCase):
@mock.patch(BEAM_STRING.format("BeamCommandRunner"))
def test_start_python_pipeline(self, mock_runner):
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
def test_start_python_pipeline(self, mock_check_output, mock_runner):
hook = BeamHook(runner=DEFAULT_RUNNER)
wait_for_done = mock_runner.return_value.wait_for_done
process_line_callback = MagicMock()
Expand All @@ -83,6 +86,26 @@ def test_start_python_pipeline(self, mock_runner):
)
wait_for_done.assert_called_once_with()

@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.35.0")
def test_start_python_pipeline_unsupported_option(self, mock_check_output):
hook = BeamHook(runner=DEFAULT_RUNNER)

with pytest.raises(
AirflowException,
match=re.escape("The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer."),
):
hook.start_python_pipeline(
variables={
"impersonate_service_account": "[email protected]",
},
py_file="/tmp/file.py",
py_options=["-m"],
py_interpreter="python3",
py_requirements=None,
py_system_site_packages=False,
process_line_callback=MagicMock(),
)

@parameterized.expand(
[
("default_to_python3", "python3"),
Expand All @@ -92,7 +115,10 @@ def test_start_python_pipeline(self, mock_runner):
]
)
@mock.patch(BEAM_STRING.format("BeamCommandRunner"))
def test_start_python_pipeline_with_custom_interpreter(self, _, py_interpreter, mock_runner):
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
def test_start_python_pipeline_with_custom_interpreter(
self, _, py_interpreter, mock_check_output, mock_runner
):
hook = BeamHook(runner=DEFAULT_RUNNER)
wait_for_done = mock_runner.return_value.wait_for_done
process_line_callback = MagicMock()
Expand Down Expand Up @@ -127,8 +153,14 @@ def test_start_python_pipeline_with_custom_interpreter(self, _, py_interpreter,
)
@mock.patch(BEAM_STRING.format("prepare_virtualenv"))
@mock.patch(BEAM_STRING.format("BeamCommandRunner"))
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system_packages(
self, current_py_requirements, current_py_system_site_packages, mock_runner, mock_virtualenv
self,
current_py_requirements,
current_py_system_site_packages,
mock_check_output,
mock_runner,
mock_virtualenv,
):
hook = BeamHook(runner=DEFAULT_RUNNER)
wait_for_done = mock_runner.return_value.wait_for_done
Expand Down Expand Up @@ -164,7 +196,10 @@ def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system
)

@mock.patch(BEAM_STRING.format("BeamCommandRunner"))
def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages(self, mock_runner):
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0")
def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages(
self, mock_check_output, mock_runner
):
hook = BeamHook(runner=DEFAULT_RUNNER)
wait_for_done = mock_runner.return_value.wait_for_done
process_line_callback = MagicMock()
Expand Down

0 comments on commit e8ab8cc

Please sign in to comment.