Skip to content

Commit

Permalink
more support for sharded clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Jan 9, 2018
1 parent cabb262 commit daeee14
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions monstache.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ func (args *stringargs) Set(value string) error {
return nil
}

func (config *configOptions) isSharded() bool {
return config.MongoConfigURL != ""
}

func (config *configOptions) parseElasticsearchVersion(number string) (err error) {
if number == "" {
err = errors.New("elasticsearch version cannot be blank")
Expand Down Expand Up @@ -1251,7 +1255,7 @@ func (config *configOptions) setDefaults() *configOptions {
}

func (config *configOptions) getAuthURL(inURL string) string {
cred := strings.SplitN(config.MongoConfigURL, "@", 2)
cred := strings.SplitN(config.MongoURL, "@", 2)
if len(cred) == 2 {
return cred[0] + "@" + inURL
} else {
Expand Down Expand Up @@ -2096,6 +2100,20 @@ func notifySd(config *configOptions) {
}
}

func (config *configOptions) makeShardInsertHandler() gtm.ShardInsertHandler {
return func(shardInfo *gtm.ShardInfo) (*mgo.Session, error) {
infoLog.Printf("Adding shard found at %s\n", shardInfo.GetURL())
shardURL := config.getAuthURL(shardInfo.GetURL())
shard, err := config.dialMongo(shardURL)
if err == nil {
config.configureMongo(shard)
return shard, nil
} else {
return nil, err
}
}
}

func main() {
enabled := true
config := &configOptions{
Expand Down Expand Up @@ -2201,7 +2219,7 @@ func main() {
var filter gtm.OpFilter
var directReadFilter gtm.OpFilter
filterChain := []gtm.OpFilter{notMonstache, notSystem, notChunks}
if config.MongoConfigURL != "" {
if config.isSharded() {
filterChain = append(filterChain, notConfig)
}
if config.NsRegex != "" {
Expand Down Expand Up @@ -2250,16 +2268,16 @@ func main() {
errorLog.Panicf("Unable to parse gtm buffer duration %s: %s", config.GtmSettings.BufferDuration, err)
}
var mongos []*mgo.Session
if config.MongoConfigURL != "" {
var configSession *mgo.Session
if config.isSharded() {
// if we have a config server URL then we are running in a sharded cluster
configSession, err := config.dialMongo(config.MongoConfigURL)
configSession, err = config.dialMongo(config.MongoConfigURL)
if err != nil {
errorLog.Panicf("Unable to connect to mongodb config server using URL %s: %s", config.MongoConfigURL, err)
}
config.configureMongo(configSession)
// get the list of shard servers
shardInfos := gtm.GetShards(configSession)
configSession.Close()
if len(shardInfos) == 0 {
errorLog.Fatalln("Shards enabled but none found in config.shards collection")
}
Expand All @@ -2279,7 +2297,7 @@ func main() {
mongos = append(mongos, mongo)
}

gtmCtx := gtm.StartMulti(mongos, &gtm.Options{
gtmOpts := &gtm.Options{
After: after,
Filter: filter,
OpLogDatabaseName: oplogDatabaseName,
Expand All @@ -2295,7 +2313,13 @@ func main() {
DirectReadBatchSize: config.DirectReadBatchSize,
DirectReadersPerCol: config.DirectReadersPerCol,
DirectReadFilter: directReadFilter,
})
}

gtmCtx := gtm.StartMulti(mongos, gtmOpts)

if config.isSharded() {
gtmCtx.AddShardListener(configSession, gtmOpts, config.makeShardInsertHandler())
}
if config.ClusterName != "" {
if enabled {
infoLog.Printf("Starting work for cluster %s", config.ClusterName)
Expand Down

0 comments on commit daeee14

Please sign in to comment.