Skip to content

Commit

Permalink
[Playground] Examples CD (apache#23664)
Browse files Browse the repository at this point in the history
* WIP workflow

* WIP

* aio

* subdirs

* fix

* WF fix

* WF tmp

* TO REMOVE

* Update playground_deploy_examples.yml

* Update playground_deploy_examples.yml

* GA workflow

* TO REMOVE

* WIP

* wf

* to remove

* Revert "to remove"

This reverts commit c33aeb7.

* revert

* concurrency2

* no concurrency

* lock

* seq

* beam_concurr

* debug

* debug

* dirt

* wait_for_ready

* Revert "debug"

This reverts commit 0d08a0c80ef6424829a47c01dbfe52ea0e3f0d93.

* BEAM_CONC: 4

* origin required

* webgrpc

* examples_cd wf

* add

* to remove

* keep

* fix

* secrets

* refactor

* rm

* -dump

* Revert "rm"

This reverts commit 349d43c.

* renamse

* nit

* fix

* stable

Co-authored-by: MakarkinSAkvelon <[email protected]>
  • Loading branch information
eantyshev and MakarkinSAkvelon authored Oct 19, 2022
1 parent 10977bf commit b5a0f48
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 67 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/playground_examples_cd.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Playground Examples CD

on:
workflow_dispatch:
# Concurrency group for all deployment ops
concurrency: playground_production
jobs:
deploy_examples:
strategy:
matrix:
sdk: ["python", "go", "java"]
# run sequentially
max-parallel: 1
uses: ./.github/workflows/playground_examples_cd_reusable.yml
with:
sdk: ${{ matrix.sdk }}
origin: PG_EXAMPLES
subdirs: "./learning/katas ./examples ./sdks"
secrets:
project_id: ${{ secrets.GCP_PLAYGROUND_PROJECT_ID }}
sa_key_content: ${{ secrets.GCP_PLAYGROUND_SA_KEY }}
76 changes: 76 additions & 0 deletions .github/workflows/playground_examples_cd_reusable.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Playground Examples CD for a given SDK and origin

on:
workflow_call:
inputs:
sdk:
type: string
required: true
subdirs:
type: string
required: true
origin:
type: string
required: true
secrets:
project_id:
required: true
sa_key_content:
required: true

jobs:
cd:
name: CD ${{ inputs.sdk }} ${{ inputs.origin }}
runs-on: ubuntu-latest
env:
ORIGIN: ${{ inputs.origin }}
SDK: ${{ inputs.sdk }}
STEP: CD
SUBDIRS: ${{ inputs.subdirs }}

GOOGLE_APPLICATION_CREDENTIALS: /tmp/gcp_access.json
GOOGLE_CLOUD_PROJECT: ${{ secrets.project_id }}
SA_KEY_CONTENT: ${{ secrets.sa_key_content }}
steps:
- name: Check out the repo
uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.8'
- name: install deps
run: pip install -r requirements.txt
working-directory: playground/infrastructure
- name: Decode GCP credentials
run: |
echo "$SA_KEY_CONTENT" | base64 -d > $GOOGLE_APPLICATION_CREDENTIALS
- name: Run ci_cd.py
run: |
python3 ci_cd.py \
--step $STEP \
--sdk SDK_${SDK^^} \
--origin $ORIGIN \
--subdirs $SUBDIRS
working-directory: playground/infrastructure
env:
BEAM_ROOT_DIR: "../../"
SDK_CONFIG: "../../playground/sdks.yaml"
BEAM_EXAMPLE_CATEGORIES: "../categories.yaml"
SERVER_ADDRESS: https://backend-${{ env.SDK }}-beta-dot-apache-beam-testing.appspot.com
BEAM_USE_WEBGRPC: yes
BEAM_CONCURRENCY: 4
9 changes: 5 additions & 4 deletions .github/workflows/playground_examples_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ name: Playground Examples CI
on:
push:
paths:
- .github/workflows/playground*
- .github/workflows/playground_examples_ci_reusable.yml
- .github/workflows/playground_examples_ci.yml
- playground/backend/**
- playground/infrastructure/**
- learning/katas/**
Expand All @@ -35,15 +36,15 @@ jobs:
sdk: ["python", "go", "java"]
# run sequentially
max-parallel: 1
uses: ./.github/workflows/playground_examples_reusable.yml
uses: ./.github/workflows/playground_examples_ci_reusable.yml
with:
sdk: ${{ matrix.sdk }}
step: CI
origin: PG_EXAMPLES
subdirs: ./learning/katas ./examples ./sdks
# unfortunately, there's no input type for list
allowlist: |
.github/workflows/playground_examples_reusable.yml \
.github/workflows/playground_examples_ci_reusable.yml \
.github/workflows/playground_examples_ci.yml \
playground/backend \
playground/infrastructure
playground/infrastructure \
8 changes: 4 additions & 4 deletions .github/workflows/tour_of_beam_examples_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ name: Tour Of Beam Examples CI
on:
push:
paths:
- ./.github/workflows/playground_examples_reusable.yml
- ./.github/workflows/playground_examples_ci_reusable.yml
- ./.github/workflows/tour_of_beam_examples_ci.yml
- playground/backend/**
- playground/infrastructure/**
Expand All @@ -34,14 +34,14 @@ jobs:
sdk: ["python", "go", "java"]
# run sequentially
max-parallel: 1
uses: ./.github/workflows/playground_examples_reusable.yml
uses: ./.github/workflows/playground_examples_ci_reusable.yml
with:
sdk: ${{ matrix.sdk }}
step: CI
origin: TB_EXAMPLES
subdirs: "./learning/tour-of-beam/learning-content"
allowlist: |
.github/workflows/playground_examples_reusable.yml \
.github/workflows/playground_examples_ci_reusable.yml \
.github/workflows/tour_of_beam_examples_ci.yml \
playground/backend \
playground/infrastructure
playground/infrastructure \
42 changes: 18 additions & 24 deletions playground/infrastructure/cd_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,21 @@ async def _get_outputs(self, examples: List[Example]):
Args:
examples: beam examples that should be run
"""
await get_statuses(
examples) # run examples code and wait until all are executed
client = GRPCClient()
tasks = [client.get_run_output(example.pipeline_id) for example in examples]
outputs = await asyncio.gather(*tasks)

tasks = [client.get_log(example.pipeline_id) for example in examples]
logs = await asyncio.gather(*tasks)

if len(examples) > 0 and examples[0].sdk in [SDK_PYTHON, SDK_JAVA]:
tasks = [
client.get_graph(example.pipeline_id, example.filepath)
for example in examples
]
graphs = await asyncio.gather(*tasks)

for graph, example in zip(graphs, examples):
example.graph = graph

for output, example in zip(outputs, examples):
example.output = output

for log, example in zip(logs, examples):
example.logs = log
async def _populate_fields(example: Example):
try:
example.compile_output = await client.get_compile_output(example.pipeline_id)
example.output = await client.get_run_output(example.pipeline_id)
example.logs = await client.get_log(example.pipeline_id)
if example.sdk in [SDK_JAVA, SDK_PYTHON]:
example.graph = await client.get_graph(example.pipeline_id, example.filepath)
except Exception as e:
logging.error(example.link)
logging.error(example.compile_output)
raise RuntimeError(f"error in {example.name}") from e

async with GRPCClient() as client:
await get_statuses(client,
examples) # run examples code and wait until all are executed
tasks = [_populate_fields(example) for example in examples]
await asyncio.gather(*tasks)

8 changes: 4 additions & 4 deletions playground/infrastructure/ci_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ async def verify_examples(self, examples: List[Example], origin: Origin):
"""
single_file_examples = list(filter(
lambda example: example.tag.multifile is False, examples))
await get_statuses(single_file_examples)
await self._verify_examples(single_file_examples, origin)
async with GRPCClient() as client:
await get_statuses(client, single_file_examples)
await self._verify_examples(client, single_file_examples, origin)

async def _verify_examples(self, examples: List[Example], origin: Origin):
async def _verify_examples(self, client: any, examples: List[Example], origin: Origin):
"""
Verify statuses of beam examples and the number of found default examples.
Expand All @@ -74,7 +75,6 @@ async def _verify_examples(self, examples: List[Example], origin: Origin):
examples: beam examples that should be verified
"""
count_of_verified = 0
client = GRPCClient()
verify_status_failed = False
default_examples = []

Expand Down
38 changes: 28 additions & 10 deletions playground/infrastructure/grpc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
Module contains the client to communicate with GRPC test Playground server
"""
import logging
import os
import uuid

import grpc
import sonora.aio

from api.v1 import api_pb2_grpc, api_pb2
from config import Config
Expand All @@ -29,11 +31,25 @@ class GRPCClient:
"""GRPCClient is gRPC client for sending a request to the backend."""

def __init__(self, timeout=10, wait_for_ready=True):
self._channel = grpc.aio.insecure_channel(Config.SERVER_ADDRESS)
use_webgrpc = os.getenv("BEAM_USE_WEBGRPC", False)
if use_webgrpc:
self._channel = sonora.aio.insecure_web_channel(Config.SERVER_ADDRESS)
else:
self._channel = grpc.aio.insecure_channel(Config.SERVER_ADDRESS)

self._stub = api_pb2_grpc.PlaygroundServiceStub(self._channel)
self._timeout = timeout
self._wait_for_ready = wait_for_ready

self._kwargs = dict(timeout=timeout)
if wait_for_ready and not use_webgrpc:
self._kwargs["wait_for_ready"] = True

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._channel.__aexit__(exc_type, exc_val, exc_tb)


async def run_code(
self, code: str, sdk: api_pb2.Sdk, pipeline_options: str) -> str:
"""
Expand All @@ -54,7 +70,7 @@ async def run_code(
f'Incorrect sdk: must be from this pool: {", ".join(sdks)}')
request = api_pb2.RunCodeRequest(
code=code, sdk=sdk, pipeline_options=pipeline_options)
response = await self._stub.RunCode(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.RunCode(request, **self._kwargs)
return response.pipeline_uuid

async def check_status(self, pipeline_uuid: str) -> api_pb2.Status:
Expand All @@ -69,7 +85,7 @@ async def check_status(self, pipeline_uuid: str) -> api_pb2.Status:
"""
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.CheckStatusRequest(pipeline_uuid=pipeline_uuid)
response = await self._stub.CheckStatus(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.CheckStatus(request, **self._kwargs)
return response.status

async def get_run_error(self, pipeline_uuid: str) -> str:
Expand All @@ -84,7 +100,7 @@ async def get_run_error(self, pipeline_uuid: str) -> str:
"""
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.GetRunErrorRequest(pipeline_uuid=pipeline_uuid)
response = await self._stub.GetRunError(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.GetRunError(request, **self._kwargs)
return response.output

async def get_run_output(self, pipeline_uuid: str) -> str:
Expand All @@ -99,7 +115,7 @@ async def get_run_output(self, pipeline_uuid: str) -> str:
"""
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.GetRunOutputRequest(pipeline_uuid=pipeline_uuid)
response = await self._stub.GetRunOutput(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.GetRunOutput(request, **self._kwargs)
return response.output

async def get_log(self, pipeline_uuid: str) -> str:
Expand All @@ -114,7 +130,8 @@ async def get_log(self, pipeline_uuid: str) -> str:
"""
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.GetLogsRequest(pipeline_uuid=pipeline_uuid)
response = await self._stub.GetLogs(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.GetLogs(request, **self._kwargs)

return response.output

async def get_compile_output(self, pipeline_uuid: str) -> str:
Expand All @@ -129,7 +146,8 @@ async def get_compile_output(self, pipeline_uuid: str) -> str:
"""
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.GetCompileOutputRequest(pipeline_uuid=pipeline_uuid)
response = await self._stub.GetCompileOutput(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.GetCompileOutput(request, **self._kwargs)

return response.output

async def get_graph(self, pipeline_uuid: str, example_filepath: str) -> str:
Expand All @@ -146,7 +164,7 @@ async def get_graph(self, pipeline_uuid: str, example_filepath: str) -> str:
self._verify_pipeline_uuid(pipeline_uuid)
request = api_pb2.GetGraphRequest(pipeline_uuid=pipeline_uuid)
try:
response = await self._stub.GetGraph(request, timeout=self._timeout, wait_for_ready=self._wait_for_ready)
response = await self._stub.GetGraph(request, **self._kwargs)
if response.graph == "":
logging.warning("Graph for %s wasn't generated", example_filepath)
return response.graph
Expand Down
5 changes: 2 additions & 3 deletions playground/infrastructure/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Example:
type: PrecompiledObjectType = PRECOMPILED_OBJECT_TYPE_UNSPECIFIED
pipeline_id: str = ""
output: str = ""
compile_output: str = ""
graph: str = ""


Expand Down Expand Up @@ -146,7 +147,7 @@ def find_examples(root_dir: str, subdirs: List[str], supported_categories: List[
return examples


async def get_statuses(examples: List[Example], concurrency: int = 10):
async def get_statuses(client: GRPCClient, examples: List[Example], concurrency: int = 10):
"""
Receive status and update example.status and example.pipeline_id for
each example
Expand All @@ -156,8 +157,6 @@ async def get_statuses(examples: List[Example], concurrency: int = 10):
pipeline_id values.
"""
tasks = []
client = GRPCClient()

try:
concurrency = int(os.environ["BEAM_CONCURRENCY"])
logging.info("override default concurrency: %d", concurrency)
Expand Down
3 changes: 2 additions & 1 deletion playground/infrastructure/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ pytest-asyncio==0.18.2
pytest-mock==3.6.1
PyYAML==6.0
tqdm~=4.62.3
google-cloud-datastore==2.7.1
google-cloud-datastore==2.7.1
sonora==0.2.2
Loading

0 comments on commit b5a0f48

Please sign in to comment.