Skip to content

Commit

Permalink
List blobs method (bluesky-social#662)
Browse files Browse the repository at this point in the history
* add list blobs method

* tests + small bugfix

* one more test
  • Loading branch information
dholms authored Mar 15, 2023
1 parent ba428a7 commit 197f40b
Show file tree
Hide file tree
Showing 11 changed files with 348 additions and 15 deletions.
32 changes: 32 additions & 0 deletions lexicons/com/atproto/sync/listBlobs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"lexicon": 1,
"id": "com.atproto.sync.listBlobs",
"defs": {
"main": {
"type": "query",
"description": "List blob cids for some range of commits",
"parameters": {
"type": "params",
"required": ["did"],
"properties": {
"did": {"type": "string", "description": "The DID of the repo."},
"latest": { "type": "string", "description": "The most recent commit"},
"earliest": { "type": "string", "description": "The earliest commit to start from"}
}
},
"output": {
"encoding": "application/json",
"schema": {
"type": "object",
"required": ["cids"],
"properties": {
"cids": {
"type": "array",
"items": { "type": "string" }
}
}
}
}
}
}
}
13 changes: 13 additions & 0 deletions packages/api/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommit
import * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
import * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
import * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
import * as ComAtprotoSyncListBlobs from './types/com/atproto/sync/listBlobs'
import * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate'
import * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl'
import * as ComAtprotoSyncSubscribeAllRepos from './types/com/atproto/sync/subscribeAllRepos'
Expand Down Expand Up @@ -147,6 +148,7 @@ export * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommit
export * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
export * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
export * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
export * as ComAtprotoSyncListBlobs from './types/com/atproto/sync/listBlobs'
export * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate'
export * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl'
export * as ComAtprotoSyncSubscribeAllRepos from './types/com/atproto/sync/subscribeAllRepos'
Expand Down Expand Up @@ -782,6 +784,17 @@ export class SyncNS {
})
}

listBlobs(
params?: ComAtprotoSyncListBlobs.QueryParams,
opts?: ComAtprotoSyncListBlobs.CallOptions,
): Promise<ComAtprotoSyncListBlobs.Response> {
return this._service.xrpc
.call('com.atproto.sync.listBlobs', params, undefined, opts)
.catch((e) => {
throw ComAtprotoSyncListBlobs.toKnownErr(e)
})
}

notifyOfUpdate(
params?: ComAtprotoSyncNotifyOfUpdate.QueryParams,
opts?: ComAtprotoSyncNotifyOfUpdate.CallOptions,
Expand Down
44 changes: 44 additions & 0 deletions packages/api/src/client/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2251,6 +2251,49 @@ export const schemaDict = {
},
},
},
ComAtprotoSyncListBlobs: {
lexicon: 1,
id: 'com.atproto.sync.listBlobs',
defs: {
main: {
type: 'query',
description: 'List blob cids for some range of commits',
parameters: {
type: 'params',
required: ['did'],
properties: {
did: {
type: 'string',
description: 'The DID of the repo.',
},
latest: {
type: 'string',
description: 'The most recent commit',
},
earliest: {
type: 'string',
description: 'The earliest commit to start from',
},
},
},
output: {
encoding: 'application/json',
schema: {
type: 'object',
required: ['cids'],
properties: {
cids: {
type: 'array',
items: {
type: 'string',
},
},
},
},
},
},
},
},
ComAtprotoSyncNotifyOfUpdate: {
lexicon: 1,
id: 'com.atproto.sync.notifyOfUpdate',
Expand Down Expand Up @@ -4216,6 +4259,7 @@ export const ids = {
ComAtprotoSyncGetHead: 'com.atproto.sync.getHead',
ComAtprotoSyncGetRecord: 'com.atproto.sync.getRecord',
ComAtprotoSyncGetRepo: 'com.atproto.sync.getRepo',
ComAtprotoSyncListBlobs: 'com.atproto.sync.listBlobs',
ComAtprotoSyncNotifyOfUpdate: 'com.atproto.sync.notifyOfUpdate',
ComAtprotoSyncRequestCrawl: 'com.atproto.sync.requestCrawl',
ComAtprotoSyncSubscribeAllRepos: 'com.atproto.sync.subscribeAllRepos',
Expand Down
39 changes: 39 additions & 0 deletions packages/api/src/client/types/com/atproto/sync/listBlobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import { Headers, XRPCError } from '@atproto/xrpc'
import { ValidationResult } from '@atproto/lexicon'
import { isObj, hasProp } from '../../../../util'
import { lexicons } from '../../../../lexicons'

export interface QueryParams {
/** The DID of the repo. */
did: string
/** The most recent commit */
latest?: string
/** The earliest commit to start from */
earliest?: string
}

export type InputSchema = undefined

export interface OutputSchema {
cids: string[]
[k: string]: unknown
}

export interface CallOptions {
headers?: Headers
}

export interface Response {
success: boolean
headers: Headers
data: OutputSchema
}

export function toKnownErr(e: any) {
if (e instanceof XRPCError) {
}
return e
}
2 changes: 2 additions & 0 deletions packages/pds/src/api/com/atproto/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import getCommitPath from './getCommitPath'
import getHead from './getHead'
import getRecord from './getRecord'
import getRepo from './getRepo'
import listBlobs from './listBlobs'
import subscribe from './subscribeAllRepos'

export default function (server: Server, ctx: AppContext) {
Expand All @@ -17,5 +18,6 @@ export default function (server: Server, ctx: AppContext) {
getHead(server, ctx)
getRecord(server, ctx)
getRepo(server, ctx)
listBlobs(server, ctx)
subscribe(server, ctx)
}
35 changes: 35 additions & 0 deletions packages/pds/src/api/com/atproto/sync/listBlobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { CID } from 'multiformats/cid'
import { InvalidRequestError } from '@atproto/xrpc-server'
import { Server } from '../../../../lexicon'
import SqlRepoStorage from '../../../../sql-repo-storage'
import AppContext from '../../../../context'

export default function (server: Server, ctx: AppContext) {
server.com.atproto.sync.listBlobs(async ({ params }) => {
const { did } = params
const storage = new SqlRepoStorage(ctx.db, did)
const earliest = params.earliest ? CID.parse(params.earliest) : null
const latest = params.latest
? CID.parse(params.latest)
: await storage.getHead()
if (latest === null) {
throw new InvalidRequestError(`Could not find root for DID: ${did}`)
}
const commitPath = await storage.getCommitPath(latest, earliest)
if (commitPath === null) {
throw new InvalidRequestError(
`Could not find a valid commit path from ${latest.toString()} to ${earliest?.toString()}`,
)
}
const cids = await ctx.services
.repo(ctx.db)
.blobs.listForCommits(did, commitPath)

return {
encoding: 'application/json',
body: {
cids: cids.map((c) => c.toString()),
},
}
})
}
8 changes: 8 additions & 0 deletions packages/pds/src/lexicon/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import * as ComAtprotoSyncGetCommitPath from './types/com/atproto/sync/getCommit
import * as ComAtprotoSyncGetHead from './types/com/atproto/sync/getHead'
import * as ComAtprotoSyncGetRecord from './types/com/atproto/sync/getRecord'
import * as ComAtprotoSyncGetRepo from './types/com/atproto/sync/getRepo'
import * as ComAtprotoSyncListBlobs from './types/com/atproto/sync/listBlobs'
import * as ComAtprotoSyncNotifyOfUpdate from './types/com/atproto/sync/notifyOfUpdate'
import * as ComAtprotoSyncRequestCrawl from './types/com/atproto/sync/requestCrawl'
import * as ComAtprotoSyncSubscribeAllRepos from './types/com/atproto/sync/subscribeAllRepos'
Expand Down Expand Up @@ -523,6 +524,13 @@ export class SyncNS {
return this._server.xrpc.method(nsid, cfg)
}

listBlobs<AV extends AuthVerifier>(
cfg: ConfigOf<AV, ComAtprotoSyncListBlobs.Handler<ExtractAuth<AV>>>,
) {
const nsid = 'com.atproto.sync.listBlobs' // @ts-ignore
return this._server.xrpc.method(nsid, cfg)
}

notifyOfUpdate<AV extends AuthVerifier>(
cfg: ConfigOf<AV, ComAtprotoSyncNotifyOfUpdate.Handler<ExtractAuth<AV>>>,
) {
Expand Down
44 changes: 44 additions & 0 deletions packages/pds/src/lexicon/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2251,6 +2251,49 @@ export const schemaDict = {
},
},
},
ComAtprotoSyncListBlobs: {
lexicon: 1,
id: 'com.atproto.sync.listBlobs',
defs: {
main: {
type: 'query',
description: 'List blob cids for some range of commits',
parameters: {
type: 'params',
required: ['did'],
properties: {
did: {
type: 'string',
description: 'The DID of the repo.',
},
latest: {
type: 'string',
description: 'The most recent commit',
},
earliest: {
type: 'string',
description: 'The earliest commit to start from',
},
},
},
output: {
encoding: 'application/json',
schema: {
type: 'object',
required: ['cids'],
properties: {
cids: {
type: 'array',
items: {
type: 'string',
},
},
},
},
},
},
},
},
ComAtprotoSyncNotifyOfUpdate: {
lexicon: 1,
id: 'com.atproto.sync.notifyOfUpdate',
Expand Down Expand Up @@ -4216,6 +4259,7 @@ export const ids = {
ComAtprotoSyncGetHead: 'com.atproto.sync.getHead',
ComAtprotoSyncGetRecord: 'com.atproto.sync.getRecord',
ComAtprotoSyncGetRepo: 'com.atproto.sync.getRepo',
ComAtprotoSyncListBlobs: 'com.atproto.sync.listBlobs',
ComAtprotoSyncNotifyOfUpdate: 'com.atproto.sync.notifyOfUpdate',
ComAtprotoSyncRequestCrawl: 'com.atproto.sync.requestCrawl',
ComAtprotoSyncSubscribeAllRepos: 'com.atproto.sync.subscribeAllRepos',
Expand Down
45 changes: 45 additions & 0 deletions packages/pds/src/lexicon/types/com/atproto/sync/listBlobs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* GENERATED CODE - DO NOT MODIFY
*/
import express from 'express'
import { ValidationResult } from '@atproto/lexicon'
import { lexicons } from '../../../../lexicons'
import { isObj, hasProp } from '../../../../util'
import { HandlerAuth } from '@atproto/xrpc-server'

export interface QueryParams {
/** The DID of the repo. */
did: string
/** The most recent commit */
latest?: string
/** The earliest commit to start from */
earliest?: string
}

export type InputSchema = undefined

export interface OutputSchema {
cids: string[]
[k: string]: unknown
}

export type HandlerInput = undefined

export interface HandlerSuccess {
encoding: 'application/json'
body: OutputSchema
}

export interface HandlerError {
status: number
message?: string
}

export type HandlerOutput = HandlerError | HandlerSuccess
export type Handler<HA extends HandlerAuth = never> = (ctx: {
auth: HA
params: QueryParams
input: HandlerInput
req: express.Request
res: express.Response
}) => Promise<HandlerOutput> | HandlerOutput
15 changes: 14 additions & 1 deletion packages/pds/src/services/repo/blobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import stream from 'stream'
import { CID } from 'multiformats/cid'
import bytes from 'bytes'
import { fromStream as fileTypeFromStream } from 'file-type'
import { BlobStore, WriteOpAction } from '@atproto/repo'
import { BlobStore, CidSet, WriteOpAction } from '@atproto/repo'
import { AtUri } from '@atproto/uri'
import { sha256Stream } from '@atproto/crypto'
import { cloneStream, sha256RawToCid, streamSize } from '@atproto/common'
Expand Down Expand Up @@ -122,6 +122,19 @@ export class RepoBlobs {
.execute()
}

async listForCommits(did: string, commits: CID[]): Promise<CID[]> {
if (commits.length < 1) return []
const commitStrs = commits.map((c) => c.toString())
const res = await this.db.db
.selectFrom('repo_blob')
.where('did', '=', did)
.where('commit', 'in', commitStrs)
.select('cid')
.execute()
const cids = res.map((row) => CID.parse(row.cid))
return new CidSet(cids).toList()
}

async deleteForUser(did: string): Promise<void> {
this.db.assertTransaction()
const [deleted] = await Promise.all([
Expand Down
Loading

0 comments on commit 197f40b

Please sign in to comment.