Skip to content

Commit

Permalink
Merge pull request #9 from ipfs/fix/update
Browse files Browse the repository at this point in the history
Bring up-to-date and test with the datastore test suite
  • Loading branch information
Stebalien authored Apr 15, 2020
2 parents b24eb8d + 663bc92 commit 3616788
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 79 deletions.
21 changes: 21 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
version: 2.1

orbs:
ci-go: ipfs/[email protected]

executors:
go-postgres:
docker:
- image: circleci/golang:1.13.8
- image: circleci/postgres:latest
environment:
POSTGRES_HOST_AUTH_METHOD: trust

workflows:
version: 2
test:
jobs:
- ci-go/build
- ci-go/lint
- ci-go/test:
executor: go-postgres
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ that can be backed by any sql database.
```
import (
"database/sql"
"github.com/whyrusleeping/sql-datastore"
"github.com/ipfs/go-ds-sql"
)
mydb, _ := sql.Open("yourdb", "yourdbparameters")
Expand Down
72 changes: 44 additions & 28 deletions ds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,45 @@ import (
"crypto/rand"
"database/sql"
"fmt"
"io/ioutil"
"os"
"sort"
"strings"
"sync"
"testing"

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
dstest "github.com/ipfs/go-datastore/test"
_ "github.com/lib/pq"
)

// Tests in this package require a postgres database named "test_datastore"
var initOnce sync.Once

// Automatically re-create the test datastore.
func initPG() {
initOnce.Do(func() {
fmtstr := "postgres://%s:%s@%s/?sslmode=disable"
constr := fmt.Sprintf(fmtstr, "postgres", "", "127.0.0.1")
db, err := sql.Open("postgres", constr)
if err != nil {
panic(err)
}

// drop/create the database.
_, err = db.Exec("DROP DATABASE IF EXISTS test_datastore")
if err != nil {
panic(err)
}
_, err = db.Exec("CREATE DATABASE test_datastore")
if err != nil {
panic(err)
}
err = db.Close()
if err != nil {
panic(err)
}
})
}

var testcases = map[string]string{
"/a": "a",
"/a/b": "ab",
Expand Down Expand Up @@ -44,7 +71,7 @@ func (fakeQueries) Get() string {
}

func (fakeQueries) Put() string {
return `INSERT INTO blocks (key, data) SELECT $1, $2 WHERE NOT EXISTS ( SELECT key FROM blocks WHERE key = $1)`
return `INSERT INTO blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2`
}

func (fakeQueries) Query() string {
Expand Down Expand Up @@ -72,10 +99,8 @@ func (fakeQueries) GetSize() string {
// d, close := newDS(t)
// defer close()
func newDS(t *testing.T) (*Datastore, func()) {
path, err := ioutil.TempDir("/tmp", "testing_postgres_")
if err != nil {
t.Fatal(err)
}
initPG()
// connect to that database.
fmtstr := "postgres://%s:%s@%s/%s?sslmode=disable"
constr := fmt.Sprintf(fmtstr, "postgres", "", "127.0.0.1", "test_datastore")
db, err := sql.Open("postgres", constr)
Expand All @@ -88,8 +113,7 @@ func newDS(t *testing.T) (*Datastore, func()) {
}
d := NewDatastore(db, fakeQueries{})
return d, func() {
os.RemoveAll(path)
d.db.Exec("DROP TABLE IF EXISTS blocks")
_, _ = d.db.Exec("DROP TABLE IF EXISTS blocks")
d.Close()
}
}
Expand All @@ -102,14 +126,6 @@ func addTestCases(t *testing.T, d *Datastore, testcases map[string]string) {
}
}

err := d.Put(ds.NewKey("/foo"), nil)
if err != ds.ErrInvalidType {
t.Error("Expected err to be ds.ErrInvalidType")
if err != nil {
t.Fatal(err)
}
}

for k, v := range testcases {
dsk := ds.NewKey(k)
v2, err := d.Get(dsk)
Expand Down Expand Up @@ -328,14 +344,6 @@ func TestBatching(t *testing.T) {
}
}

err = b.Put(ds.NewKey("/foo"), nil)
if err != ds.ErrInvalidType {
t.Error("Expected err to be ds.ErrInvalidType")
if err != nil {
t.Fatal(err)
}
}

err = b.Commit()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -521,7 +529,7 @@ func SubtestManyKeysAndQuery(t *testing.T) {
keystrs = append(keystrs, dsk.String())
keys = append(keys, dsk)
buf := make([]byte, 64)
rand.Read(buf)
_, _ = rand.Read(buf)
values = append(values, buf)
}

Expand Down Expand Up @@ -658,7 +666,7 @@ func TestManyKeysAndQuery(t *testing.T) {
keystrs = append(keystrs, dsk.String())
keys = append(keys, dsk)
buf := make([]byte, 64)
rand.Read(buf)
_, _ = rand.Read(buf)
values = append(values, buf)
}

Expand Down Expand Up @@ -727,7 +735,15 @@ func TestManyKeysAndQuery(t *testing.T) {
SubtestManyKeysAndQuery(t)
}

func TestSuite(t *testing.T) {
d, done := newDS(t)
defer done()

dstest.SubtestAll(t, d)
}

func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
t.Helper()
actual, err := actualR.Rest()
if err != nil {
t.Error(err)
Expand Down
113 changes: 65 additions & 48 deletions dstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"database/sql"
"errors"
"fmt"
"log"

ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
Expand Down Expand Up @@ -46,7 +45,8 @@ func (b *batch) GetTransaction() (*sql.Tx, error) {
newTransaction, err := b.db.Begin()
if err != nil {
if newTransaction != nil {
newTransaction.Rollback()
// nothing we can do about this error.
_ = newTransaction.Rollback()
}

return nil, err
Expand All @@ -57,19 +57,15 @@ func (b *batch) GetTransaction() (*sql.Tx, error) {
}

func (b *batch) Put(key ds.Key, val []byte) error {
if val == nil {
return ds.ErrInvalidType
}

txn, err := b.GetTransaction()
if err != nil {
b.txn.Rollback()
_ = b.txn.Rollback()
return err
}

_, err = txn.Exec(b.queries.Put(), key.String(), val)
if err != nil {
b.txn.Rollback()
_ = b.txn.Rollback()
return err
}

Expand All @@ -79,12 +75,13 @@ func (b *batch) Put(key ds.Key, val []byte) error {
func (b *batch) Delete(key ds.Key) error {
txn, err := b.GetTransaction()
if err != nil {
b.txn.Rollback()
_ = b.txn.Rollback()
return err
}

_, err = txn.Exec(b.queries.Delete(), key.String())
if err != nil {
b.txn.Rollback()
_ = b.txn.Rollback()
return err
}

Expand All @@ -97,7 +94,7 @@ func (b *batch) Commit() error {
}
var err = b.txn.Commit()
if err != nil {
b.txn.Rollback()
_ = b.txn.Rollback()
return err
}

Expand Down Expand Up @@ -164,10 +161,6 @@ func (d *Datastore) Has(key ds.Key) (exists bool, err error) {
}

func (d *Datastore) Put(key ds.Key, value []byte) error {
if value == nil {
return ds.ErrInvalidType
}

_, err := d.db.Exec(d.queries.Put(), key.String(), value)
if err != nil {
return err
Expand All @@ -188,45 +181,63 @@ func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {

raw = dsq.NaiveOrder(raw, q.Orders...)

// if we have filters or orders, offset and limit won't have been applied in the query
if len(q.Filters) > 0 || len(q.Orders) > 0 {
if q.Offset != 0 {
raw = dsq.NaiveOffset(raw, q.Offset)
}
if q.Limit != 0 {
raw = dsq.NaiveLimit(raw, q.Limit)
}
}

return raw, nil
}

func (d *Datastore) RawQuery(q dsq.Query) (dsq.Results, error) {
var rows *sql.Rows
var err error

if q.Prefix != "" {
rows, err = QueryWithParams(d, q)
} else {
rows, err = d.db.Query(d.queries.Query())
}

rows, err = QueryWithParams(d, q)
if err != nil {
return nil, err
}

var entries []dsq.Entry
defer rows.Close()

for rows.Next() {
var key string
var out []byte
err := rows.Scan(&key, &out)

if err != nil {
log.Fatal("Error reading rows from query")
}

entry := dsq.Entry{
Key: key,
Value: out,
}

entries = append(entries, entry)
it := dsq.Iterator{
Next: func() (dsq.Result, bool) {
if !rows.Next() {
return dsq.Result{}, false
}

var key string
var out []byte

err := rows.Scan(&key, &out)
if err != nil {
return dsq.Result{Error: err}, false
}

entry := dsq.Entry{Key: key}

if !q.KeysOnly {
entry.Value = out
}
if q.ReturnsSizes {
entry.Size = len(out)
}

return dsq.Result{Entry: entry}, true
},
Close: func() error {
return rows.Close()
},
}

results := dsq.ResultsWithEntries(q, entries)
return results, nil
return dsq.ResultsFromIterator(q, it), nil
}

func (d *Datastore) Sync(key ds.Key) error {
return nil
}

func (d *Datastore) GetSize(key ds.Key) (int, error) {
Expand All @@ -248,15 +259,21 @@ func QueryWithParams(d *Datastore, q dsq.Query) (*sql.Rows, error) {
var qNew = d.queries.Query()

if q.Prefix != "" {
qNew += fmt.Sprintf(d.queries.Prefix(), q.Prefix)
}

if q.Limit != 0 {
qNew += fmt.Sprintf(d.queries.Limit(), q.Limit)
// normalize
prefix := ds.NewKey(q.Prefix).String()
if prefix != "/" {
qNew += fmt.Sprintf(d.queries.Prefix(), prefix+"/")
}
}

if q.Offset != 0 {
qNew += fmt.Sprintf(d.queries.Offset(), q.Offset)
// only apply limit and offset if we do not have to naive filter/order the results
if len(q.Filters) == 0 && len(q.Orders) == 0 {
if q.Limit != 0 {
qNew += fmt.Sprintf(d.queries.Limit(), q.Limit)
}
if q.Offset != 0 {
qNew += fmt.Sprintf(d.queries.Offset(), q.Offset)
}
}

return d.db.Query(qNew)
Expand Down
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/ipfs/go-ds-sql

go 1.12

require (
github.com/ipfs/go-datastore v0.4.4
github.com/lib/pq v1.3.0
)
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/ipfs/go-datastore v0.4.4 h1:rjvQ9+muFaJ+QZ7dN5B1MSDNQ0JVZKkkES/rMZmA8X8=
github.com/ipfs/go-datastore v0.4.4/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA=
github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU=
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Loading

0 comments on commit 3616788

Please sign in to comment.