Skip to content

Commit

Permalink
Add max_batch_age and max_batch_size optional config params
Browse files Browse the repository at this point in the history
  • Loading branch information
rstml committed Nov 25, 2023
1 parent 521b59f commit 99f1a36
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 4 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com).
"append_date_to_filename": true|false,
"append_date_to_filename_grain": "microsecond",
"flattening_enabled": true|false,
"flattening_max_depth": int
"flattening_max_depth": int,
"max_batch_age": int,
"max_batch_size": int
}
```
`format.format_parquet.validate` [`Boolean`, default: `False`] - this flag determines whether the data types of incoming data elements should be validated. When set `True`, a schema is created from the first record and all subsequent records that don't match that data type are cast.
Expand Down
4 changes: 4 additions & 0 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ plugins:
- name: flatten_records
kind: boolean
value: false
- name: max_batch_age
value: 5
- name: max_batch_size
value: 10000

4 changes: 3 additions & 1 deletion sample-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@
"append_date_to_filename": true,
"append_date_to_filename_grain": "microsecond",
"flattening_enabled": false,
"flattening_max_depth": 1
"flattening_max_depth": 1,
"max_batch_age": 5,
"max_batch_size": 10000
}
11 changes: 9 additions & 2 deletions target_s3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
class s3Sink(BatchSink):
"""s3 target sink class."""

MAX_SIZE = 10000 # Max records to write in one batch

def __init__(
self,
target: any,
Expand All @@ -39,6 +37,15 @@ def __init__(
else:
raise Exception("No file type supplied.")

@property
def max_size(self) -> int:
    """Maximum number of records to write in one batch.

    Reads the ``max_batch_size`` config option declared in target.py
    (and documented in README.md / sample-config.json / meltano.yml),
    falling back to 10000 — the value of the hard-coded ``MAX_SIZE``
    class attribute this property replaces.

    Returns:
        Maximum batch size.
    """
    # BUG FIX: the key here was "batch_size", but the option added by
    # this change set is named "max_batch_size" everywhere else, so a
    # user-configured batch size was silently ignored and the default
    # of 10000 always applied.
    return self.config.get("max_batch_size", 10000)

def process_batch(self, context: dict) -> None:
"""Write out any prepped records and return once fully written."""
# add stream name to context
Expand Down
18 changes: 18 additions & 0 deletions target_s3/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,28 @@ class Targets3(Target):
allowed_values=DATE_GRAIN.keys(),
default="day",
),
th.Property(
"max_batch_age",
th.NumberType,
description="Maximum time in minutes between state messages when records are streamed in.",
required=False,
default=5.0,
),
th.Property(
"max_batch_size",
th.IntegerType,
description="Maximum size of batches when records are streamed in.",
required=False,
default=10000,
),
).to_dict()

default_sink_class = s3Sink

@property
def _MAX_RECORD_AGE_IN_MINUTES(self) -> float:  # type: ignore
    """Record-age cutoff, in minutes, taken from the target config.

    Overrides the SDK's class-level constant with the user-supplied
    ``max_batch_age`` option; defaults to 5.0 minutes when unset.
    """
    configured_minutes = self.config.get("max_batch_age", 5.0)
    # Coerce explicitly: the option may arrive as an int from JSON/YAML.
    return float(configured_minutes)

def deserialize_json(self, line: str) -> dict:
"""Override base target's method to overcome Decimal cast,
only applied when generating parquet schema from tap schema.
Expand Down

0 comments on commit 99f1a36

Please sign in to comment.