Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handler stream #848

Merged
merged 41 commits into from
Dec 9, 2024
Merged
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
bc41be7
initial helper functions for streaming functionality
DyfanJones Oct 3, 2024
1b21c17
Merge branch 'main' into handler_stream
DyfanJones Nov 11, 2024
6a9f7b7
add stream_api in operation
DyfanJones Nov 18, 2024
01b34ba
doc update
DyfanJones Nov 18, 2024
6f96af1
tidy up
DyfanJones Nov 18, 2024
6a051f4
add eventstream
DyfanJones Nov 18, 2024
fd595d1
update docs
DyfanJones Nov 18, 2024
ec8932a
switch to using normal.json instead of min.json
DyfanJones Nov 19, 2024
57c108f
Get error from stream
DyfanJones Nov 25, 2024
de74b12
httr pass stream to handlers
DyfanJones Nov 25, 2024
d452f2d
Initial stream handler
DyfanJones Nov 25, 2024
6f8b521
pass stream_api param to operation
DyfanJones Nov 25, 2024
33c9810
export stream functions
DyfanJones Nov 25, 2024
4623a12
initial dynamic streamhandler response
DyfanJones Dec 6, 2024
d2c1af2
remove deprecated code
DyfanJones Dec 6, 2024
1861fc0
initial aws stream parser
DyfanJones Dec 6, 2024
2a70b1f
prevent non valid aws event messages being passed to eventstream_parser
DyfanJones Dec 7, 2024
dafd308
move streamhandler to jsonrpc_unmarshal
DyfanJones Dec 7, 2024
a5b1623
remove custom handler
DyfanJones Dec 7, 2024
e0a5a83
deprecate old steam handler in favour of new approach
DyfanJones Dec 7, 2024
9c85760
tidy up
DyfanJones Dec 7, 2024
115583a
add streaming method for xml
DyfanJones Dec 7, 2024
e5729be
handle streaming errors
DyfanJones Dec 7, 2024
ec8c946
move to stream file
DyfanJones Dec 7, 2024
86f1f9c
add xml streaming method
DyfanJones Dec 7, 2024
4a76a8d
rename
DyfanJones Dec 7, 2024
41d6435
Add default payload size
DyfanJones Dec 8, 2024
8917d9d
remove redundant code
DyfanJones Dec 8, 2024
17e7509
reexport paws_stream_praser to all category packages
DyfanJones Dec 8, 2024
0c068b2
update comment
DyfanJones Dec 8, 2024
1207131
move unit test to test_stream
DyfanJones Dec 9, 2024
8405cb2
test stream functionality
DyfanJones Dec 9, 2024
bcc24cf
initial stream documentation for website
DyfanJones Dec 9, 2024
117fa35
revert paws regen
DyfanJones Dec 9, 2024
02e6620
bump paws.common feature version for paws regen
DyfanJones Dec 9, 2024
89e5f7a
add abstract function
DyfanJones Dec 9, 2024
914f1dd
add deleted function
DyfanJones Dec 9, 2024
84d7cc5
rename file
DyfanJones Dec 9, 2024
c164200
typo
DyfanJones Dec 9, 2024
d0a65d0
add eventstream to json file dummy
DyfanJones Dec 9, 2024
1db1cae
add stream_api
DyfanJones Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
initial helper functions for streaming functionality
  • Loading branch information
DyfanJones committed Oct 3, 2024
commit bc41be75c1987448ed79ea6fddb0b3f285554f0e
146 changes: 146 additions & 0 deletions paws.common/R/handler_stream.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
big_endian <- function(vec, dtype) {
switch(
dtype,
"int64" = c(
vec[8:1], vec[16:9], vec[24:17], vec[32:25], vec[40:33], vec[48:41], vec[56:49], vec[64:57]
),
"int32" = c(vec[8:1], vec[16:9], vec[24:17], vec[32:25]),
"int16" = c(vec[8:1], vec[16:9]),
"int8" = vec[8:1]
)
}

int_to_uint <- function (x, adjustment=2^32) {
if (sign(x) < 0) {
return(x + adjustment)
}
return(x)
}

# Convert raw vector into integers with big-endian
int64 <- function(x) {
bits <- as.integer(big_endian(rawToBits(x), "int64"))
sum(bits[-1] * 2^(62:0)) - bits[[1]] * 2^63
}

int32 <- function(x) {
bits <- as.integer(big_endian(rawToBits(x), "int32"))
sum(bits[-1] * 2^(30:0)) - bits[[1]] * 2^31
}

int16 <- function(x) {
bits <- as.integer(big_endian(rawToBits(x), "int16"))
sum(bits[-1] * 2^(14:0)) - bits[[1]] * 2^15
}

int8 <- function(x) {
bits <- as.integer(big_endian(rawToBits(x), "int8"))
sum(bits[-1] * 2^(6:0)) - bits[[1]] * 2^7
}

# Converts raw vector into unsigned integers with big-endian
uint64 <- function(x) {
int_to_uint(int64(x), 2^64)
}

uint32 <- function(x) {
int_to_uint(readBin(x, "integer", n=length(x), size = 4, endian = "big"))
}

uint16 <- function(x) {
readBin(x, "integer", n=length(x), size=2, signed = F, endian = "big")
}

uint8 <- function(x) {
readBin(x, "integer", n=length(x), size=1, signed = F, endian = "big")
}



###############

unpack <- function(data, length_byte_size){
uint <- switch(as.character(length_byte_size),
"1" = uint8,
"2" = uint16,
"4" = uint32
)
uint(data)
}

unpack_byte_array <- function(data, length_byte_size=2){
length <- unpack(data[1:length_byte_size], length_byte_size)
bytes_end <- length + length_byte_size
array_bytes <- data[length_byte_size:bytes_end]
return(list(array_bytes, bytes_end))
}

unpack_utf8_string <- function(data, length_byte_size = 2) {
info <- unpack_byte_array(data, length_byte_size)
return(list(paws.common:::raw_to_utf8(info[[1]]), info[[2]]))
}

parse_header <- function(data) {
info <- parse_name(data)
return(parse_value(info[[1]]))
}

parse_name <- function(data) {
info <- unpack_utf8_string(data, 1)
data <- advance_data(data, info[[2]])
return(list(data, info[[1]]))
}

parse_type <- function(data) {
type <- uint8(data[1])
data <- advance_data(data, 1)
return(list(data, type))
}

parse_value <- function(data) {
info <- parse_type(data)
value_unpacker <- HEADER_TYPE_MAP[[as.character(info[[2]])]]
info <- value_unpacker(data)
data <- advance_data(info[[1]], info[[2]])
return(list(data, info[[2]]))
}

unpack_uuid <- function(data) {
return(list(data[1:16], 16))
}

unpack_prelude <- function(data) {
return(c(uint32(data[1:4]), uint32(data[5:8]), uint32(data[9:12])))
}

advance_data <- function(data, consumed) {
return(data[(consumed + 1): length(data)])
}

HEADER_TYPE_MAP = list(
# boolean_true
"0"= function(data) list(T, 0),
# boolean_false
"1"= function(data) list(F, 0),
# byte
"2"= function(data) list(int8(data[1]), 1),
# short
"3"= function(data) list(int16(data[1:2]), 2),
# integer
"4"= function(data) list(int32(data[1:4]), 4),
# long
"5"= function(data) list(int64(data[1:8]), 8),
# byte_array
"6"= unpack_byte_array,
# string
"7"= unpack_utf8_string,
# timestamp
"8"= function(data) list(int64(data[1:8]), 8),
# uuid
"9"= unpack_uuid
)