Skip to content

Commit

Permalink
Server federation (bluesky-social#68)
Browse files Browse the repository at this point in the history
* start of server federation

* repo post/get roots

* push/pull & basic subscriptions

* working on federation

* wip

* federation works!

* cleanup

* remove old user routes
  • Loading branch information
dholms authored Apr 10, 2022
1 parent a4e8ece commit 94d84d0
Show file tree
Hide file tree
Showing 30 changed files with 454 additions and 198 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ go/bsky
go/server

server/blockstore
server/user_dids
server/user_roots
server/dev.sqlite
server/blockstore2
server/dev2.sqlite
server/test.sqlite
third-party/blockstore
4 changes: 2 additions & 2 deletions cli/src/commands/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import './interactions/like.js'
import './interactions/unlike.js'
import './net/pull.js'
// import './net/pull.js'
import './posts/delete-post.js'
import './posts/edit-post.js'
import './posts/feed.js'
import './posts/post.js'
import './posts/reply.js'
// import './posts/reply.js'
import './posts/timeline.js'
import './setup/init.js'
import './setup/register.js'
Expand Down
2 changes: 1 addition & 1 deletion cli/src/commands/interactions/like.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export default cmd({
const tid = TID.fromStr(args._[1])
const client = await loadDelegate(REPO_PATH)
const like = await client.likePost(author, tid)
const likeTid = TID.fromStr(like.tid)
const likeTid = like.tid
console.log(`Created like: `, likeTid.formatted())
},
})
2 changes: 1 addition & 1 deletion cli/src/commands/posts/feed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export default cmd({
for (const post of posts) {
console.log(post.text)
console.log(chalk.gray(formatDate(post.time)))
console.log(`id: ${chalk.gray(TID.fromStr(post.tid).formatted())}`)
console.log(`id: ${chalk.gray(post.tid.formatted())}`)
console.log(``)
}
},
Expand Down
2 changes: 1 addition & 1 deletion cli/src/commands/posts/post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export default cmd({
const text = args._[0]
const client = await loadDelegate(REPO_PATH)
const post = await client.addPost(text)
const tid = TID.fromStr(post.tid)
const tid = post.tid
console.log(`Created post: `, tid.formatted())
},
})
2 changes: 1 addition & 1 deletion cli/src/commands/posts/timeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export default cmd({
console.log(`"${post.text}" - ${post.author_name}`)
console.log(`Likes: ${post.likes}`)
console.log(chalk.gray(formatDate(post.time)))
console.log(chalk.gray(`id: ${TID.fromStr(post.tid).formatted()}`))
console.log(chalk.gray(`id: ${post.tid.formatted()}`))
console.log(``)
}
},
Expand Down
15 changes: 15 additions & 0 deletions common/src/common/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ import * as ucan from 'ucans'
const cid = z.instanceof(mf.CID)
export type CID = z.infer<typeof cid>

export const isCid = (str: string): boolean => {
try {
mf.CID.parse(str)
return true
} catch (err) {
return false
}
}

const strToCid = z
.string()
.refine(isCid, { message: 'Not a valid CID' })
.transform(mf.CID.parse)

// @TODO improve our DID represnetation
const did = z.string()
export type DID = z.infer<typeof did>
Expand All @@ -24,6 +38,7 @@ const strToInt = z
export const schema = {
string: z.string(),
cid,
strToCid,
did,
bytes,
strToInt,
Expand Down
1 change: 1 addition & 0 deletions common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export * as util from './common/util.js'
export * as service from './network/service.js'
export * as ucanCheck from './auth/ucan-checks.js'
export * as auth from './auth/index.js'
export * as delta from './repo/delta.js'

import { schema as microblogSchema } from './microblog/types.js'
import { schema as commonSchema } from './common/types.js'
Expand Down
23 changes: 8 additions & 15 deletions common/src/microblog/delegator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import * as ucan from 'ucans'
import { Collection, Follow } from '../repo/types.js'
import { Keypair } from '../common/types.js'
import * as auth from '../auth/index.js'
import * as service from '../network/service.js'

export class MicroblogDelegator {
namespace = 'did:bsky:microblog'
Expand Down Expand Up @@ -109,18 +110,11 @@ export class MicroblogDelegator {
}

async lookupDid(username: string): Promise<string | null> {
const params = { resource: username }
try {
const res = await axios.get(`${this.url}/.well-known/webfinger`, {
params,
})
return check.assure(repoSchema.did, res.data.id)
} catch (e) {
const err = assureAxiosError(e)
if (err.response?.status === 404) {
return null
}
throw new Error(err.message)
const [name, host] = username.split('@')
if (!host) {
return service.lookupDid(this.url, name)
} else {
return service.lookupDid(`http://${host}`, name)
}
}

Expand Down Expand Up @@ -265,9 +259,8 @@ export class MicroblogDelegator {
}
}

async followUser(nameOrDid: string): Promise<void> {
const did = await this.resolveDid(nameOrDid)
const data = { creator: this.did, target: did }
async followUser(username: string): Promise<void> {
const data = { creator: this.did, username }
const token = await this.relationshipToken()
try {
await axios.post(`${this.url}/data/relationship`, data, authCfg(token))
Expand Down
145 changes: 79 additions & 66 deletions common/src/network/service.ts
Original file line number Diff line number Diff line change
@@ -1,80 +1,93 @@
import axios from 'axios'
import { CID } from 'multiformats'
import { assureAxiosError } from './util.js'
import * as check from '../common/check.js'
import { schema as repoSchema } from '../repo/types.js'

const SERVER_URL = 'http://localhost:2583'
const THIRD_PARTY_URL = 'http://localhost:2584'

export const register = async (
car: Uint8Array,
authToken: string,
): Promise<void> => {
await axios.post(`${SERVER_URL}/user/register`, car, {
headers: {
'Content-Type': 'application/octet-stream',
Authorization: `Bearer ${authToken}`,
},
})
}

export const updateUser = async (
car: Uint8Array,
authToken: string,
): Promise<void> => {
await axios.post(`${SERVER_URL}/user/update`, car, {
headers: {
'Content-Type': 'application/octet-stream',
Authorization: `Bearer ${authToken}`,
},
})
}

export const fetchUser = async (did: string): Promise<Uint8Array> => {
const res = await axios.get(`${SERVER_URL}/user/${did}`, {
responseType: 'arraybuffer',
})
return new Uint8Array(res.data)
}

export const fetchUsers = async (): Promise<
{ name: string; did: string }[]
> => {
const res = await axios.get(`${SERVER_URL}/users`)
return res.data
}

export const getServerDid = async (): Promise<string> => {
const res = await axios.get(`${SERVER_URL}/.well-known/did.json`)
return res.data.id
export const lookupDid = async (
url: string,
name: string,
): Promise<string | null> => {
const params = { resource: name }
try {
const res = await axios.get(`${url}/.well-known/webfinger`, {
params,
})
return check.assure(repoSchema.did, res.data.id)
} catch (e) {
const err = assureAxiosError(e)
if (err.response?.status === 404) {
return null
}
throw new Error(err.message)
}
}

export const fetchUserDid = async (
username: string,
): Promise<string | null> => {
export const getRemoteRoot = async (
url: string,
did: string,
): Promise<CID | null> => {
const params = { did }
try {
const res = await axios.get(
`${SERVER_URL}/.well-known/webfinger?resource=${username}`,
)
return res.data.id
} catch (_err) {
return null
const res = await axios.get(`${url}/data/root`, { params })
return CID.parse(res.data.root)
} catch (e) {
const err = assureAxiosError(e)
if (err.response?.status === 404) {
return null
}
throw new Error(`Could not retrieve server did ${err.message}`)
}
}

export const getThirdPartyDid = async (): Promise<string> => {
const res = await axios.get(`${THIRD_PARTY_URL}/.well-known/did.json`)
return res.data.id
export const subscribe = async (
url: string,
did: string,
ownUrl: string,
): Promise<void> => {
const data = { did, host: ownUrl }
try {
await axios.post(`${url}/data/subscribe`, data)
} catch (e) {
const err = assureAxiosError(e)
throw new Error(`Could not retrieve server did ${err.message}`)
}
}

export const thirdPartyPost = async (
username: string,
authToken: string,
export const pushRepo = async (
url: string,
did: string,
car: Uint8Array,
): Promise<void> => {
await axios.post(
`${THIRD_PARTY_URL}/post`,
{ username },
{
try {
await axios.post(`${url}/data/repo/${did}`, car, {
headers: {
Authorization: `Bearer ${authToken}`,
'Content-Type': 'application/octet-stream',
},
},
)
})
} catch (e) {
const err = assureAxiosError(e)
throw new Error(`Could not retrieve server did ${err.message}`)
}
}

export const pullRepo = async (
url: string,
did: string,
from?: CID,
): Promise<Uint8Array | null> => {
const params = { did, from: from?.toString() }
try {
const res = await axios.get(`${url}/data/repo`, {
params,
responseType: 'arraybuffer',
})
return new Uint8Array(res.data)
} catch (e) {
const err = assureAxiosError(e)
if (err.response?.status === 404) {
return null
}
throw new Error(`Could not retrieve server did ${err.message}`)
}
}
37 changes: 32 additions & 5 deletions common/src/repo/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
writeCap,
} from '../auth/bluesky-capability.js'
import * as auth from '../auth/index.js'
import * as service from '../network/service.js'
import * as delta from './delta.js'

export class Repo implements CarStreamable {
Expand Down Expand Up @@ -131,6 +132,7 @@ export class Repo implements CarStreamable {
static async fromCarFile(
buf: Uint8Array,
store: IpldStore,
emit?: (evt: delta.Event) => Promise<void>,
keypair?: Keypair,
ucanStore?: ucan.Store,
) {
Expand All @@ -146,7 +148,9 @@ export class Repo implements CarStreamable {
await store.putBytes(block.cid, block.bytes)
}

return Repo.load(store, root, keypair, ucanStore)
const repo = await Repo.load(store, root, keypair, ucanStore)
await repo.verifySetOfUpdates(null, repo.cid, emit)
return repo
}

// ROOT OPERATIONS
Expand Down Expand Up @@ -184,6 +188,7 @@ export class Repo implements CarStreamable {
}
const newCids = update.newCids
const tokenCid = await this.ucanForOperation(update)
newCids.add(tokenCid)
const root: RepoRoot = {
did: this.did,
prev: this.cid,
Expand All @@ -193,8 +198,6 @@ export class Repo implements CarStreamable {
relationships: this.relationships.cid,
}
const rootCid = await this.blockstore.put(root)
newCids.add(tokenCid)
newCids.add(rootCid)
const commit: Commit = {
root: rootCid,
sig: await this.keypair.sign(rootCid.bytes),
Expand Down Expand Up @@ -296,6 +299,23 @@ export class Repo implements CarStreamable {
return this.blockstore.put(foundUcan.ucan.encoded())
}

// PUSH/PULL TO REMOTE
// -----------

async push(url: string): Promise<void> {
const remoteRoot = await service.getRemoteRoot(url, this.did)
const car = await this.getDiffCar(remoteRoot)
await service.pushRepo(url, this.did, car)
}

async pull(url: string): Promise<void> {
const car = await service.pullRepo(url, this.did, this.cid)
if (car === null) {
throw new Error(`Could not find repo for did: ${this.did}`)
}
await this.loadAndVerifyDiff(car)
}

// VERIFYING UPDATES
// -----------

Expand All @@ -315,11 +335,15 @@ export class Repo implements CarStreamable {
to: CID,
emit?: (evt: delta.Event) => Promise<void>,
): Promise<void> {
if (from === null || from.equals(to)) return
if (to.equals(from)) return
const toRepo = await Repo.load(this.blockstore, to)
const root = await toRepo.getRoot()
if (!root.prev) {
throw new Error('Could not find start repo root')
if (from === null) {
return
} else {
throw new Error('Could not find start repo root')
}
}
await this.verifySetOfUpdates(from, root.prev, emit)
const prevRepo = await Repo.load(this.blockstore, root.prev)
Expand Down Expand Up @@ -356,6 +380,9 @@ export class Repo implements CarStreamable {
} else if (!root.prev.equals(prev.cid)) {
throw new Error('Previous version root CID does not match')
}
if (root.did !== prev.did) {
throw new Error('Changes in DID are not allowed at this point')
}
const newCids = new CidSet(root.new_cids)
let events: delta.Event[] = []
const mapDiff = delta.idMapDiff(
Expand Down
Loading

0 comments on commit 94d84d0

Please sign in to comment.