Skip to content

Commit

Permalink
Merge pull request RoaringBitmap#384 from anacrolix/clean-bsi-streaming
Browse files Browse the repository at this point in the history
Add BSI WriteTo, ReadFrom and Equal
  • Loading branch information
lemire authored Mar 21, 2023
2 parents 3825d76 + 64586e9 commit 9a63dd3
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 17 deletions.
24 changes: 24 additions & 0 deletions roaring64/bsi64-fuzz_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//go:build go1.18
// +build go1.18

package roaring64

import "testing"

// FuzzBsiStreaming decodes the fuzz input into column/value pairs and runs them through
// testBsiRoundTrip. Inputs that fail to decode, or that contain the same column more than
// once, are skipped.
func FuzzBsiStreaming(f *testing.F) {
	f.Fuzz(func(t *testing.T, data []byte) {
		pairs, err := bytesToBsiColValPairs(data)
		if err != nil {
			t.SkipNow()
		}
		// Track the columns seen so far so that repeated columns can be rejected.
		seen := make(map[uint64]struct{}, len(pairs))
		for i := range pairs {
			col := pairs[i].col
			if _, dup := seen[col]; dup {
				t.Skip("duplicate column")
			}
			seen[col] = struct{}{}
		}
		testBsiRoundTrip(t, pairs)
	})
}
77 changes: 73 additions & 4 deletions roaring64/bsi64.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package roaring64

import (
"fmt"
"io"
"math/bits"
"runtime"
"sync"
Expand Down Expand Up @@ -261,7 +262,6 @@ type task struct {
// For the RANGE parameter the comparison criteria is >= valueOrStart and <= end.
// The parallelism parameter indicates the number of CPU threads to be applied for processing. A value
// of zero indicates that all available CPU resources will be potentially utilized.
//
func (b *BSI) CompareValue(parallelism int, op Operation, valueOrStart, end int64,
foundSet *Bitmap) *Bitmap {

Expand Down Expand Up @@ -522,7 +522,6 @@ func (b *BSI) minOrMax(op Operation, batch []uint64, resultsChan chan int64, wg

// Sum all values contained within the foundSet. As a convenience, the cardinality of the foundSet
// is also returned (for calculating the average).
//
func (b *BSI) Sum(foundSet *Bitmap) (sum int64, count uint64) {

count = foundSet.GetCardinality()
Expand All @@ -549,7 +548,6 @@ func (b *BSI) Transpose() *Bitmap {
// vectoring one set of integers to another.
//
// TODO: This implementation is functional but not performant, needs to be re-written perhaps using SIMD SSE2 instructions.
//
func (b *BSI) IntersectAndTranspose(parallelism int, foundSet *Bitmap) *Bitmap {

trans := &task{bsi: b}
Expand Down Expand Up @@ -674,6 +672,39 @@ func (b *BSI) UnmarshalBinary(bitData [][]byte) error {
return nil
}

// ReadFrom reads a serialized version of this BSI from stream.
//
// The first bitmap in the stream becomes the existence bitmap. Every following bitmap is
// appended as the next bit slice until the stream reports io.EOF at a bitmap boundary. Any
// bit slices previously held by b are discarded. The returned count is the total number of
// bytes reported by the underlying bitmap reads, including those of a failed read.
func (b *BSI) ReadFrom(stream io.Reader) (p int64, err error) {
	existence, read, err := readBSIContainerFromStream(stream)
	p += read
	if err != nil {
		return p, fmt.Errorf("reading existence bitmap: %w", err)
	}
	b.eBM = &existence
	b.bA = b.bA[:0]
	for {
		// Declaring the bitmap inside the loop gives every slice its own variable, so the
		// pointers stored in b.bA never alias each other.
		slice, read, err := readBSIContainerFromStream(stream)
		p += read
		switch {
		case err == io.EOF:
			// A clean EOF while starting a new slice marks the end of the BSI.
			return p, nil
		case err != nil:
			return p, fmt.Errorf("reading bit slice index %v: %w", len(b.bA), err)
		}
		b.bA = append(b.bA, &slice)
	}
}

// readBSIContainerFromStream reads a single Bitmap from r into a new value and returns it
// together with the byte count and error reported by Bitmap.ReadFrom.
func readBSIContainerFromStream(r io.Reader) (bm Bitmap, p int64, err error) {
	p, err = bm.ReadFrom(r)
	return bm, p, err
}

// MarshalBinary serializes a BSI
func (b *BSI) MarshalBinary() ([][]byte, error) {

Expand All @@ -694,6 +725,23 @@ func (b *BSI) MarshalBinary() ([][]byte, error) {
return data, nil
}

// WriteTo writes a serialized version of this BSI to stream.
//
// The existence bitmap is written first, followed by each bit slice in index order. Writing
// stops at the first error; n is the number of bytes reported written up to that point.
func (b *BSI) WriteTo(w io.Writer) (n int64, err error) {
	written, err := b.eBM.WriteTo(w)
	n += written
	if err != nil {
		return n, err
	}
	for i := range b.bA {
		written, err = b.bA[i].WriteTo(w)
		n += written
		if err != nil {
			return n, err
		}
	}
	return n, err
}

// BatchEqual returns a bitmap containing the column IDs where the values are contained within the list of values provided.
func (b *BSI) BatchEqual(parallelism int, values []int64) *Bitmap {

Expand Down Expand Up @@ -811,7 +859,6 @@ func (b *BSI) addDigit(foundSet *Bitmap, i int) {
// contained within the input BSI. Given that for BSIs, different columnIDs can have the same value. TransposeWithCounts
// is useful for situations where there is a one-to-many relationship between the vectored integer sets. The resulting BSI
// contains the number of times a particular value appeared in the input BSI.
//
func (b *BSI) TransposeWithCounts(parallelism int, foundSet, filterSet *Bitmap) *BSI {

return parallelExecutorBSIResults(parallelism, b, transposeWithCounts, foundSet, filterSet, true)
Expand Down Expand Up @@ -851,3 +898,25 @@ func (b *BSI) Increment(foundSet *Bitmap) {
func (b *BSI) IncrementAll() {
	// Use the whole existence bitmap as the found set handed to Increment.
	allColumns := b.GetExistenceBitmap()
	b.Increment(allColumns)
}

// Equals reports whether b and other have equal existence bitmaps and equal bit slices.
// Bit slices that exist in only one of the two BSIs must be empty, so a BSI with trailing
// empty slices still compares equal to one without them.
func (b *BSI) Equals(other *BSI) bool {
	if !b.eBM.Equals(other.eBM) {
		return false
	}
	mine, theirs := b.bA, other.bA
	common := len(mine)
	if len(theirs) < common {
		common = len(theirs)
	}
	// Slices present on both sides must match exactly.
	for i := 0; i < common; i++ {
		if !mine[i].Equals(theirs[i]) {
			return false
		}
	}
	// At most one of the two tails below is non-empty; whatever is left over must hold
	// no bits.
	for _, extra := range mine[common:] {
		if !extra.IsEmpty() {
			return false
		}
	}
	for _, extra := range theirs[common:] {
		if !extra.IsEmpty() {
			return false
		}
	}
	return true
}
147 changes: 134 additions & 13 deletions roaring64/bsi64_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package roaring64

import (
"bytes"
"encoding/binary"
"fmt"
_ "fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io"
"io/ioutil"
"math/rand"
"os"
"sort"
"testing"
"time"
"fmt"
"os"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSetAndGet(t *testing.T) {
Expand Down Expand Up @@ -265,47 +270,47 @@ func TestNewBSIRetainSet(t *testing.T) {
func TestLargeFile(t *testing.T) {

datEBM, err := ioutil.ReadFile("./testdata/age/EBM")
if(err != nil) {
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nIMPORTANT: For testing file IO, the roaring library requires disk access.\nWe omit some tests for now.\n\n")
return
}
dat1, err := ioutil.ReadFile("./testdata/age/1")
if(err != nil) {
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nIMPORTANT: For testing file IO, the roaring library requires disk access.\nWe omit some tests for now.\n\n")
return
}
dat2, err := ioutil.ReadFile("./testdata/age/2")
if(err != nil) {
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nIMPORTANT: For testing file IO, the roaring library requires disk access.\nWe omit some tests for now.\n\n")
return
}
dat3, err := ioutil.ReadFile("./testdata/age/3")
if(err != nil) {
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nIMPORTANT: For testing file IO, the roaring library requires disk access.\nWe omit some tests for now.\n\n")
return
}
dat4, err := ioutil.ReadFile("./testdata/age/4")
if(err != nil) {
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nIMPORTANT: For testing file IO, the roaring library requires disk access.\nWe omit some tests for now.\n\n")
return
}
dat5, err := ioutil.ReadFile("./testdata/age/5")
if(err != nil) {
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nIMPORTANT: For testing file IO, the roaring library requires disk access.\nWe omit some tests for now.\n\n")
return
}
dat6, err := ioutil.ReadFile("./testdata/age/6")
if(err != nil) {
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nIMPORTANT: For testing file IO, the roaring library requires disk access.\nWe omit some tests for now.\n\n")
return
}
dat7, err := ioutil.ReadFile("./testdata/age/7")
if(err != nil) {
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nIMPORTANT: For testing file IO, the roaring library requires disk access.\nWe omit some tests for now.\n\n")
return
}
dat8, err := ioutil.ReadFile("./testdata/age/8")
if(err != nil) {
if err != nil {
fmt.Fprintf(os.Stderr, "\n\nIMPORTANT: For testing file IO, the roaring library requires disk access.\nWe omit some tests for now.\n\n")
return
}
Expand Down Expand Up @@ -455,3 +460,119 @@ func TestMinMaxWithRandom(t *testing.T) {
assert.Equal(t, bsi.MinValue, bsi.MinMax(0, MIN, bsi.GetExistenceBitmap()))
assert.Equal(t, bsi.MaxValue, bsi.MinMax(0, MAX, bsi.GetExistenceBitmap()))
}

// TestBSIWriteToReadFrom writes a randomly populated BSI to a temporary file, reads it back
// into a second BSI and checks that both are equal and that the minimum and maximum values
// survive the round trip.
func TestBSIWriteToReadFrom(t *testing.T) {
	file, err := ioutil.TempFile("./testdata", "bsi-test")
	if err != nil {
		t.Fatal(err)
	}
	// The deferred Close runs when this function returns; the cleanup that removes the file
	// runs afterwards, once the test has completed.
	t.Cleanup(func() { os.Remove(file.Name()) })
	defer file.Close()

	bsi := setupRandom()
	if _, err = bsi.WriteTo(file); err != nil {
		t.Fatal(err)
	}

	// Rewind to the beginning of the file. Seek takes (offset, whence) in that order.
	if _, err = file.Seek(0, io.SeekStart); err != nil {
		t.Fatal(err)
	}

	bsi2 := NewDefaultBSI()
	if _, err = bsi2.ReadFrom(file); err != nil {
		t.Fatal(err)
	}
	assert.True(t, bsi.Equals(bsi2))
	assert.Equal(t, bsi.MinValue, bsi2.MinMax(0, MIN, bsi2.GetExistenceBitmap()))
	assert.Equal(t, bsi.MaxValue, bsi2.MinMax(0, MAX, bsi2.GetExistenceBitmap()))
}

// bsiColValPair is a single column ID together with the value stored for it.
type bsiColValPair struct {
	col uint64
	val int64
}

// bytesToBsiColValPairs decodes b as a sequence of (uvarint column, varint value) pairs.
// Decoding ends successfully when the input is exhausted exactly at a pair boundary. Any
// other decoding error is returned along with the pairs decoded before it.
func bytesToBsiColValPairs(b []byte) (slice []bsiColValPair, err error) {
	src := bytes.NewReader(b)
	for {
		col, colErr := binary.ReadUvarint(src)
		switch {
		case colErr == io.EOF:
			// No bytes left before a new pair: clean end of input.
			return slice, nil
		case colErr != nil:
			return slice, colErr
		}
		val, valErr := binary.ReadVarint(src)
		if valErr != nil {
			return slice, valErr
		}
		slice = append(slice, bsiColValPair{col: col, val: val})
	}
}

// testBsiRoundTrip checks that the given column values write out and read back in to a BSI
// without changing. Slice should not have duplicate column indexes, as iterator will not
// render duplicates to match, and the BSI will contain the last value set.
//
// The serialized bytes are decoded twice: into a fresh BSI, so that a ReadFrom which fails
// to populate its receiver is detected, and into the BSI that produced them, so that
// overwriting existing contents is exercised too.
//
// The caller's slice is sorted by column as a side effect.
func testBsiRoundTrip(t *testing.T, pairs []bsiColValPair) {
	bsi := NewDefaultBSI()
	for _, pair := range pairs {
		bsi.SetValue(pair.col, pair.val)
	}
	var buf bytes.Buffer
	if _, err := bsi.WriteTo(&buf); err != nil {
		t.Fatal(err)
	}
	serialized := buf.Bytes()

	// The column ordering needs to match the one given by the iterator. This reorders the
	// caller's slice.
	sort.Slice(pairs, func(i, j int) bool {
		return pairs[i].col < pairs[j].col
	})

	// verify decodes the serialized bytes into target and compares its contents with pairs.
	verify := func(label string, target *BSI) {
		if _, err := target.ReadFrom(bytes.NewReader(serialized)); err != nil {
			t.Fatalf("%s: %v", label, err)
		}
		it := target.GetExistenceBitmap().Iterator()
		for _, pair := range pairs {
			if !it.HasNext() {
				t.Fatalf("%s: expected more columns: %v", label, pair.col)
			}
			bsiCol := it.Next()
			if pair.col != bsiCol {
				t.Fatalf("%s: expected col %d, got %d", label, pair.col, bsiCol)
			}
			bsiVal, ok := target.GetValue(bsiCol)
			if !ok {
				t.Fatalf("%s: expected col %d to exist", label, bsiCol)
			}
			if pair.val != bsiVal {
				t.Fatalf("%s: expected col %d to have value %d, got %d", label, bsiCol, pair.val, bsiVal)
			}
		}
		if it.HasNext() {
			t.Fatalf("%s: expected no more columns", label)
		}
	}

	verify("fresh BSI", NewDefaultBSI())
	verify("reused BSI", bsi)
}

func TestBsiStreaming(t *testing.T) {
testBsiRoundTrip(t, []bsiColValPair{})
testBsiRoundTrip(t, []bsiColValPair{{0, 0}})
testBsiRoundTrip(t, []bsiColValPair{{48, 0}})
}

// TestMutatedBsiEquality checks that a BSI whose value was overwritten compares equal to a
// BSI that was only ever given the final value, and that the two diverge and converge again
// as further values are set.
func TestMutatedBsiEquality(t *testing.T) {
	// Column 0 is set to 2 and then overwritten with 1.
	overwritten := NewDefaultBSI()
	for _, v := range []int64{2, 1} {
		overwritten.SetValue(0, v)
	}

	// Column 0 is set to 1 directly.
	direct := NewDefaultBSI()
	direct.SetValue(0, 1)
	assert.True(t, direct.Equals(overwritten))

	// Diverge: column 0 now holds 2 on one side only.
	direct.SetValue(0, 2)
	assert.False(t, direct.Equals(overwritten))

	// Converge: direct has now gone through the same 2-then-1 overwrite.
	direct.SetValue(0, 1)
	assert.True(t, direct.Equals(overwritten))
}

0 comments on commit 9a63dd3

Please sign in to comment.