From daeee143460d615e5c59e14377030e40876d856a Mon Sep 17 00:00:00 2001 From: Ryan Wynn Date: Tue, 9 Jan 2018 16:44:35 -0500 Subject: [PATCH] more support for sharded clusters --- monstache.go | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/monstache.go b/monstache.go index 775dd6a..18bfdb5 100644 --- a/monstache.go +++ b/monstache.go @@ -220,6 +220,10 @@ func (args *stringargs) Set(value string) error { return nil } +func (config *configOptions) isSharded() bool { + return config.MongoConfigURL != "" +} + func (config *configOptions) parseElasticsearchVersion(number string) (err error) { if number == "" { err = errors.New("elasticsearch version cannot be blank") @@ -1251,7 +1255,7 @@ func (config *configOptions) setDefaults() *configOptions { } func (config *configOptions) getAuthURL(inURL string) string { - cred := strings.SplitN(config.MongoConfigURL, "@", 2) + cred := strings.SplitN(config.MongoURL, "@", 2) if len(cred) == 2 { return cred[0] + "@" + inURL } else { @@ -2096,6 +2100,20 @@ func notifySd(config *configOptions) { } } +func (config *configOptions) makeShardInsertHandler() gtm.ShardInsertHandler { + return func(shardInfo *gtm.ShardInfo) (*mgo.Session, error) { + infoLog.Printf("Adding shard found at %s\n", shardInfo.GetURL()) + shardURL := config.getAuthURL(shardInfo.GetURL()) + shard, err := config.dialMongo(shardURL) + if err == nil { + config.configureMongo(shard) + return shard, nil + } else { + return nil, err + } + } +} + func main() { enabled := true config := &configOptions{ @@ -2201,7 +2219,7 @@ func main() { var filter gtm.OpFilter var directReadFilter gtm.OpFilter filterChain := []gtm.OpFilter{notMonstache, notSystem, notChunks} - if config.MongoConfigURL != "" { + if config.isSharded() { filterChain = append(filterChain, notConfig) } if config.NsRegex != "" { @@ -2250,16 +2268,16 @@ func main() { errorLog.Panicf("Unable to parse gtm buffer duration %s: %s", config.GtmSettings.BufferDuration, err) } var mongos []*mgo.Session - if config.MongoConfigURL != "" { + var configSession *mgo.Session + if config.isSharded() { // if we have a config server URL then we are running in a sharded cluster - configSession, err := config.dialMongo(config.MongoConfigURL) + configSession, err = config.dialMongo(config.MongoConfigURL) if err != nil { errorLog.Panicf("Unable to connect to mongodb config server using URL %s: %s", config.MongoConfigURL, err) } config.configureMongo(configSession) // get the list of shard servers shardInfos := gtm.GetShards(configSession) - configSession.Close() if len(shardInfos) == 0 { errorLog.Fatalln("Shards enabled but none found in config.shards collection") } @@ -2279,7 +2297,7 @@ func main() { mongos = append(mongos, mongo) } - gtmCtx := gtm.StartMulti(mongos, >m.Options{ + gtmOpts := >m.Options{ After: after, Filter: filter, OpLogDatabaseName: oplogDatabaseName, @@ -2295,7 +2313,13 @@ func main() { DirectReadBatchSize: config.DirectReadBatchSize, DirectReadersPerCol: config.DirectReadersPerCol, DirectReadFilter: directReadFilter, - }) + } + + gtmCtx := gtm.StartMulti(mongos, gtmOpts) + + if config.isSharded() { + gtmCtx.AddShardListener(configSession, gtmOpts, config.makeShardInsertHandler()) + } if config.ClusterName != "" { if enabled { infoLog.Printf("Starting work for cluster %s", config.ClusterName)