Skip to content

Commit

Permalink
Fix Type Error while using DynamoDBToS3Operator (apache#28158)
Browse files Browse the repository at this point in the history
  • Loading branch information
snjypl authored Dec 6, 2022
1 parent 39f501d commit 0d90c62
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 2 deletions.
13 changes: 12 additions & 1 deletion airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import json
from copy import copy
from decimal import Decimal
from os.path import getsize
from tempfile import NamedTemporaryFile
from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
Expand All @@ -36,8 +37,18 @@
from airflow.utils.context import Context


class JSONEncoder(json.JSONEncoder):
"""Custom json encoder implementation"""

def default(self, obj):
"""Convert decimal objects in a json serializable format."""
if isinstance(obj, Decimal):
return float(obj)
return super().default(obj)


def _convert_item_to_json_bytes(item: dict[str, Any]) -> bytes:
return (json.dumps(item) + "\n").encode("utf-8")
return (json.dumps(item, cls=JSONEncoder) + "\n").encode("utf-8")


def _upload_file_to_s3(
Expand Down
43 changes: 42 additions & 1 deletion tests/providers/amazon/aws/transfers/test_dynamodb_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,21 @@

import json
import unittest
from decimal import Decimal
from unittest.mock import MagicMock, patch

from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator
from airflow.providers.amazon.aws.transfers.dynamodb_to_s3 import DynamoDBToS3Operator, JSONEncoder


class JSONEncoderTest(unittest.TestCase):
def test_jsonencoder_with_decimal(self):
"""Test JSONEncoder correctly encodes and decodes decimal values."""

for i in ["102938.3043847474", 1.010001, 10, "100", "1E-128", 1e-128]:
org = Decimal(i)
encoded = json.dumps(org, cls=JSONEncoder)
decoded = json.loads(encoded, parse_float=Decimal)
self.assertAlmostEqual(decoded, org)


class DynamodbToS3Test(unittest.TestCase):
Expand Down Expand Up @@ -65,6 +77,35 @@ def test_dynamodb_to_s3_success(self, mock_aws_dynamodb_hook, mock_s3_hook):

assert [{"a": 1}, {"b": 2}, {"c": 3}] == self.output_queue

@patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.S3Hook")
@patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBHook")
def test_dynamodb_to_s3_success_with_decimal(self, mock_aws_dynamodb_hook, mock_s3_hook):
a = Decimal(10.028)
b = Decimal("10.048")
responses = [
{
"Items": [{"a": a}, {"b": b}],
}
]
table = MagicMock()
table.return_value.scan.side_effect = responses
mock_aws_dynamodb_hook.return_value.get_conn.return_value.Table = table

s3_client = MagicMock()
s3_client.return_value.upload_file = self.mock_upload_file
mock_s3_hook.return_value.get_conn = s3_client

dynamodb_to_s3_operator = DynamoDBToS3Operator(
task_id="dynamodb_to_s3",
dynamodb_table_name="airflow_rocks",
s3_bucket_name="airflow-bucket",
file_size=4000,
)

dynamodb_to_s3_operator.execute(context={})

assert [{"a": float(a)}, {"b": float(b)}] == self.output_queue

@patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.S3Hook")
@patch("airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBHook")
def test_dynamodb_to_s3_with_different_aws_conn_id(self, mock_aws_dynamodb_hook, mock_s3_hook):
Expand Down

0 comments on commit 0d90c62

Please sign in to comment.