Skip to content

Commit

Permalink
fix(relocation): Change relocation_export RPC type (getsentry#75017)
Browse files Browse the repository at this point in the history
Previously, we tried to send a raw `bytes` object. While pydantic allows
this from a type-checking perspective, it fails when we try to encode
the data as a JSON payload for transport over the wire: JSON encoding
requires valid UTF-8 text, which arbitrary encrypted bytes are not.

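To get around this, we add a new `encrypted_bytes` parameter of type
`list[int]` alongside `encrypted_contents` (which becomes an optional
`bytes` and is sent as `None` until the transition is finished), then
convert the `bytes` to/from a `list[int]` when alternating between
sending over the wire and saving to a filestore.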

A change to an RPC schema like this is generally unsafe. However, because
this endpoint is behind an admin barrier and is only being tested
internally, I believe it is safe to do so in this case.
  • Loading branch information
azaslavsky authored Jul 25, 2024
1 parent 9894a93 commit 9f013d0
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 17 deletions.
8 changes: 5 additions & 3 deletions src/sentry/receivers/outbox/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,17 @@ def process_relocation_reply_with_export(payload: Mapping[str, Any], **kwds):
relocation_storage = get_relocation_storage()
path = f"runs/{uuid}/saas_to_saas_export/{slug}.tar"
try:
encrypted_contents = relocation_storage.open(path)
encrypted_bytes = relocation_storage.open(path)
except Exception:
raise FileNotFoundError("Could not open SaaS -> SaaS export in proxy relocation bucket.")

with encrypted_contents:
with encrypted_bytes:
region_relocation_export_service.reply_with_export(
relocation_uuid=payload["relocation_uuid"],
requesting_region_name=payload["requesting_region_name"],
replying_region_name=payload["replying_region_name"],
org_slug=payload["org_slug"],
encrypted_contents=encrypted_contents.read(),
# TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`.
encrypted_contents=None,
encrypted_bytes=[int(byte) for byte in encrypted_bytes.read()],
)
8 changes: 5 additions & 3 deletions src/sentry/receivers/outbox/region.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,19 @@ def process_relocation_reply_with_export(payload: Any, **kwds):
relocation_storage = get_relocation_storage()
path = f"runs/{uuid}/saas_to_saas_export/{slug}.tar"
try:
encrypted_contents = relocation_storage.open(path)
encrypted_bytes = relocation_storage.open(path)
except Exception:
raise FileNotFoundError(
"Could not open SaaS -> SaaS export in export-side relocation bucket."
)

with encrypted_contents:
with encrypted_bytes:
control_relocation_export_service.reply_with_export(
relocation_uuid=uuid,
requesting_region_name=payload["requesting_region_name"],
replying_region_name=payload["replying_region_name"],
org_slug=slug,
encrypted_contents=encrypted_contents.read(),
# TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`.
encrypted_contents=None,
encrypted_bytes=[int(byte) for byte in encrypted_bytes.read()],
)
24 changes: 16 additions & 8 deletions src/sentry/relocation/services/relocation_export/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def request_new_export(
"requesting_region_name": requesting_region_name,
"replying_region_name": replying_region_name,
"org_slug": org_slug,
"encrypted_contents_size": len(encrypt_with_public_key),
"encrypted_bytes_size": len(encrypt_with_public_key),
}
logger.info("SaaS -> SaaS request received in exporting region", extra=logger_data)

Expand Down Expand Up @@ -75,7 +75,9 @@ def reply_with_export(
requesting_region_name: str,
replying_region_name: str,
org_slug: str,
encrypted_contents: bytes,
# TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`.
encrypted_contents: bytes | None,
encrypted_bytes: list[int] | None = None,
) -> None:
from sentry.tasks.relocation import uploading_complete

Expand All @@ -91,7 +93,8 @@ def reply_with_export(
"requesting_region_name": requesting_region_name,
"replying_region_name": replying_region_name,
"org_slug": org_slug,
"encrypted_contents_size": len(encrypted_contents),
# TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`.
"encrypted_bytes_size": len(encrypted_bytes or []),
}
logger.info("SaaS -> SaaS reply received in triggering region", extra=logger_data)

Expand All @@ -102,7 +105,8 @@ def reply_with_export(
capture_exception(e)
return

fp = BytesIO(encrypted_contents)
# TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`.
fp = BytesIO(bytes(encrypted_bytes or []))
file = File.objects.create(name="raw-relocation-data.tar", type=RELOCATION_FILE_TYPE)
file.putfile(fp, blob_size=RELOCATION_BLOB_SIZE, logger=logger)
logger.info("SaaS -> SaaS relocation underlying File created", extra=logger_data)
Expand Down Expand Up @@ -167,22 +171,26 @@ def reply_with_export(
requesting_region_name: str,
replying_region_name: str,
org_slug: str,
encrypted_contents: bytes,
# TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`.
encrypted_contents: bytes | None,
encrypted_bytes: list[int] | None = None,
) -> None:
logger_data = {
"uuid": relocation_uuid,
"requesting_region_name": requesting_region_name,
"replying_region_name": replying_region_name,
"org_slug": org_slug,
"encrypt_with_public_key_size": len(encrypted_contents),
# TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`.
"encrypt_with_public_key_size": len(encrypted_bytes or []),
}
logger.info("SaaS -> SaaS reply received on proxy", extra=logger_data)

# Save the payload into the control silo's "relocation" GCS bucket. This bucket is only used
# for temporary storage of `encrypted_contents` being shuffled between regions like this.
# for temporary storage of `encrypted_bytes` being shuffled between regions like this.
path = f"runs/{relocation_uuid}/saas_to_saas_export/{org_slug}.tar"
relocation_storage = get_relocation_storage()
fp = BytesIO(encrypted_contents)
# TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`.
fp = BytesIO(bytes(encrypted_bytes or []))
relocation_storage.save(path, fp)
logger.info("SaaS -> SaaS export contents retrieved", extra=logger_data)

Expand Down
2 changes: 1 addition & 1 deletion src/sentry/relocation/services/relocation_export/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ class RelocationExportReplyWithExportParameters(pydantic.BaseModel):
requesting_region_name: str
replying_region_name: str
org_slug: str
# encrypted_contents excluded, as receivers are expected to manually read them from filestore.
# encrypted_bytes excluded, as receivers are expected to manually read them from filestore.
8 changes: 6 additions & 2 deletions src/sentry/relocation/services/relocation_export/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ def reply_with_export(
requesting_region_name: str,
replying_region_name: str,
org_slug: str,
encrypted_contents: bytes,
# TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`.
encrypted_contents: bytes | None,
encrypted_bytes: list[int] | None = None,
) -> None:
"""
This method is responsible for asynchronously sending an already generated and locally-saved
Expand Down Expand Up @@ -130,7 +132,9 @@ def reply_with_export(
requesting_region_name: str,
replying_region_name: str,
org_slug: str,
encrypted_contents: bytes,
# TODO(azaslavsky): finish transfer from `encrypted_contents` -> `encrypted_bytes`.
encrypted_contents: bytes | None,
encrypted_bytes: list[int] | None = None,
) -> None:
"""
This helper method is a proxy handler for the `reply_with_export` method, durably forwarding
Expand Down

0 comments on commit 9f013d0

Please sign in to comment.