Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handler stream #848

Merged
merged 41 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
bc41be7
initial helper functions for streaming functionality
DyfanJones Oct 3, 2024
1b21c17
Merge branch 'main' into handler_stream
DyfanJones Nov 11, 2024
6a9f7b7
add stream_api in operation
DyfanJones Nov 18, 2024
01b34ba
doc update
DyfanJones Nov 18, 2024
6f96af1
tidy up
DyfanJones Nov 18, 2024
6a051f4
add eventstream
DyfanJones Nov 18, 2024
fd595d1
update docs
DyfanJones Nov 18, 2024
ec8932a
switch to using normal.json instead of min.json
DyfanJones Nov 19, 2024
57c108f
Get error from stream
DyfanJones Nov 25, 2024
de74b12
httr pass stream to handlers
DyfanJones Nov 25, 2024
d452f2d
Initial stream handler
DyfanJones Nov 25, 2024
6f8b521
pass stream_api param to operation
DyfanJones Nov 25, 2024
33c9810
export stream functions
DyfanJones Nov 25, 2024
4623a12
initial dynamic streamhandler response
DyfanJones Dec 6, 2024
d2c1af2
remove deprecated code
DyfanJones Dec 6, 2024
1861fc0
initial aws stream parser
DyfanJones Dec 6, 2024
2a70b1f
prevent non valid aws event messages being passed to eventstream_parser
DyfanJones Dec 7, 2024
dafd308
move streamhandler to jsonrpc_unmarshal
DyfanJones Dec 7, 2024
a5b1623
remove custom handler
DyfanJones Dec 7, 2024
e0a5a83
deprecate old steam handler in favour of new approach
DyfanJones Dec 7, 2024
9c85760
tidy up
DyfanJones Dec 7, 2024
115583a
add streaming method for xml
DyfanJones Dec 7, 2024
e5729be
handle streaming errors
DyfanJones Dec 7, 2024
ec8c946
move to stream file
DyfanJones Dec 7, 2024
86f1f9c
add xml streaming method
DyfanJones Dec 7, 2024
4a76a8d
rename
DyfanJones Dec 7, 2024
41d6435
Add default payload size
DyfanJones Dec 8, 2024
8917d9d
remove redundant code
DyfanJones Dec 8, 2024
17e7509
reexport paws_stream_praser to all category packages
DyfanJones Dec 8, 2024
0c068b2
update comment
DyfanJones Dec 8, 2024
1207131
move unit test to test_stream
DyfanJones Dec 9, 2024
8405cb2
test stream functionality
DyfanJones Dec 9, 2024
bcc24cf
initial stream documentation for website
DyfanJones Dec 9, 2024
117fa35
revert paws regen
DyfanJones Dec 9, 2024
02e6620
bump paws.common feature version for paws regen
DyfanJones Dec 9, 2024
89e5f7a
add abstract function
DyfanJones Dec 9, 2024
914f1dd
add deleted function
DyfanJones Dec 9, 2024
84d7cc5
rename file
DyfanJones Dec 9, 2024
c164200
typo
DyfanJones Dec 9, 2024
d0a65d0
add eventstream to json file dummy
DyfanJones Dec 9, 2024
1db1cae
add stream_api
DyfanJones Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions docs/streaming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Streaming

As of `paws v-0.8.0+` streaming is supported in `paws`.


## Basic usage:

Example taken from: https://docs.aws.amazon.com/code-library/latest/ug/python_3_bedrock-runtime_code_examples.html


### Internal function:

Paws allows for a function to be passed to the returning stream for processing.

```r
library(paws)

client <- bedrockruntime()

# Set the model ID, e.g., Titan Text Premier.
model_id <- "amazon.titan-text-premier-v1:0"

# Start a conversation with the user message.
user_message <- "Describe the purpose of a 'hello world' program in one line."
conversation <- list(
list(
role = "user",
content = list(list(text= user_message)),
)
)

resp <- client.converse_stream(
modelId=model_id,
messages=conversation,
inferenceConfig=list(maxTokens = 512, temperature = 0.5, topP = 0.9)
)

resp$stream(\(chunk) chunk$contentBlockDelta$delta$text)
```

### paws_connection:

`paws` allows for the raw connection to be retrieved. The connection is a sub class of `httr2::httr2_response` class.
This allows paws_connection to be handle both a paws parser or httr2 stream parser.

```r
library(paws)

client <- bedrockruntime()

# Set the model ID, e.g., Titan Text Premier.
model_id <- "amazon.titan-text-premier-v1:0"

# Start a conversation with the user message.
user_message <- "Describe the purpose of a 'hello world' program in one line."
conversation <- list(
list(
role = "user",
content = list(list(text= user_message)),
)
)

resp <- client.converse_stream(
modelId=model_id,
messages=conversation,
inferenceConfig=list(maxTokens = 512, temperature = 0.5, topP = 0.9)
)

con <- resp$stream(.connection = TRUE)

while(!is.null(chunk <- paws_stream_parser(con))) {
print(chunk$contentBlockDelta$delta$text)
}
```

Note: the paws_stream_parser return the stream in the response syntax. In this case please check https://paws-r.github.io/docs/bedrockruntime_converse_stream/

For full flexibility you can use [httr2::resp_stream_aws](https://httr2.r-lib.org/reference/req_perform_stream.html?search-input=resp_stream_aws) to get the raw response from AWS.

```r
library(paws)

client <- bedrockruntime()

# Set the model ID, e.g., Titan Text Premier.
model_id <- "amazon.titan-text-premier-v1:0"

# Start a conversation with the user message.
user_message <- "Describe the purpose of a 'hello world' program in one line."
conversation <- list(
list(
role = "user",
content = list(list(text= user_message)),
)
)

resp <- client.converse_stream(
modelId=model_id,
messages=conversation,
inferenceConfig=list(maxTokens = 512, temperature = 0.5, topP = 0.9)
)

con <- resp$stream(.connection = TRUE)
repeat{
event <- resp_stream_aws(con)
if (is.null(event)) {
close(con)
break
}

str(event)
}
```
2 changes: 1 addition & 1 deletion make.paws/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: make.paws
Type: Package
Title: Generate Paws AWS SDKs for R
Version: 0.9.1
Version: 0.9.2
Authors@R: c(
person("David", "Kretch", email = "[email protected]", role = "aut"),
person("Adam", "Banker", email = "[email protected]", role = "aut"),
Expand Down
2 changes: 1 addition & 1 deletion make.paws/R/cran_category.R
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#' @include package.R service.R
NULL

.paws.common.import.version <- "paws.common (>= 0.7.5)"
.paws.common.import.version <- "paws.common (>= 0.8.0)"

# Make all category-level packages.
make_categories <- function(sdk_dir, out_dir, categories, service_names) {
Expand Down
3 changes: 2 additions & 1 deletion make.paws/R/custom/s3.R
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ s3_download_file <- function(Bucket, Key, Filename, IfMatch = NULL, IfModifiedSi
name = "GetObject",
http_method = "GET",
http_path = "/{Bucket}/{Key+}",
paginator = list()
paginator = list(),
stream_api = FALSE
)
input <- .s3$get_object_input(Bucket = Bucket, IfMatch = IfMatch, IfModifiedSince = IfModifiedSince, IfNoneMatch = IfNoneMatch, IfUnmodifiedSince = IfUnmodifiedSince, Key = Key, Range = Range, ResponseCacheControl = ResponseCacheControl, ResponseContentDisposition = ResponseContentDisposition, ResponseContentEncoding = ResponseContentEncoding, ResponseContentLanguage = ResponseContentLanguage, ResponseContentType = ResponseContentType, ResponseExpires = ResponseExpires, VersionId = VersionId, SSECustomerAlgorithm = SSECustomerAlgorithm, SSECustomerKey = SSECustomerKey, SSECustomerKeyMD5 = SSECustomerKeyMD5, RequestPayer = RequestPayer, PartNumber = PartNumber, ExpectedBucketOwner = ExpectedBucketOwner)
output <- .s3$get_object_output()
Expand Down
10 changes: 8 additions & 2 deletions make.paws/R/operations.R
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ operation_template <- template(
http_method = ${http_method},
http_path = ${http_path},
host_prefix = ${host_prefix},
paginator = ${paginator}
paginator = ${paginator},
stream_api = ${stream_api}
)
input <- .${service}$${operation_input}
output <- .${service}$${operation_output}
Expand Down Expand Up @@ -66,7 +67,8 @@ make_operation <- function(operation, api, doc_maker) {
http_method = quoted(operation$http$method),
http_path = quoted(operation$http$requestUri),
host_prefix = quoted(operation[["endpoint"]][["hostPrefix"]] %||% ""),
paginator = set_paginator(operation$paginators)
paginator = set_paginator(operation$paginators),
stream_api = set_stream_api(operation)
)
}

Expand All @@ -91,6 +93,10 @@ set_paginator <- function(paginator) {
}
}

set_stream_api <- function(operation) {
as.character(operation$eventstream %||% FALSE)
}

# Override operation name from extdata/operation_name_override.yml
operation_name_override <- function(operation_name) {
path <- system_file(
Expand Down
35 changes: 19 additions & 16 deletions make.paws/R/process_api.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@ TEST_DIR <- "tests/testthat"
#'
#' @keywords internal
make_sdk_for_api <- function(api_name, in_dir) {
result <- list()
api <- read_api(api_name, in_dir)
result$name <- package_name(api)
result$code <- make_code_files(api)
result$tests <- make_tests_files(api)
result$docs <- make_docs_files(api)
result <- list(
name = package_name(api),
code = make_code_files(api),
tests = make_tests_files(api),
docs = make_docs_files(api)
)
return(result)
}

#-------------------------------------------------------------------------------

# Write code for a given API.
make_code_files <- function(api) {
result <- list()
result$operations <- make_operations_files(api, doc_maker = make_docs_short)
result$interfaces <- make_interfaces_files(api)
result$service <- make_service_files(api)
result$custom <- make_custom_operations_files(api)
result$reexports <- make_reexports()
result <- list(
operations = make_operations_files(api, doc_maker = make_docs_short),
interfaces = make_interfaces_files(api),
service = make_service_files(api),
custom = make_custom_operations_files(api),
reexports = make_reexports()
)
return(result)
}

Expand Down Expand Up @@ -90,11 +92,12 @@ make_reexports <- function() {
}

make_docs_files <- function(api) {
result <- list()
result$operations <- make_operations_files(api, doc_maker = make_docs_long)
result$service <- make_service_files(api)
result$custom <- make_custom_operations_files(api)
result$reexports <- make_reexports()
result <- list(
operations = make_operations_files(api, doc_maker = make_docs_long),
service = make_service_files(api),
custom = make_custom_operations_files(api),
reexports = make_reexports()
)
return(result)
}

Expand Down
21 changes: 21 additions & 0 deletions make.paws/R/read_api.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Read a given API's definition and documentation files.
# aws-sdk-js deprecated and apis is not being updated
# TODO: short term migrate to botocore jsons
read_api <- function(api_name, path) {
api_path <- file.path(path, "apis")
region_config_path <- file.path(path, "lib/region_config_data.json")
Expand All @@ -18,6 +20,7 @@ read_api <- function(api_name, path) {
paginators <- jsonlite::read_json(files$paginators)
api <- merge_paginators(api, paginators$pagination)
}
api <- merge_eventstream(api)
region_config <- jsonlite::read_json(region_config_path)
api <- merge_region_config(api, region_config)
api <- fix_region_config(api)
Expand Down Expand Up @@ -62,6 +65,24 @@ merge_paginators <- function(api, paginators) {
return(api)
}

merge_eventstream <- function(api) {
flat_shape <- unlist(api$shapes)
eventstream <- flat_shape[endsWith(names(flat_shape),"eventstream")]
names(eventstream) <- stringr::str_extract(names(eventstream), "([a-zA-Z]+)")

shape <- flat_shape[endsWith(names(flat_shape), "shape")]
shape <- shape[shape %in% names(eventstream)]
names(shape) <- gsub(
"Output$|Response$", "", stringr::str_extract(names(shape), "([a-zA-Z]+)")
)
names(eventstream) <- names(shape)

for (nms in names(eventstream)) {
api$operations[[nms]]$eventstream <- eventstream[nms]
}
return(api)
}

# Returns an API object with region config info attached. Region config info
# lists endpoints for each service and region, if different from the default.
merge_region_config <- function(api, region_config) {
Expand Down
4 changes: 4 additions & 0 deletions make.paws/inst/templates/reexports_paws.common.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ paws.common::credentials
#' @importFrom paws.common creds
#' @export
paws.common::creds

#' @importFrom paws.common paws_stream_parser
#' @export
paws.common::paws_stream_parser
3 changes: 2 additions & 1 deletion make.paws/tests/testthat/test_operations.R
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ test_that("make_operation", {
http_method = \"POST\",
http_path = \"/abc\",
host_prefix = \"\",
paginator = list()
paginator = list(),
stream_api = FALSE
)
input <- .api$operation_input(Input1 = Input1, Input2 = Input2, Input3 = Input3)
output <- .api$operation_output()
Expand Down
5 changes: 4 additions & 1 deletion make.paws/tests/testthat/test_read_api.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ test_that("read_api", {

write_json(list(foo = "examples"), file.path(api_path, "foo-2018-11-01.examples.json"))
write_json(list(foo = "min"), file.path(api_path, "foo-2018-11-01.min.json"))
write_json(list(foo = "normal", name = "foo", metadata = list(endpointPrefix = "baz")), file.path(api_path, "foo-2018-11-01.normal.json"))
write_json(
list(foo = "normal", name = "foo", metadata = list(endpointPrefix = "baz"), shapes = list(foo = list(eventstream = "TRUE"))),
file.path(api_path, "foo-2018-11-01.normal.json")
)
write_json(list(foo = "paginators"), file.path(api_path, "foo-2018-11-01.paginators.json"))

write_json(list(foo = "wrong1"), file.path(api_path, "foo-2017-11-01.examples.json"))
Expand Down
8 changes: 4 additions & 4 deletions paws.common/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: paws.common
Type: Package
Title: Paws Low-Level Amazon Web Services API
Version: 0.7.7.9000
Version: 0.8.0
Authors@R: c(
person("David", "Kretch", email = "[email protected]", role = "aut"),
person("Adam", "Banker", email = "[email protected]", role = "aut"),
Expand Down Expand Up @@ -64,17 +64,17 @@ Collate:
'head_bucket.R'
'http_status.R'
'error.R'
'tags.R'
'xmlutil.R'
'stream.R'
'custom_s3.R'
'handlers_core.R'
'handlers_ec2query.R'
'handlers_jsonrpc.R'
'handlers_query.R'
'handlers_rest.R'
'handlers_restjson.R'
'tags.R'
'xmlutil.R'
'handlers_restxml.R'
'handlers_stream.R'
'idempotency.R'
'jsonutil.R'
'onLoad.R'
Expand Down
5 changes: 4 additions & 1 deletion paws.common/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ S3method(is_empty_xml,character)
S3method(is_empty_xml,default)
S3method(is_empty_xml,list)
S3method(is_empty_xml,raw)
S3method(print,PawsStreamHandler)
export(config)
export(credentials)
export(creds)
Expand All @@ -31,6 +32,7 @@ export(paginate_lapply)
export(paginate_sapply)
export(paws_config_log)
export(paws_reset_cache)
export(paws_stream_parser)
export(populate)
export(send_request)
export(set_config)
Expand All @@ -42,9 +44,10 @@ export(tag_has)
export(type)
importFrom(Rcpp,evalCpp)
importFrom(curl,curl_unescape)
importFrom(httr2,req_body_raw)
importFrom(digest,digest)
importFrom(httr2,req_options)
importFrom(httr2,req_perform)
importFrom(httr2,req_perform_connection)
importFrom(httr2,request)
importFrom(stats,runif)
importFrom(utils,flush.console)
Expand Down
5 changes: 4 additions & 1 deletion paws.common/NEWS.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# paws.common 0.7.7.9000
# paws.common 0.8.0
* migrate backend from httr to httr2
* enrich sso message (#844). Thanks to @hadley for raising issue.
* attempt to automatically set/refresh `sso` credentials by calling `aws cli` (#844)
* moved api log level to `debug` and `trace`. This is to prevent `info` level being saturated by api calls.
* migrate backend `httr` to `httr2`
* new `PawsStreamHandler`, allows paws to handle aws stream event (#842). Thankyou to @hadley for developing the initial solution in `httr2`.
* deprecated custom handler for `s3_unmarshal_select_object_content`

# paws.common 0.7.7
* fix unix time expiration check
Expand Down
Loading
Loading