Use Transcript.
I'd extracted Recorder and Player into a new module and changed it to allow
for blocks of parts. I've now reimported it.
flatheadmill committed Dec 25, 2020
1 parent c6b2a58 commit 1785d8c
Showing 9 changed files with 42 additions and 178 deletions.
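
For orientation, the shape of the change: the old in-repo recorder took a
header object plus an array of part buffers and serialized the header itself,
while transcript's recorder takes an array of blocks, each block an array of
buffers, so the caller now JSON-serializes the header into the first buffer of
a block. A minimal sketch of the new calling convention, mirroring the
_recordify and recordify helpers introduced in this diff:

    const Recorder = require('transcript/recorder')

    // As throughout this commit, the checksum function is stubbed to '0'.
    const recorder = Recorder.create(() => '0')

    // One block per record: the JSON-serialized header followed by the parts.
    function recordify (header, parts) {
        return recorder([ [ Buffer.from(JSON.stringify(header)) ].concat(parts) ])
    }

    // Old call:  recorder({ method: 'insert', index: 0 }, parts)
    // New call:  recordify({ method: 'insert', index: 0 }, parts)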
cursor.js: 5 changes (2 additions & 3 deletions)
@@ -74,7 +74,7 @@ class Cursor {
//
insert (index, key, parts, writes, buffers = this.serialize(parts)) {
const header = { method: 'insert', index: index }
- const buffer = this._sheaf._recorder(header, buffers)
+ const buffer = this._sheaf._recordify(header, buffers)
const record = { key: key, parts: parts, heft: buffer.length }

this._entry.heft += record.heft
@@ -88,9 +88,8 @@

remove (index, writes) {
const header = { method: 'delete', index: index }
- const buffer = this._sheaf._recorder(header, [])
+ const buffer = this._sheaf._recordify(header, [])

- console.log(this.page)
this._sheaf.append(this.page.id, buffer, writes)

const [ spliced ] = this.page.items.splice(index, 1)
package.json: 1 change (1 addition & 0 deletions)
@@ -35,6 +35,7 @@
"journalist": "0.1.11",
"magazine": "6.0.0-alpha.1",
"prospective": "^0.3.0",
"transcript": "0.1.3",
"turnstile": "6.0.0-alpha.39",
"whittle": "0.0.0"
},
player.js: 82 changes (0 additions & 82 deletions)

This file was deleted.

recorder.js: 16 changes (0 additions & 16 deletions)

This file was deleted.

sheaf.js: 45 changes (25 additions & 20 deletions)
@@ -28,10 +28,10 @@ const Fracture = require('fracture')
const fnv = require('./fnv')

// Serialize a single b-tree record.
- const recorder = require('./recorder')
+ const Recorder = require('transcript/recorder')

// Incrementally read a b-tree page chunk by chunk.
- const Player = require('./player')
+ const Player = require('transcript/player')

// Binary search for a record in a b-tree page.
const find = require('./find')
@@ -143,7 +143,7 @@ class Sheaf {
return options.comparator
}
} ()
- this._recorder = recorder(() => '0')
+ this.$_recorder = Recorder.create(() => '0')
this._root = null

// **TODO** Do not worry about wrapping anymore.
@@ -211,7 +211,7 @@

await fs.mkdir(this._path('instances', '0'), { recursive: true })
await fs.mkdir(this._path('pages', '0.0'), { recursive: true })
- const buffer = this._recorder.call(null, { id: '0.1' }, [])
+ const buffer = this._recordify({ id: '0.1' }, [])
const hash = fnv(buffer)
await fs.writeFile(this._path('pages', '0.0', hash), buffer)
await fs.mkdir(this._path('pages', '0.1'), { recursive: true })
@@ -271,20 +271,22 @@ class Sheaf {
const readable = fileSystem.createReadStream(this._path('pages', id, append))
for await (const chunk of readable) {
for (const entry of player.split(chunk)) {
- switch (entry.header.method) {
+ const header = JSON.parse(entry.parts.shift())
+ console.log(header, entry)
+ switch (header.method) {
case 'right': {
// TODO Need to use the key section of the record.
page.right = this.serializer.key.deserialize(entry.parts)
assert(page.right != null)
}
break
case 'load': {
- const { id, append } = entry.header
+ const { id, append } = header
const { page: loaded } = await this._read(id, append)
page.items = loaded.items
page.right = loaded.right
page.key = loaded.key
- page.vacuum.push({ header: entry.header, vacuum: loaded.vacuum })
+ page.vacuum.push({ header: header, vacuum: loaded.vacuum })
}
break
case 'slice': {
@@ -295,23 +297,23 @@
}
break
case 'merge': {
- const { page: right } = await this._read(entry.header.id, entry.header.append)
+ const { page: right } = await this._read(header.id, header.append)
page.items.push.apply(page.items, right.items)
page.right = right.right
- page.vacuum.push({ header: entry.header, vacuum: right.vacuum })
+ page.vacuum.push({ header: header, vacuum: right.vacuum })
}
break
case 'insert': {
const parts = this.serializer.parts.deserialize(entry.parts)
- page.items.splice(entry.header.index, 0, {
+ page.items.splice(header.index, 0, {
key: this.extractor(parts),
parts: parts,
heft: entry.sizes.reduce((sum, size) => sum + size, 0)
})
}
break
case 'delete': {
- page.items.splice(entry.header.index, 1)
+ page.items.splice(header.index, 1)
// TODO We do not want to vacuum automatically, we want
it to be optional, possibly delayed. Especially for
// MVCC where we are creating short-lived trees, we
@@ -351,8 +353,9 @@
})
const items = []
for (const entry of player.split(buffer)) {
+ const header = JSON.parse(entry.parts.shift())
items.push({
- id: entry.header.id,
+ id: header.id,
key: entry.parts.length != 0
? this.serializer.key.deserialize(entry.parts)
: null
@@ -539,7 +542,6 @@ class Sheaf {

async _writeLeaf (id, writes) {
const append = await this._appendable(id)
- const recorder = this._recorder
await fs.appendFile(this._path('pages', id, append), Buffer.concat(writes))
}

@@ -619,7 +621,11 @@
}

_serialize (header, parts) {
- return this._recorder(header, parts.length == 0 ? parts : this.serializer.parts.serialize(parts))
+ return this._recordify(header, parts.length == 0 ? parts : this.serializer.parts.serialize(parts))
}

+ _recordify (header, parts) {
+ return this.$_recorder([[ Buffer.from(JSON.stringify(header)) ].concat(parts)])
+ }

_stub (commit, id, append, records) {
@@ -639,7 +645,7 @@
const parts = key != null
? this.serializer.key.serialize(key)
: []
- buffers.push(this._recorder({ id }, parts))
+ buffers.push(this._recordify({ id }, parts))
}
const buffer = Buffer.concat(buffers)
entry.heft = buffer.length
@@ -773,22 +779,21 @@

await commit.unlink(path.join('pages', leaf.entry.value.id, first))

- const recorder = this._recorder
const buffers = []
const { id, right, key } = leaf.entry.value

if (right != null) {
- buffers.push(recorder({ method: 'right' }, this.serializer.key.serialize(right)))
+ buffers.push(this._recordify({ method: 'right' }, this.serializer.key.serialize(right)))
}
// Write out a new page slowly, a record at a time.
for (let index = 0, I = items.length; index < I; index++) {
const parts = this.serializer.parts.serialize(items[index].parts)
- buffers.push(recorder({ method: 'insert', index }, parts))
+ buffers.push(this._recordify({ method: 'insert', index }, parts))
}
if (key != null) {
- buffers.push(recorder({ method: 'key' }, this.serializer.key.serialize(key)))
+ buffers.push(this._recordify({ method: 'key' }, this.serializer.key.serialize(key)))
}
- buffers.push(recorder({
+ buffers.push(this._recordify({
method: 'dependent', id: id, append: second
}, []))

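
The read side changes shape to match: transcript's Player yields entries
carrying parts and sizes (as used above), so the header is no longer available
as entry.header and must instead be shifted off the parts and JSON-parsed. A
sketch of the read loop as it now appears in _read, assuming Player takes the
same stub checksum function as the Recorder (readEntries is an illustrative
helper, not part of the codebase):

    const fileSystem = require('fs')
    const Player = require('transcript/player')

    async function readEntries (file) {
        const player = new Player(() => '0')
        const entries = []
        for await (const chunk of fileSystem.createReadStream(file)) {
            for (const entry of player.split(chunk)) {
                // The header is the first part, written as JSON by recordify.
                const header = JSON.parse(entry.parts.shift())
                // entry.parts now holds only the record's serialized parts;
                // entry.sizes still provides the byte lengths used for heft.
                entries.push({ header, parts: entry.parts, sizes: entry.sizes })
            }
        }
        return entries
    }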
shifter.js: 6 changes (4 additions & 2 deletions)
@@ -3,7 +3,9 @@ module.exports = function (checksum) {
if (array.length == 0) {
return null
}
- const checksum = array.shift(), header = array.shift()
- return [ header ].concat(array.splice(0, header.lengths.length))
+ const checksum = array.shift()
+ const lengths = array.shift()
+ const header = array.shift()
+ return [ header ].concat(array.splice(0, lengths[0].length - 1))
}
}
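
The test shifter tracks the new on-disk framing: the part lengths that used to
ride along in the header's lengths property are now a separate JSON line
between the checksum and the header, and since the header itself appears to be
counted among the block's parts, one is subtracted when splicing off the
remaining parts. Usage is unchanged apart from the return value, as seen in
utilities.js below:

    const shifter = require('./shifter')(() => '0')

    // entries: JSON-parsed lines of a page file, consumed a record at a time.
    const record = shifter(entries)    // [ header, ...parts ]
    const header = record[0]           // previously record[0].header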
test/player.t.js: 31 changes (0 additions & 31 deletions)

This file was deleted.

test/recorder.t.js: 18 changes (0 additions & 18 deletions)

This file was deleted.

utilities.js: 16 changes (10 additions & 6 deletions)
@@ -2,9 +2,13 @@ const path = require('path')
const fileSystem = require('fs')
const fs = require('fs').promises
const shifter = require('./shifter')(() => '0')
- const recorder = require('./recorder')(() => '0')
+ const recorder = require('transcript/recorder').create(() => '0')
const fnv = require('./fnv')

+ function recordify (header, parts) {
+ return recorder([[ Buffer.from(JSON.stringify(header)) ].concat(parts)])
+ }

const appendable = require('./appendable')

exports.directory = path.resolve(__dirname, './test/tmp')
@@ -31,7 +35,7 @@ exports.vivify = async function (directory) {
const entries = lines.map(line => JSON.parse(line))
const records = []
while (entries.length != 0) {
- const record = shifter(entries), header = record[0].header
+ const record = shifter(entries), header = record[0]
switch (header.method) {
case 'right':
records.push([ header.method, header.right ])
@@ -60,7 +64,7 @@
const items = []
while (entries.length) {
const record = shifter(entries)
- items.push([ record[0].header.id, record.length == 2 ? record[1] : null ])
+ items.push([ record[0].id, record.length == 2 ? record[1] : null ])
}
vivified[file] = items
}
@@ -75,7 +79,7 @@ exports.serialize = async function (directory, files) {
await fs.mkdir(path.resolve(directory, 'pages', id), { recursive: true })
if (+id.split('.')[1] % 2 == 0) {
const buffers = files[id].map(record => {
- return recorder({
+ return recordify({
id: record[0]
}, record[1] != null ? [ Buffer.from(JSON.stringify(record[1])) ] : [])
})
@@ -109,9 +113,9 @@ exports.serialize = async function (directory, files) {
console.log(record)
break
}
- }).map(entry => recorder(entry.header, entry.parts))
+ }).map(entry => recordify(entry.header, entry.parts))
if (key != null) {
- writes.push(recorder({ method: 'key' }, key))
+ writes.push(recordify({ method: 'key' }, key))
}
const file = path.resolve(directory, 'pages', id, '0.0')
await fs.writeFile(file, Buffer.concat(writes))
