CDO-157 Combined all commits of branch tg-cdo-157 #360

Merged · 2 commits · Jan 15, 2025
@@ -100,6 +100,7 @@ def __init__(
max_chunk_size: int = 100,
process_chunks_in_parallel: bool = True,
maximum_concurrent_tasks: int = 100,
use_uuid_for_id_above: bool = False,
) -> None:
"""
Transformer to call and receive FHIR resources from a FHIR server
@@ -152,6 +153,8 @@ def __init__(
:param graph_json: (Optional) a FHIR GraphDefinition resource to use for retrieving data
:param run_synchronously: (Optional) Run on the Spark master to make debugging easier on dev machines
:param refresh_token_function: (Optional) function to refresh the token
:param use_uuid_for_id_above: (Optional) whether to use the resource's uuid identifier value (rather than its id)
    for the "id:above" parameter when fetching the next page of resources.
"""
super().__init__(
name=name, parameters=parameters, progress_logger=progress_logger
@@ -417,6 +420,11 @@ def __init__(
)
self._setDefault(maximum_concurrent_tasks=maximum_concurrent_tasks)

self.use_uuid_for_id_above: Param[bool] = Param(
self, "use_uuid_for_id_above", ""
)
self._setDefault(use_uuid_for_id_above=use_uuid_for_id_above)

kwargs = self._input_kwargs
self.setParams(**kwargs)

@@ -526,6 +534,7 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
self.process_chunks_in_parallel
)
maximum_concurrent_tasks: int = self.getOrDefault(self.maximum_concurrent_tasks)
use_uuid_for_id_above: bool = self.getOrDefault(self.use_uuid_for_id_above)

if parameters and parameters.get("flow_name"):
user_agent_value = (
@@ -624,6 +633,7 @@ async def _transform_async(self, df: DataFrame) -> DataFrame:
log_level=log_level,
maximum_concurrent_tasks=maximum_concurrent_tasks,
),
use_uuid_for_id_above=use_uuid_for_id_above,
)

if id_view:
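The new flag is wired in with the same PySpark `Param` pattern the transformer already uses for its other options: declare the `Param`, register its default, then read it back with `getOrDefault` inside `_transform_async`. A minimal, self-contained sketch of that pattern, using only stock PySpark (the class below is illustrative, not the actual transformer):

```python
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param import Param
from pyspark.sql import DataFrame


class ExampleReceiver(Transformer):
    @keyword_only
    def __init__(self, use_uuid_for_id_above: bool = False) -> None:
        super().__init__()
        # Declare the parameter and register its default, as the PR does.
        self.use_uuid_for_id_above: Param[bool] = Param(
            self, "use_uuid_for_id_above", ""
        )
        self._setDefault(use_uuid_for_id_above=use_uuid_for_id_above)
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def _transform(self, df: DataFrame) -> DataFrame:
        # Read the value back the same way _transform_async does.
        use_uuid_for_id_above: bool = self.getOrDefault(self.use_uuid_for_id_above)
        # The real transformer passes this flag into FhirReceiverParameters here.
        return df
```

`getOrDefault` returns the constructor value when one was supplied and falls back to the registered default (`False`) otherwise.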
@@ -49,6 +49,7 @@ class FhirReceiverParameters:
use_id_above_for_paging: Optional[bool]
pandas_udf_parameters: AsyncPandasUdfParameters
refresh_token_function: Optional[RefreshTokenFunction] = None
use_uuid_for_id_above: bool = False

def set_additional_parameters(
self, additional_parameters: List[str] | None
@@ -102,4 +103,5 @@ def clone(self) -> "FhirReceiverParameters":
refresh_token_function=self.refresh_token_function,
use_id_above_for_paging=self.use_id_above_for_paging,
pandas_udf_parameters=self.pandas_udf_parameters,
use_uuid_for_id_above=self.use_uuid_for_id_above,
)
@@ -613,7 +613,8 @@ async def get_batch_results_paging_async(
next_url: Optional[str] = result.next_url
next_uri: furl = furl(next_url)
additional_parameters = [
f"{k}={v}" for k, v in next_uri.args.items()
f"{k}={v}"
for k, v in next_uri.query.params.allitems()
]
# remove any entry for id:above
additional_parameters = list(
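The switch from `next_uri.args.items()` to `next_uri.query.params.allitems()` matters when the next-page URL repeats a query key: in furl, `args` is an alias for `query.params`, an ordered multi-dict whose `items()` returns only the first value for each key, while `allitems()` returns every pair. A small standalone illustration (the URL is made up):

```python
from furl import furl

# Hypothetical next-page URL with a repeated query parameter.
next_uri = furl("https://fhir.example.com/Patient?_count=10&_tag=a&_tag=b")

# items() collapses duplicate keys to their first value, silently dropping "_tag=b".
print([f"{k}={v}" for k, v in next_uri.args.items()])
# ['_count=10', '_tag=a']

# allitems() keeps every (key, value) pair, so repeated parameters survive the rebuild.
print([f"{k}={v}" for k, v in next_uri.query.params.allitems()])
# ['_count=10', '_tag=a', '_tag=b']
```

Without this change, a repeated parameter such as `_tag` would be silently dropped when the parameters are rebuilt for the next request.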
@@ -690,6 +691,7 @@ async def get_batch_result_streaming_async(
last_updated_before: Optional[datetime],
parameters: FhirReceiverParameters,
server_url: Optional[str],
limit: Optional[int] = None,
) -> AsyncGenerator[Dict[str, Any], None]:
assert server_url
result: FhirGetResponse
@@ -699,40 +701,112 @@
parameters.log_level if parameters and parameters.log_level else "INFO"
),
)
async for result in FhirReceiverProcessor.send_fhir_request_async(
logger=logger,
parameters=parameters,
resource_id=None,
server_url=server_url,
last_updated_after=last_updated_after,
last_updated_before=last_updated_before,
):
try:
batch_result1: GetBatchResult = (
FhirReceiverProcessor.read_resources_and_errors_from_response(
response=result
)
)
yield dataclasses.asdict(batch_result1)
except JSONDecodeError as e:
if parameters.error_view:
error = GetBatchError(
url=result.url,
status_code=result.status,
error_text=str(e) + " : " + result.responses,
request_id=result.request_id,
)
yield dataclasses.asdict(
GetBatchResult(resources=[], errors=[error])
# Iterate until there is no next page. We use cursor-based pagination here, so each next page
# contains records with an id greater than the id of the last record on the previous page.
has_next_page: bool = True
additional_parameters = parameters.additional_parameters or []
count: int = 0
while has_next_page:
async for result in FhirReceiverProcessor.send_fhir_request_async(
logger=logger,
resource_id=None,
server_url=server_url,
page_size=limit,
last_updated_after=last_updated_after,
last_updated_before=last_updated_before,
parameters=parameters.clone().set_additional_parameters(
additional_parameters
),
):
resources: List[str] = []
errors: List[GetBatchError] = []
result_response: List[str] = []
try:
batch_result1: GetBatchResult = (
FhirReceiverProcessor.read_resources_and_errors_from_response(
response=result
)
)
result_response = batch_result1.resources
errors = batch_result1.errors
except JSONDecodeError as e:
if parameters.error_view:
errors.append(
GetBatchError(
url=result.url,
status_code=result.status,
error_text=str(e) + " : " + result.responses,
request_id=result.request_id,
)
)
else:
raise FhirParserException(
url=result.url,
message="Parsing result as json failed",
json_data=result.responses,
response_status_code=result.status,
request_id=result.request_id,
) from e
if len(result_response) > 0:
resources = resources + result_response
count += len(resources)
# get id of last resource
json_resources: List[Dict[str, Any]] = [
json.loads(r) for r in result_response
]
if isinstance(json_resources, list):
if len(json_resources) > 0: # received any resources back
last_json_resource = json_resources[-1]
id_of_last_resource = None
if parameters.use_uuid_for_id_above:
for identifier in last_json_resource.get(
"identifier", []
):
if identifier.get("id") == "uuid":
id_of_last_resource = identifier.get("value")
elif "id" in last_json_resource:
id_of_last_resource = last_json_resource["id"]
if id_of_last_resource:
# use id:above to optimize the next query: drop any stale id:above entry and append the new cursor
additional_parameters = list(
filter(
lambda x: not x.startswith("id:above"),
additional_parameters,
)
)
additional_parameters.append(
f"id:above={id_of_last_resource}"
)
else:
has_next_page = False
if limit and 0 < limit <= count:
has_next_page = False
elif result.status == 200:
# no resources returned but status is 200, so we're done
has_next_page = False
else:
raise FhirParserException(
url=result.url,
message="Parsing result as json failed",
json_data=result.responses,
response_status_code=result.status,
request_id=result.request_id,
)
if result.status == 404:
yield dataclasses.asdict(
GetBatchResult(resources=[], errors=[])
)
elif result.status not in parameters.ignore_status_codes:
raise FhirReceiverException(
url=result.url,
json_data=result.responses,
response_text=result.responses,
response_status_code=result.status,
message=(
"Error received from server"
if len(errors) == 0
else "\n".join([e.error_text for e in errors])
),
request_id=result.request_id,
)
has_next_page = False

yield dataclasses.asdict(
GetBatchResult(resources=resources, errors=errors)
)

@staticmethod
def read_resources_and_errors_from_response(
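The core of the new streaming loop is the cursor construction: after each page, the id of the last resource (or, when `use_uuid_for_id_above` is set, the value of its identifier entry whose `id` is `"uuid"`) becomes the next `id:above` parameter. A condensed, standalone sketch of that step, mirroring the logic above (the helper function name and simplified types are mine, not part of the PR):

```python
from typing import Any, Dict, List, Optional


def next_id_above_parameters(
    page_resources: List[Dict[str, Any]],
    current_parameters: List[str],
    use_uuid_for_id_above: bool,
) -> Optional[List[str]]:
    """Return query parameters for the next page, or None when paging should stop."""
    if not page_resources:
        return None  # empty page: the server has no more matching resources
    last_resource = page_resources[-1]
    cursor: Optional[str] = None
    if use_uuid_for_id_above:
        # Look for the identifier entry whose id is "uuid" and use its value as the cursor.
        for identifier in last_resource.get("identifier", []):
            if identifier.get("id") == "uuid":
                cursor = identifier.get("value")
    elif "id" in last_resource:
        cursor = last_resource["id"]
    if cursor is None:
        return None
    # Drop any stale id:above entry before appending the new cursor.
    parameters = [p for p in current_parameters if not p.startswith("id:above")]
    parameters.append(f"id:above={cursor}")
    return parameters


# Example: the next request for this page would carry id:above=abc-123.
print(next_id_above_parameters([{"id": "abc-123"}], ["_count=100"], False))
# ['_count=100', 'id:above=abc-123']
```

In the actual loop these parameters are applied via `parameters.clone().set_additional_parameters(...)` before the next `send_fhir_request_async` call, and paging also stops once an empty page is returned or the optional `limit` is reached.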
@@ -190,6 +190,7 @@ async def get_batch_result_streaming_dataframe_async(
parameters: FhirReceiverParameters,
server_url: Optional[str],
results_per_batch: Optional[int],
limit: Optional[int] = None,
) -> DataFrame:
"""
Converts the results from a batch streaming request to a DataFrame iteratively using
@@ -202,6 +203,7 @@ async def get_batch_result_streaming_dataframe_async(
:param parameters: FhirReceiverParameters
:param server_url: Optional[str]
:param results_per_batch: int
:param limit: int
:return: DataFrame
"""
return await AsyncHelper.async_generator_to_dataframe(
@@ -211,6 +213,7 @@ async def get_batch_result_streaming_dataframe_async(
last_updated_before=last_updated_before,
parameters=parameters,
server_url=server_url,
limit=limit,
),
schema=schema,
results_per_batch=results_per_batch,
@@ -267,6 +270,7 @@ async def get_all_resources_async(
last_updated_before=last_updated_before,
mode=mode,
parameters=parameters,
limit=limit,
)
)
else:
@@ -385,6 +389,7 @@ async def get_all_resources_streaming_async(
last_updated_before: Optional[datetime],
mode: str,
parameters: FhirReceiverParameters,
limit: Optional[int] = None,
) -> DataFrame:
"""
Get all resources from the FHIR server based on the resourceType and any additional query parameters
@@ -398,6 +403,7 @@ async def get_all_resources_streaming_async(
:param last_updated_before: only get records older than this
:param mode: if output files exist, should we overwrite or append
:param parameters: the parameters
:param limit: maximum number of resources to get
:return: the data frame
"""
list_df: DataFrame = (
@@ -409,6 +415,7 @@ async def get_all_resources_streaming_async(
last_updated_before=last_updated_before,
schema=GetBatchResult.get_schema(),
results_per_batch=batch_size,
limit=limit,
)
)
resource_df = list_df.select(explode(col("resources")).alias("resource"))
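The new `limit` argument caps the total number of resources fetched across all pages (paging stops once the running count reaches it); it is distinct from `batch_size`/`results_per_batch`, which only controls how many rows are materialized into the DataFrame at a time. A compact sketch of the stop condition used by the streaming loop (the helper name is illustrative, not part of the code):

```python
from typing import Optional


def should_fetch_next_page(
    page_resource_count: int, total_count: int, limit: Optional[int]
) -> bool:
    """Decide whether the streaming loop should request another id:above page."""
    if page_resource_count == 0:
        # An empty page means the server has no more matching resources.
        return False
    if limit is not None and 0 < limit <= total_count:
        # The total cap across pages has been reached.
        return False
    return True


# With limit=250 and pages of 100 resources, paging stops after the third page (count=300).
assert should_fetch_next_page(100, 100, 250) is True
assert should_fetch_next_page(100, 300, 250) is False
```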
@@ -55,6 +55,7 @@ async def test_get_batch_result_streaming_async() -> None:
# Mock the HTTP request
with aioresponses() as m:
m.get(f"{server_url}/Patient", payload=mock_response_data)
m.get(f"{server_url}/Patient?id:above=1", payload=[])

# Call the function
results: List[Dict[str, Any]] = []
@@ -67,7 +68,7 @@
results.append(result)

# Assert the results
assert len(results) == 1
assert len(results) == 2
assert results[0] == expected_result.to_dict()


@@ -111,6 +112,7 @@ async def test_get_batch_result_streaming_async_single_result() -> None:
# Mock the HTTP request
with aioresponses() as m:
m.get(f"{server_url}/Patient", payload=mock_response_data)
m.get(f"{server_url}/Patient?id:above=1", payload=[])

# Call the function
results: List[Dict[str, Any]] = []
@@ -123,7 +125,7 @@
results.append(result)

# Assert the results
assert len(results) == 1
assert len(results) == 2
assert results[0] == expected_result.to_dict()


@@ -182,6 +184,7 @@ async def test_get_batch_result_streaming_async_multiple_results() -> None:
# Mock the HTTP request
with aioresponses() as m:
m.get(f"{server_url}/Patient", payload=mock_response_data)
m.get(f"{server_url}/Patient?id:above=2", payload=[])

# Call the function
results: List[Dict[str, Any]] = []
@@ -194,7 +197,8 @@
results.append(result)

# Assert the results
assert len(results) == 1
assert len(results) == 2
# Asserting the first dictionary only, since the second one is empty.
assert results[0] == expected_results[0].to_dict()

