Skip to content

Commit

Permalink
Get common code changes ported.
Browse files Browse the repository at this point in the history
  • Loading branch information
sivakku committed Nov 24, 2016
1 parent 7818c81 commit 163b634
Show file tree
Hide file tree
Showing 12 changed files with 549 additions and 5 deletions.
60 changes: 59 additions & 1 deletion common/backoff/retry.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,73 @@
package backoff

import "time"
import (
"sync"
"time"
)

type (
// Operation to retry
Operation func() error

// IsRetryable handler can be used to exclude certain errors during retry
IsRetryable func(error) bool

// ConcurrentRetrier is used for client-side throttling. It determines whether to
// throttle outgoing traffic in case downstream backend server rejects
// requests due to out-of-quota or server busy errors.
ConcurrentRetrier struct {
sync.Mutex
retrier Retrier // Backoff retrier
failureCount int64 // Number of consecutive failures seen
}
)

// Throttle Sleep if there were failures since the last success call.
func (c *ConcurrentRetrier) Throttle() {
c.throttleInternal()
}

func (c *ConcurrentRetrier) throttleInternal() time.Duration {
next := done

// Check if we have failure count.
failureCount := c.failureCount
if failureCount > 0 {
defer c.Unlock()
c.Lock()
if c.failureCount > 0 {
next = c.retrier.NextBackOff()
}
}

if next != done {
time.Sleep(next)
}

return next
}

// Succeeded marks client request succeeded.
func (c *ConcurrentRetrier) Succeeded() {
defer c.Unlock()
c.Lock()
c.failureCount = 0
c.retrier.Reset()
}

// Failed marks client request failed because backend is busy.
func (c *ConcurrentRetrier) Failed() {
defer c.Unlock()
c.Lock()
c.failureCount++
}

// NewConcurrentRetrier returns an instance of concurrent backoff retrier.
func NewConcurrentRetrier(retryPolicy RetryPolicy) *ConcurrentRetrier {
retrier := NewRetrier(retryPolicy, SystemClock)
return &ConcurrentRetrier{retrier: retrier}
}

// Retry function can be used to wrap any call with retry logic using the passed in policy
func Retry(operation Operation, policy RetryPolicy, isRetryable IsRetryable) error {
var err error
Expand Down
46 changes: 46 additions & 0 deletions common/backoff/retry_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backoff

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -116,6 +117,51 @@ func (s *RetrySuite) TestIsRetryableFailure() {
s.Equal(1, i)
}

func (s *RetrySuite) TestConcurrentRetrier() {
policy := NewExponentialRetryPolicy(1 * time.Millisecond)
policy.SetMaximumInterval(10 * time.Millisecond)
policy.SetMaximumAttempts(4)

// Basic checks
retrier := NewConcurrentRetrier(policy)
retrier.Failed()
s.Equal(int64(1), retrier.failureCount)
retrier.Succeeded()
s.Equal(int64(0), retrier.failureCount)
sleepDuration := retrier.throttleInternal()
s.Equal(done, sleepDuration)

// Multiple count check.
retrier.Failed()
retrier.Failed()
s.Equal(int64(2), retrier.failureCount)
// Verify valid sleep times.
ch := make(chan time.Duration, 3)
go func() {
for i := 0; i < 3; i++ {
ch <- retrier.throttleInternal()
}
}()
for i := 0; i < 3; i++ {
val := <-ch
fmt.Printf("Duration: %d\n", val)
s.True(val > 0)
}
retrier.Succeeded()
s.Equal(int64(0), retrier.failureCount)
// Verify we don't have any sleep times.
go func() {
for i := 0; i < 3; i++ {
ch <- retrier.throttleInternal()
}
}()
for i := 0; i < 3; i++ {
val := <-ch
fmt.Printf("Duration: %d\n", val)
s.Equal(done, val)
}
}

func (e *someError) Error() string {
return "Some Error"
}
9 changes: 5 additions & 4 deletions common/backoff/retrypolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

const (
// NoInterval represents Maximim interval
NoInterval = 0
done time.Duration = -1
noMaximumAttempts = 0
noInterval = 0

defaultBackoffCoefficient = 2.0
defaultMaximumInterval = 10 * time.Second
Expand Down Expand Up @@ -117,7 +118,7 @@ func (p *ExponentialRetryPolicy) ComputeNextDelay(elapsedTime time.Duration, num
}

// Stop retrying after expiration interval is elasped
if p.expirationInterval != noInterval && elapsedTime > p.expirationInterval {
if p.expirationInterval != NoInterval && elapsedTime > p.expirationInterval {
return done
}

Expand All @@ -126,11 +127,11 @@ func (p *ExponentialRetryPolicy) ComputeNextDelay(elapsedTime time.Duration, num
if nextInterval <= 0 {
return done
}
if p.maximumInterval != noInterval {
if p.maximumInterval != NoInterval {
nextInterval = math.Min(nextInterval, float64(p.maximumInterval))
}

if p.expirationInterval != noInterval {
if p.expirationInterval != NoInterval {
remainingTime := float64(math.Max(0, float64(p.expirationInterval-elapsedTime)))
nextInterval = math.Min(remainingTime, nextInterval)
}
Expand Down
79 changes: 79 additions & 0 deletions common/cassandra_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package common

import (
"fmt"
"strings"

"bytes"
"os/exec"

log "github.com/Sirupsen/logrus"
"github.com/gocql/gocql"
)

// NewCassandraCluster creates a cassandra cluster given comma separated list of clusterHosts
func NewCassandraCluster(clusterHosts string) *gocql.ClusterConfig {
var hosts []string
for _, h := range strings.Split(clusterHosts, ",") {
if host := strings.TrimSpace(h); len(host) > 0 {
hosts = append(hosts, host)
}
}

return gocql.NewCluster(hosts...)
}

// CreateCassandraKeyspace creates the keyspace using this session for given replica count
func CreateCassandraKeyspace(s *gocql.Session, keyspace string, replicas int, overwrite bool) (err error) {
// if overwrite flag is set, drop the keyspace and create a new one
if overwrite {
DropCassandraKeyspace(s, keyspace)
}
err = s.Query(fmt.Sprintf(`CREATE KEYSPACE %s WITH replication = {
'class' : 'SimpleStrategy', 'replication_factor' : %d}`, keyspace, replicas)).Exec()
if err != nil {
log.WithField(TagErr, err).Error(`create keyspace error`)
return
}
log.WithField(`keyspace`, keyspace).Info(`created namespace`)

return
}

// DropCassandraKeyspace drops the given keyspace, if it exists
func DropCassandraKeyspace(s *gocql.Session, keyspace string) (err error) {
err = s.Query(fmt.Sprintf("DROP KEYSPACE IF EXISTS %s", keyspace)).Exec()
if err != nil {
log.WithField(TagErr, err).Error(`drop keyspace error`)
return
}
log.WithField(`keyspace`, keyspace).Info(`dropped namespace`)
return
}

// LoadCassandraSchema loads the schema from the given .cql file on this keyspace using cqlsh
func LoadCassandraSchema(cqlshpath string, fileName string, keyspace string) (err error) {
// Using cqlsh as I couldn't find a way to execute multiple commands through gocql.Session
var out bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command(cqlshpath, fmt.Sprintf("--keyspace=%v", keyspace), fmt.Sprintf("--file=%v", fileName))
cmd.Stdout = &out
cmd.Stderr = &stderr
err = cmd.Run()

// CQLSH doesn't return non-zero for some errors
if err != nil || strings.Contains(stderr.String(), `Errno`) {
err = fmt.Errorf("LoadSchema %v returned %v. STDERR: %v", cmd.Path, err, stderr.String())
}
return
}

// CQLTimestampToUnixNano converts CQL timestamp to UnixNano
func CQLTimestampToUnixNano(milliseconds int64) int64 {
return milliseconds * 1000 * 1000 // Milliseconds are 10⁻³, nanoseconds are 10⁻⁹, (-3) - (-9) = 6, so multiply by 10⁶
}

// UnixNanoToCQLTimestamp converts UnixNano to CQL timestamp
func UnixNanoToCQLTimestamp(timestamp int64) int64 {
return timestamp / (1000 * 1000) // Milliseconds are 10⁻³, nanoseconds are 10⁻⁹, (-9) - (-3) = -6, so divide by 10⁶
}
29 changes: 29 additions & 0 deletions common/convert.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package common

import (
m "code.uber.internal/devexp/minions/.gen/go/minions"
)

// IntPtr makes a copy and returns the pointer to an int.
func IntPtr(v int) *int {
return &v
Expand Down Expand Up @@ -39,3 +43,28 @@ func BoolPtr(v bool) *bool {
func StringPtr(v string) *string {
return &v
}

// TaskListPtr makes a copy and returns the pointer to a TaskList.
func TaskListPtr(v m.TaskList) *m.TaskList {
return &v
}

// ActivityTypePtr makes a copy and returns the pointer to a ActivityType.
func ActivityTypePtr(v m.ActivityType) *m.ActivityType {
return &v
}

// DecisionTypePtr makes a copy and returns the pointer to a DecisionType.
func DecisionTypePtr(t m.DecisionType) *m.DecisionType {
return &t
}

// EventTypePtr makes a copy and returns the pointer to a EventType.
func EventTypePtr(t m.EventType) *m.EventType {
return &t
}

// WorkflowTypePtr makes a copy and returns the pointer to a WorkflowType.
func WorkflowTypePtr(t m.WorkflowType) *m.WorkflowType {
return &t
}
10 changes: 10 additions & 0 deletions common/daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package common

type (
// Daemon is the base interfaces implemented by
// background tasks within cherami
Daemon interface {
Start()
Stop()
}
)
4 changes: 4 additions & 0 deletions common/log_tag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package common

// TagErr is the tag for error object message
const TagErr = `err`
Loading

0 comments on commit 163b634

Please sign in to comment.