Skip to content

Commit

Permalink
Allow retrying proxied requests (bluesky-social#2850)
Browse files Browse the repository at this point in the history
* Allow retrying proxied requests

* fix tests

* remove un-necessary code

* Only retry on socket errors
  • Loading branch information
matthieusieben authored Oct 31, 2024
1 parent 7ea0fc1 commit 9ffeb52
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 16 deletions.
5 changes: 5 additions & 0 deletions .changeset/rare-items-arrive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/pds": patch
---

Use a less cryptic error message when proxying fails
5 changes: 5 additions & 0 deletions .changeset/thirty-masks-watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/pds": patch
---

Allow retrying proxied requests
5 changes: 5 additions & 0 deletions packages/pds/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => {
headersTimeout: env.proxyHeadersTimeout ?? 10e3,
bodyTimeout: env.proxyBodyTimeout ?? 30e3,
maxResponseSize: env.proxyMaxResponseSize ?? 10 * 1024 * 1024, // 10mb
maxRetries:
env.proxyMaxRetries != null && env.proxyMaxRetries > 0
? env.proxyMaxRetries
: 0,
preferCompressed: env.proxyPreferCompressed ?? false,
}

Expand Down Expand Up @@ -414,6 +418,7 @@ export type ProxyConfig = {
headersTimeout: number
bodyTimeout: number
maxResponseSize: number
maxRetries: number

/**
* When proxying requests that might get intercepted (for read-after-write) we
Expand Down
2 changes: 2 additions & 0 deletions packages/pds/src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ export const readEnv = (): ServerEnvironment => {
proxyHeadersTimeout: envInt('PDS_PROXY_HEADERS_TIMEOUT'),
proxyBodyTimeout: envInt('PDS_PROXY_BODY_TIMEOUT'),
proxyMaxResponseSize: envInt('PDS_PROXY_MAX_RESPONSE_SIZE'),
proxyMaxRetries: envInt('PDS_PROXY_MAX_RETRIES'),
proxyPreferCompressed: envBool('PDS_PROXY_PREFER_COMPRESSED'),
}
}
Expand Down Expand Up @@ -254,5 +255,6 @@ export type ServerEnvironment = {
proxyHeadersTimeout?: number
proxyBodyTimeout?: number
proxyMaxResponseSize?: number
proxyMaxRetries?: number
proxyPreferCompressed?: boolean
}
14 changes: 11 additions & 3 deletions packages/pds/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export type AppContextOptions = {
moderationAgent?: AtpAgent
reportingAgent?: AtpAgent
entrywayAgent?: AtpAgent
proxyAgent: undici.Agent
proxyAgent: undici.Dispatcher
safeFetch: Fetch
authProvider?: PdsOAuthProvider
authVerifier: AuthVerifier
Expand All @@ -90,7 +90,7 @@ export class AppContext {
public moderationAgent: AtpAgent | undefined
public reportingAgent: AtpAgent | undefined
public entrywayAgent: AtpAgent | undefined
public proxyAgent: undici.Agent
public proxyAgent: undici.Dispatcher
public safeFetch: Fetch
public authVerifier: AuthVerifier
public authProvider?: PdsOAuthProvider
Expand Down Expand Up @@ -264,7 +264,7 @@ export class AppContext {
})

// An agent for performing HTTP requests based on user provided URLs.
const proxyAgent = new undici.Agent({
const proxyAgentBase = new undici.Agent({
allowH2: cfg.proxy.allowHTTP2, // This is experimental
headersTimeout: cfg.proxy.headersTimeout,
maxResponseSize: cfg.proxy.maxResponseSize,
Expand All @@ -286,6 +286,14 @@ export class AppContext {
lookup: cfg.proxy.disableSsrfProtection ? undefined : unicastLookup,
},
})
const proxyAgent =
cfg.proxy.maxRetries > 0
? new undici.RetryAgent(proxyAgentBase, {
statusCodes: [], // Only retry on socket errors
methods: ['GET', 'HEAD'],
maxRetries: cfg.proxy.maxRetries,
})
: proxyAgentBase

// A fetch() function that protects against SSRF attacks, large responses &
// known bad domains. This function can safely be used to fetch user
Expand Down
25 changes: 13 additions & 12 deletions packages/pds/src/pipethrough.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,9 @@ async function pipethroughRequest(

function handleUpstreamRequestError(
err: unknown,
message = 'pipethrough network error',
message = 'Upstream service unreachable',
): never {
httpLogger.warn({ err }, message)
httpLogger.error({ err }, message)
throw new XRPCServerError(ResponseType.UpstreamFailure, message, undefined, {
cause: err,
})
Expand Down Expand Up @@ -520,18 +520,11 @@ async function tryParsingError(
}
}

export async function bufferUpstreamResponse(
async function bufferUpstreamResponse(
readable: Readable,
contentEncoding?: string | string[],
): Promise<Buffer> {
try {
// Needed for type-safety (should never happen irl)
if (Array.isArray(contentEncoding)) {
throw new TypeError(
'upstream service returned multiple content-encoding headers',
)
}

return await streamToNodeBuffer(decodeStream(readable, contentEncoding))
} catch (err) {
if (!readable.destroyed) readable.destroy()
Expand Down Expand Up @@ -561,7 +554,11 @@ export async function asPipeThroughBuffer(
// Response parsing/forwarding
// -------------------

const RES_HEADERS_TO_FORWARD = ['atproto-repo-rev', 'atproto-content-labelers']
const RES_HEADERS_TO_FORWARD = [
'atproto-repo-rev',
'atproto-content-labelers',
'retry-after',
]

function* responseHeaders(
headers: IncomingHttpHeaders,
Expand All @@ -584,7 +581,11 @@ function* responseHeaders(
for (let i = 0; i < RES_HEADERS_TO_FORWARD.length; i++) {
const name = RES_HEADERS_TO_FORWARD[i]
const val = headers[name]
if (typeof val === 'string') yield [name, val]

if (val != null) {
const value: string = Array.isArray(val) ? val.join(',') : val
yield [name, value]
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/pds/tests/proxied/proxy-catchall.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ describe('proxy header', () => {
for (const lex of lexicons) client.lex.add(lex)

await expect(client.call('com.example.ok')).rejects.toThrow(
'pipethrough network error',
'Upstream service unreachable',
)
})

Expand Down

0 comments on commit 9ffeb52

Please sign in to comment.