Skip to content

Commit

Permalink
[Tour Of Beam] saving user code (apache#23938)
Browse files Browse the repository at this point in the history
* docker-compose

* stubs&README

* AIO

* fix

* nit

* WF triggers

* fix

* fix
  • Loading branch information
eantyshev authored Nov 3, 2022
1 parent ec6da53 commit 77116c5
Show file tree
Hide file tree
Showing 25 changed files with 5,842 additions and 143 deletions.
18 changes: 15 additions & 3 deletions .github/workflows/tour_of_beam_backend_integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ on:
push:
branches: ['master', 'release-*']
tags: 'v*'
paths: ['learning/tour-of-beam/backend/**']
paths:
- 'learning/tour-of-beam/backend/**'
- 'playground/backend/**'
pull_request:
branches: ['master', 'release-*']
tags: 'v*'
paths: ['learning/tour-of-beam/backend/**']
paths:
- 'learning/tour-of-beam/backend/**'
- 'playground/backend/**'

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
Expand All @@ -43,12 +47,16 @@ env:
DATASTORE_PROJECT_ID: demo-test-proj
DATASTORE_EMULATOR_HOST: localhost:8081
DATASTORE_EMULATOR_DATADIR: ./datadir
# playground API
PLAYGROUND_ROUTER_HOST: localhost:8000

# GCF
PORT_SDK_LIST: 8801
PORT_GET_CONTENT_TREE: 8802
PORT_GET_UNIT_CONTENT: 8803
PORT_GET_USER_PROGRESS: 8804
PORT_POST_UNIT_COMPLETE: 8805
PORT_POST_USER_CODE: 8806


jobs:
Expand All @@ -63,7 +71,9 @@ jobs:
with:
# pin to the biggest Go version supported by Cloud Functions runtime
go-version: '1.16'

- name: Build Playground router image
run: ./gradlew playground:backend:containers:router:docker
working-directory: ${{ env.GITHUB_WORKSPACE }}
# 1. Start emulators
- name: Start emulators
run: docker-compose up -d
Expand All @@ -81,6 +91,8 @@ jobs:
run: PORT=${{ env.PORT_GET_USER_PROGRESS }} FUNCTION_TARGET=getUserProgress ./tob_function &
- name: Run postUnitComplete in background
run: PORT=${{ env.PORT_POST_UNIT_COMPLETE }} FUNCTION_TARGET=postUnitComplete ./tob_function &
- name: Run postUserCode in background
run: PORT=${{ env.PORT_POST_USER_CODE }} FUNCTION_TARGET=postUserCode ./tob_function &

# 3. Load data in datastore: run CD step on samples/learning-content
- name: Run CI/CD to populate datastore
Expand Down
6 changes: 5 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ tasks.rat {

// Tour Of Beam backend autogenerated Datastore indexes
"learning/tour-of-beam/backend/internal/storage/index.yaml",


// Tour Of Beam backend autogenerated Playground GRPC API stubs and mocks
"learning/tour-of-beam/backend/playground_api/api.pb.go",
"learning/tour-of-beam/backend/playground_api/api_grpc.pb.go",
"learning/tour-of-beam/backend/playground_api/mock.go",

// test p8 file for SnowflakeIO
"sdks/java/io/snowflake/src/test/resources/invalid_test_rsa_key.p8",
Expand Down
8 changes: 8 additions & 0 deletions learning/tour-of-beam/backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ Authorized endpoints also consume `Authorization: Bearer <id_token>` header
* getUserProgress?sdk=<sdk>
* postUnitContent?sdk=<sdk>&id=<id>

### Playground GRPC API

We use Playground GRPC to save/get user snippets, so we keep the generated stubs in [playground_api](playground_api)
To re-generate:
```
$ go generate -x ./...
```


### Datastore schema

Expand Down
13 changes: 13 additions & 0 deletions learning/tour-of-beam/backend/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,16 @@ services:
- PROJECT_ID=${GOOGLE_CLOUD_PROJECT}
ports:
- "9099:9099"

playground-router:
image: apache/beam_playground-backend-router
environment:
- GOOGLE_CLOUD_PROJECT
- DATASTORE_EMULATOR_HOST=datastore:8081
- CACHE_TYPE=local
- SDK_CONFIG=/opt/playground/backend/sdks-emulator.yaml
- PROTOCOL_TYPE=TCP
ports:
- "8000:8080"
depends_on:
- datastore
62 changes: 59 additions & 3 deletions learning/tour-of-beam/backend/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
// specific language governing permissions and limitations
// under the License.

//go:generate protoc -I ../../../playground/api/v1 --go_out=playground_api --go_opt=paths=source_relative api.proto
//go:generate protoc -I ../../../playground/api/v1 --go-grpc_out=playground_api --go-grpc_opt=paths=source_relative api.proto
//go:generate moq -rm -out playground_api/mock.go playground_api PlaygroundServiceClient

package tob

import (
Expand All @@ -29,13 +33,18 @@ import (
tob "beam.apache.org/learning/tour-of-beam/backend/internal"
"beam.apache.org/learning/tour-of-beam/backend/internal/service"
"beam.apache.org/learning/tour-of-beam/backend/internal/storage"
pb "beam.apache.org/learning/tour-of-beam/backend/playground_api"
"cloud.google.com/go/datastore"
"github.com/GoogleCloudPlatform/functions-framework-go/functions"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
grpc_status "google.golang.org/grpc/status"
)

var (
svc service.IContent
auth *Authorizer
svc service.IContent
auth *Authorizer
pgClient pb.PlaygroundServiceClient
)

// Helper to format http error messages.
Expand Down Expand Up @@ -69,11 +78,31 @@ func MakeRepo(ctx context.Context) storage.Iface {
}
}

func MakePlaygroundClient(ctx context.Context) pb.PlaygroundServiceClient {
// dependencies
// required:
// * TOB_MOCK: use mock implementation
// OR
// * PLAYGROUND_ROUTER_HOST: playground API host/port
if os.Getenv("TOB_MOCK") > "" {
fmt.Println("Using mock playground client")
return pb.GetMockClient()
} else {
host := os.Getenv("PLAYGROUND_ROUTER_HOST")
cc, err := grpc.Dial(host, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("fail to dial playground: %v", err)
}
return pb.NewPlaygroundServiceClient(cc)
}
}

func init() {
ctx := context.Background()

repo := MakeRepo(ctx)
svc = &service.Svc{Repo: repo}
pgClient = MakePlaygroundClient(ctx)
svc = &service.Svc{Repo: repo, PgClient: pgClient}
auth = MakeAuthorizer(ctx, repo)
commonGet := Common(http.MethodGet)
commonPost := Common(http.MethodPost)
Expand All @@ -85,6 +114,7 @@ func init() {

functions.HTTP("getUserProgress", commonGet(ParseSdkParam(auth.ParseAuthHeader(getUserProgress))))
functions.HTTP("postUnitComplete", commonPost(ParseSdkParam(auth.ParseAuthHeader(postUnitComplete))))
functions.HTTP("postUserCode", commonPost(ParseSdkParam(auth.ParseAuthHeader(postUserCode))))
}

// Get list of SDK names
Expand Down Expand Up @@ -175,3 +205,29 @@ func postUnitComplete(w http.ResponseWriter, r *http.Request, sdk tob.Sdk, uid s

fmt.Fprint(w, "{}")
}

// Save user code for unit
func postUserCode(w http.ResponseWriter, r *http.Request, sdk tob.Sdk, uid string) {
unitId := r.URL.Query().Get("id")

var userCodeRequest tob.UserCodeRequest
err := json.NewDecoder(r.Body).Decode(&userCodeRequest)
if err != nil {
log.Println("body decode error:", err)
finalizeErrResponse(w, http.StatusBadRequest, BAD_FORMAT, "bad request body")
return
}

err = svc.SaveUserCode(r.Context(), sdk, unitId, uid, userCodeRequest)
if err != nil {
log.Println("Save user code error:", err)
message := "storage error"
if st, ok := grpc_status.FromError(err); ok {
message = fmt.Sprintf("playground api error: %s", st)
}
finalizeErrResponse(w, http.StatusInternalServerError, INTERNAL_ERROR, message)
return
}

fmt.Fprint(w, "{}")
}
2 changes: 2 additions & 0 deletions learning/tour-of-beam/backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ require (
cloud.google.com/go/firestore v1.7.0 // indirect
firebase.google.com/go/v4 v4.9.0
github.com/stretchr/testify v1.8.0
google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.28.1
)
10 changes: 10 additions & 0 deletions learning/tour-of-beam/backend/integration_tests/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,13 @@ type UnitProgress struct {
type SdkProgress struct {
Units []UnitProgress `json:"units"`
}

type UserCodeFile struct {
Name string `json:"name"`
Content string `json:"content"`
IsMain bool `json:"isMain,omitempty"`
}
type UserCodeRequest struct {
Files []UserCodeFile `json:"files"`
PipelineOptions string `json:"pipelineOptions"`
}
134 changes: 134 additions & 0 deletions learning/tour-of-beam/backend/integration_tests/auth_emulator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//go:build integration
// +build integration

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"bytes"
"encoding/json"
"io"
"log"
"net/http"
"os"
"time"
)

const (
TIMEOUT_HTTP = 10 * time.Second
TIMEOUT_STARTUP = 30 * time.Second
)

type EmulatorClient struct {
host string
client *http.Client
}

func makeEmulatorCiient() *EmulatorClient {
return &EmulatorClient{
os.Getenv("FIREBASE_AUTH_EMULATOR_HOST"),
&http.Client{Timeout: TIMEOUT_HTTP},
}
}

func (e *EmulatorClient) waitApi() {
terminate := time.NewTimer(TIMEOUT_STARTUP)
tick := time.NewTicker(5 * time.Second)
for {
select {
case <-terminate.C:
log.Fatalf("timeout waiting for emulator")
case <-tick.C:
resp, err := e.do(http.MethodGet, "", nil)
if err != nil {
log.Println("emulator API:", err)
continue
}
parsed := struct {
AuthEmulator struct {
Ready bool `json:"ready"`
} `json:"authEmulator"`
}{}
err = json.Unmarshal(resp, &parsed)
if err != nil {
log.Println("emulator API bad response:", err)
continue
}
if parsed.AuthEmulator.Ready {
return
}
}
}
}

func (e *EmulatorClient) do(method, endpoint string, jsonBody map[string]string) ([]byte, error) {
url := "http://" + e.host
if endpoint > "" {
url += "/" + endpoint
}
var buf []byte
// handle nil jsonBody as no body
if jsonBody != nil {
buf, _ = json.Marshal(jsonBody)
}

req, err := http.NewRequest(method, url, bytes.NewBuffer(buf))
if err != nil {
return nil, err
}
req.Header.Add("content-type", "application/json")

response, err := e.client.Do(req)
if err != nil {
return nil, err
}

// Close the connection to reuse it
defer response.Body.Close()
// show the response in stdout
tee := io.TeeReader(response.Body, os.Stdout)
defer os.Stdout.WriteString("\n")

var out []byte
out, err = io.ReadAll(tee)
if err != nil {
return nil, err
}

return out, nil
}

// Get valid Firebase ID token
// Simulate Frontend client authorization logic
// Here, we use the simplest possible authorization: email/password
// Firebase Admin SDK lacks methods to create a user and get ID token
func (e *EmulatorClient) getIDToken() string {
// create a user (sign-up with dummy email/password)
endpoint := "identitytoolkit.googleapis.com/v1/accounts:signUp?key=anything_goes"
body := map[string]string{"email": "[email protected]", "password": "1q2w3e"}
resp, err := e.do(http.MethodPost, endpoint, body)
if err != nil {
log.Fatalf("emulator request error: %+v", err)
}

var parsed struct {
IdToken string `json:"idToken"`
}
err = json.Unmarshal(resp, &parsed)
if err != nil {
log.Fatalf("failed to parse output: %+v", err)
}

return parsed.IdToken
}
Loading

0 comments on commit 77116c5

Please sign in to comment.