Skip to content

Commit

Permalink
[CLIENT-2238] Convert batch calls with just a few keys per node in su…
Browse files Browse the repository at this point in the history
…b-batches to Get requests

If the number of keys in a sub-batch for a node is less than or equal to the value set in BatchPolicy.DirectGetThreshold, the client uses direct get requests instead of batch commands to reduce the load on the server.
  • Loading branch information
khaf committed Mar 24, 2023
1 parent 2a805e7 commit 56ccb91
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 2 deletions.
2 changes: 2 additions & 0 deletions batch_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type batcher interface {
retryBatch(ifc batcher, cluster *Cluster, deadline time.Time, iteration, commandSentCounter int) (bool, error)
generateBatchNodes(*Cluster) ([]*batchNode, error)
setSequence(int, int)

directGet(*Client) error
}

type batchCommand struct {
Expand Down
27 changes: 27 additions & 0 deletions batch_command_exists.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,33 @@ func (cmd *batchCommandExists) parseRecordResults(ifc command, receiveSize int)
return true, nil
}

// directGet checks the keys assigned to this command's batch node with one
// single-key Exists call per key instead of a batch command, storing each
// outcome in cmd.existsArray at the key's original offset.
//
// Error handling per key:
//   - types.ErrKeyNotFound is not treated as a failure; the loop moves on.
//   - types.ErrFilteredOut increments cmd.filteredOutCnt and the loop moves on.
//   - any other error is collected when cmd.policy.AllowPartialResults is set,
//     otherwise it aborts the loop and is returned immediately.
//
// The collected errors (possibly none) are combined with mergeErrors.
func (cmd *batchCommandExists) directGet(client *Client) error {
	var failures []error
	for _, idx := range cmd.batch.offsets {
		found, err := client.Exists(&cmd.policy.BasePolicy, cmd.keys[idx])
		// The slot is written even on error, with whatever Exists returned.
		cmd.existsArray[idx] = found

		switch {
		case err == nil, err == types.ErrKeyNotFound:
			// Success, or a missing key: nothing to record.
		case err == types.ErrFilteredOut:
			cmd.filteredOutCnt++
		case cmd.policy.AllowPartialResults:
			failures = append(failures, err)
		default:
			return err
		}
	}
	return mergeErrors(failures)
}

// Execute runs the command by delegating to cmd.execute, passing the command
// itself as the first argument and true as the second.
// NOTE(review): the meaning of the boolean flag is defined by execute, which
// is not visible here — confirm before relying on it.
func (cmd *batchCommandExists) Execute() error {
return cmd.execute(cmd, true)
}
Expand Down
36 changes: 36 additions & 0 deletions batch_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,42 @@ func (cmd *batchCommandGet) parseRecord(key *Key, opCount int, generation, expir
return newRecord(cmd.node, key, bins, generation, expiration), nil
}

// directGet reads the keys assigned to this command's batch node with one
// single-record request per key instead of a batch command.
//
// For every offset in cmd.batch.offsets:
//   - when cmd.objects is set, the record is read into cmd.objects[offset] via
//     client.getObjectDirect and cmd.objectsFound[offset] records whether that
//     call returned no error;
//   - otherwise the result is stored in cmd.records[offset], using GetHeader
//     when the _INFO1_NOBINDATA read attribute is set and Get (restricted to
//     cmd.binNames) when it is not.
//
// Error handling per key:
//   - types.ErrKeyNotFound is not treated as a failure; the loop moves on.
//   - types.ErrFilteredOut increments cmd.filteredOutCnt and the loop moves on.
//   - any other error is collected when cmd.policy.AllowPartialResults is set,
//     otherwise it aborts the loop and is returned immediately.
//
// The collected errors (possibly none) are combined with mergeErrors.
func (cmd *batchCommandGet) directGet(client *Client) error {
	var failures []error
	noBinData := (cmd.readAttr & _INFO1_NOBINDATA) == _INFO1_NOBINDATA

	for _, idx := range cmd.batch.offsets {
		var err error
		switch {
		case cmd.objects != nil:
			err = client.getObjectDirect(&cmd.policy.BasePolicy, cmd.keys[idx], cmd.objects[idx])
			cmd.objectsFound[idx] = err == nil
		case noBinData:
			cmd.records[idx], err = client.GetHeader(&cmd.policy.BasePolicy, cmd.keys[idx])
		default:
			cmd.records[idx], err = client.Get(&cmd.policy.BasePolicy, cmd.keys[idx], cmd.binNames...)
		}

		switch {
		case err == nil, err == types.ErrKeyNotFound:
			// Success, or a missing key: nothing to record.
		case err == types.ErrFilteredOut:
			cmd.filteredOutCnt++
		case cmd.policy.AllowPartialResults:
			failures = append(failures, err)
		default:
			return err
		}
	}
	return mergeErrors(failures)
}

// Execute runs the command by delegating to cmd.execute, passing the command
// itself as the first argument and true as the second.
// NOTE(review): the meaning of the boolean flag is defined by execute, which
// is not visible here — confirm before relying on it.
func (cmd *batchCommandGet) Execute() error {
return cmd.execute(cmd, true)
}
Expand Down
33 changes: 33 additions & 0 deletions batch_index_command_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package aerospike

import "github.com/aerospike/aerospike-client-go/types"

// batchIndexCommandGet is a batch read command that embeds batchCommandGet and
// therefore exposes its fields and methods. Methods declared on this type in
// this file (writeBuffer, directGet, Execute) take precedence over the
// embedded ones of the same name.
type batchIndexCommandGet struct {
batchCommandGet
}
Expand Down Expand Up @@ -54,6 +56,37 @@ func (cmd *batchIndexCommandGet) writeBuffer(ifc command) error {
return cmd.setBatchIndexRead(cmd.policy, cmd.indexRecords, cmd.batch)
}

// directGet reads the batch records assigned to this command's batch node
// with one single-record request per key instead of a batch command, storing
// each result in the corresponding BatchRead's Record field.
//
// Only the records referenced by cmd.batch.offsets are read. The command is
// created per batch node and the direct-get threshold is evaluated against
// that node's offsets, so iterating over all of cmd.indexRecords would make
// every small sub-batch re-read the entire batch and let concurrently running
// node commands write to the same BatchRead.
//
// A record is fetched with GetHeader when br.headerOnly() reports true, and
// with Get (restricted to br.BinNames) otherwise.
//
// Error handling per key:
//   - types.ErrKeyNotFound is not treated as a failure; the loop moves on.
//   - types.ErrFilteredOut increments cmd.filteredOutCnt and the loop moves on.
//   - any other error is collected when cmd.policy.AllowPartialResults is set,
//     otherwise it aborts the loop and is returned immediately.
//
// The collected errors (possibly none) are combined with mergeErrors.
func (cmd *batchIndexCommandGet) directGet(client *Client) error {
	var errs []error
	for _, offset := range cmd.batch.offsets {
		br := cmd.indexRecords[offset]

		var err error
		if br.headerOnly() {
			br.Record, err = client.GetHeader(&cmd.policy.BasePolicy, br.Key)
		} else {
			br.Record, err = client.Get(&cmd.policy.BasePolicy, br.Key, br.BinNames...)
		}

		switch {
		case err == nil, err == types.ErrKeyNotFound:
			// Success, or a missing key: nothing to record.
		case err == types.ErrFilteredOut:
			cmd.filteredOutCnt++
		case cmd.policy.AllowPartialResults:
			errs = append(errs, err)
		default:
			return err
		}
	}
	return mergeErrors(errs)
}

// Execute runs the command by delegating to cmd.execute, passing the command
// itself as the first argument and true as the second.
// NOTE(review): the meaning of the boolean flag is defined by execute, which
// is not visible here — confirm before relying on it.
func (cmd *batchIndexCommandGet) Execute() error {
return cmd.execute(cmd, true)
}
Expand Down
12 changes: 12 additions & 0 deletions batch_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ type BatchPolicy struct {
// The downside is extra goroutines will still need to be created (or taken from a goroutine pool).
ConcurrentNodes int // = 1

// DirectGetThreshold determines the maximum number of keys per node which would work around
// the batch request and would turn it into a direct get request.
// Batch requests with few keys per node can have increased latency, due to the way the server
// handles batch requests. Converting those requests to direct get commands will be beneficial.
// Don't use values greater than 4, it will significantly degrade the performance.
//
// Values:
// <= 0: No threshold. Batch commands will always be used.
// > 0: Batch requests with this many or fewer keys will be converted to direct get requests, issued sequentially.
DirectGetThreshold int // = 1

// Allow batch to be processed immediately in the server's receiving thread when the server
// deems it to be appropriate. If false, the batch will always be processed in separate
// transaction goroutines. This field is only relevant for the new batch index protocol.
Expand Down Expand Up @@ -69,6 +80,7 @@ type BatchPolicy struct {
func NewBatchPolicy() *BatchPolicy {
return &BatchPolicy{
BasePolicy: *NewPolicy(),
DirectGetThreshold: 1,
ConcurrentNodes: 1,
AllowInline: true,
AllowPartialResults: false,
Expand Down
4 changes: 4 additions & 0 deletions batch_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ func NewBatchReadHeader(key *Key) *BatchRead {
// String implements the Stringer interface. The result has the form
// "<key>: <bin names>", built from the Key and BinNames fields.
func (br *BatchRead) String() string {
	key, bins := br.Key, br.BinNames
	return fmt.Sprintf("%s: %v", key, bins)
}

// headerOnly reports whether this read asks for no bin data at all: neither
// all bins (ReadAllBins) nor a bin list (BinNames is nil). Note that an empty
// but non-nil BinNames slice does not count as header-only.
func (br *BatchRead) headerOnly() bool {
	if br.ReadAllBins {
		return false
	}
	return br.BinNames == nil
}
16 changes: 14 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,13 @@ func (clnt *Client) batchExecute(policy *BatchPolicy, batchNodes []*batchNode, c
newCmd := cmd.cloneBatchCommand(batchNode)
go func(cmd command) {
defer wg.Done()
err := cmd.Execute()
var err error
if policy.DirectGetThreshold > 0 && len(batchNode.offsets) <= policy.DirectGetThreshold {
// run direct get commands instead
err = newCmd.directGet(clnt)
} else {
err = cmd.Execute()
}
errm.Lock()
if err != nil {
errs = append(errs, err)
Expand All @@ -1587,7 +1593,13 @@ func (clnt *Client) batchExecute(policy *BatchPolicy, batchNodes []*batchNode, c
go func(cmd command) {
defer sem.Release(1)
defer wg.Done()
err := cmd.Execute()
var err error
if policy.DirectGetThreshold > 0 && len(batchNode.offsets) <= policy.DirectGetThreshold {
// run direct get commands instead
err = newCmd.directGet(clnt)
} else {
err = cmd.Execute()
}
errm.Lock()
if err != nil {
errs = append(errs, err)
Expand Down
16 changes: 16 additions & 0 deletions client_reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ func (clnt *Client) GetObject(policy *BasePolicy, key *Key, obj interface{}) err
return command.Execute()
}

// getObjectDirect issues a single-record read for key and stores the result
// into the object referenced by rval.
//
// The supplied policy is first passed through clnt.getUsablePolicy. The bins
// requested are the fields that objectMappings reports for rval's type.
// An error from building the read command is returned as is; otherwise the
// result of executing the command is returned.
func (clnt *Client) getObjectDirect(policy *BasePolicy, key *Key, rval *reflect.Value) error {
	policy = clnt.getUsablePolicy(policy)
	fields := objectMappings.getFields(rval.Type())

	readCmd, err := newReadCommand(clnt.cluster, policy, key, fields, nil)
	if err != nil {
		return err
	}

	// Direct the command to populate the caller's object.
	readCmd.object = rval
	return readCmd.Execute()
}

// BatchGetObjects reads multiple record headers and bins for specified keys in one batch request.
// The returned objects are in positional order with the original key array order.
// If a key is not found, the positional object will not change, and the positional found boolean will be false.
Expand Down

0 comments on commit 56ccb91

Please sign in to comment.