Skip to content

Commit

Permalink
Move expired and deleted shard logic to MetaStore
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Jun 5, 2015
1 parent ae3e8c7 commit 5e5f2cd
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 6 deletions.
22 changes: 22 additions & 0 deletions meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,28 @@ func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *Shar
return nil
}

// ExpiredShardGroups returns the Shard Groups which are considered expired, for the given time.
func (rpi *RetentionPolicyInfo) ExpiredShardGroups(t time.Time) []*ShardGroupInfo {
groups := make([]*ShardGroupInfo, 0)
for i := range rpi.ShardGroups {
if rpi.Duration != 0 && rpi.ShardGroups[i].EndTime.Add(rpi.Duration).Before(t) {
groups = append(groups, &rpi.ShardGroups[i])
}
}
return groups
}

// DeletedShardGroups returns the Shard Groups which are marked as deleted.
func (rpi *RetentionPolicyInfo) DeletedShardGroups() []*ShardGroupInfo {
groups := make([]*ShardGroupInfo, 0)
for i := range rpi.ShardGroups {
if rpi.ShardGroups[i].Deleted() {
groups = append(groups, &rpi.ShardGroups[i])
}
}
return groups
}

// protobuf returns a protocol buffers object.
func (rpi *RetentionPolicyInfo) protobuf() *internal.RetentionPolicyInfo {
return &internal.RetentionPolicyInfo{
Expand Down
13 changes: 13 additions & 0 deletions meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,19 @@ func (s *Store) ShardGroups(database, policy string) (a []ShardGroupInfo, err er
return
}

// VisitRetentionPolicies calls the given function with full retention policy details.
func (s *Store) VisitRetentionPolicies(f func(d DatabaseInfo, r RetentionPolicyInfo)) {
s.read(func(data *Data) error {
for _, di := range data.Databases {
for _, rp := range di.RetentionPolicies {
f(di, rp)
}
}
return nil
})
return
}

// VisitShardGroups calls the given function with full shard group details.
func (s *Store) VisitShardGroups(f func(d DatabaseInfo, r RetentionPolicyInfo, s ShardGroupInfo)) {
s.read(func(data *Data) error {
Expand Down
11 changes: 5 additions & 6 deletions services/retention/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type Service struct {
MetaStore interface {
IsLeader() bool
VisitShardGroups(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo, s meta.ShardGroupInfo))
VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo))
DeleteShardGroup(database, policy string, id uint64) error
}
TSDBStore interface {
Expand Down Expand Up @@ -73,8 +73,8 @@ func (s *Service) deleteShardGroups() {
}
s.logger.Println("retention policy enforcement commencing")

s.MetaStore.VisitShardGroups(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo, g meta.ShardGroupInfo) {
if r.Duration != 0 && g.EndTime.Add(r.Duration).Before(time.Now().UTC()) {
s.MetaStore.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) {
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
if err := s.MetaStore.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
s.logger.Printf("failed to delete shard group %d from database %s, retention policy %s: %s",
g.ID, d.Name, r.Name, err.Error())
Expand All @@ -83,7 +83,6 @@ func (s *Service) deleteShardGroups() {
g.ID, d.Name, r.Name)
}
}

})
}
}
Expand All @@ -103,8 +102,8 @@ func (s *Service) deleteShards() {
s.logger.Println("retention policy shard deletion commencing")

deletedShardIDs := make(map[uint64]struct{}, 0)
s.MetaStore.VisitShardGroups(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo, g meta.ShardGroupInfo) {
if g.Deleted() {
s.MetaStore.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) {
for _, g := range r.DeletedShardGroups() {
for _, sh := range g.Shards {
deletedShardIDs[sh.ID] = struct{}{}
}
Expand Down

0 comments on commit 5e5f2cd

Please sign in to comment.