Skip to content

Commit

Permalink
This PR fixes two things (minio#8772)
Browse files Browse the repository at this point in the history
- Stop spawning store replay routines when testing the notification targets
- Properly honor the target.Close() to clean the resources used

Fixes minio#8707

Co-authored-by: Harshavardhana <[email protected]>
  • Loading branch information
2 people authored and nitisht committed Jan 9, 2020
1 parent c2cde6b commit 4cd1bbb
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 41 deletions.
6 changes: 5 additions & 1 deletion cmd/admin-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,9 @@ func (a adminAPIHandlers) ServerInfoHandler(w http.ResponseWriter, r *http.Reque

func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus {
lambdaMap := make(map[string][]madmin.TargetIDStatus)
targetList, _ := notify.GetNotificationTargets(cfg, GlobalServiceDoneCh, NewCustomHTTPTransport())

// Fetch the targets
targetList, _ := notify.RegisterNotificationTargets(cfg, GlobalServiceDoneCh, NewCustomHTTPTransport(), nil, true)

for targetID, target := range targetList.TargetMap() {
targetIDStatus := make(map[string]madmin.Status)
Expand All @@ -1470,6 +1472,8 @@ func fetchLambdaInfo(cfg config.Config) []map[string][]madmin.TargetIDStatus {
list := lambdaMap[targetID.Name]
list = append(list, targetIDStatus)
lambdaMap[targetID.Name] = list
// Close any leaking connections
_ = target.Close()
}

notify := make([]map[string][]madmin.TargetIDStatus, len(lambdaMap))
Expand Down
20 changes: 10 additions & 10 deletions cmd/config/notify/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
if !args.Enable {
continue
}
newTarget, err := target.NewAMQPTarget(id, args, doneCh, logger.LogOnceIf)
newTarget, err := target.NewAMQPTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
}
Expand All @@ -134,7 +134,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
if !args.Enable {
continue
}
newTarget, err := target.NewElasticsearchTarget(id, args, doneCh, logger.LogOnceIf)
newTarget, err := target.NewElasticsearchTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err

Expand All @@ -152,7 +152,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
continue
}
args.TLS.RootCAs = transport.TLSClientConfig.RootCAs
newTarget, err := target.NewKafkaTarget(id, args, doneCh, logger.LogOnceIf)
newTarget, err := target.NewKafkaTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
}
Expand All @@ -169,7 +169,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
continue
}
args.RootCAs = transport.TLSClientConfig.RootCAs
newTarget, err := target.NewMQTTTarget(id, args, doneCh, logger.LogOnceIf)
newTarget, err := target.NewMQTTTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
}
Expand All @@ -185,7 +185,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
if !args.Enable {
continue
}
newTarget, err := target.NewMySQLTarget(id, args, doneCh, logger.LogOnceIf)
newTarget, err := target.NewMySQLTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
}
Expand All @@ -201,7 +201,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
if !args.Enable {
continue
}
newTarget, err := target.NewNATSTarget(id, args, doneCh, logger.LogOnceIf)
newTarget, err := target.NewNATSTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
}
Expand All @@ -217,7 +217,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
if !args.Enable {
continue
}
newTarget, err := target.NewNSQTarget(id, args, doneCh, logger.LogOnceIf)
newTarget, err := target.NewNSQTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
}
Expand All @@ -233,7 +233,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
if !args.Enable {
continue
}
newTarget, err := target.NewPostgreSQLTarget(id, args, doneCh, logger.LogOnceIf)
newTarget, err := target.NewPostgreSQLTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
}
Expand All @@ -249,7 +249,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
if !args.Enable {
continue
}
newTarget, err := target.NewRedisTarget(id, args, doneCh, logger.LogOnceIf)
newTarget, err := target.NewRedisTarget(id, args, doneCh, logger.LogOnceIf, test)
if err != nil {
return nil, err
}
Expand All @@ -265,7 +265,7 @@ func RegisterNotificationTargets(cfg config.Config, doneCh <-chan struct{}, tran
if !args.Enable {
continue
}
newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf, transport)
newTarget, err := target.NewWebhookTarget(id, args, doneCh, logger.LogOnceIf, transport, test)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/event/target/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,14 @@ func (target *AMQPTarget) Send(eventKey string) error {

// Close - does nothing and available for interface compatibility.
func (target *AMQPTarget) Close() error {
if target.conn != nil {
return target.conn.Close()
}
return nil
}

// NewAMQPTarget - creates new AMQP target.
func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})) (*AMQPTarget, error) {
func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*AMQPTarget, error) {
var conn *amqp.Connection
var err error

Expand Down Expand Up @@ -293,7 +296,7 @@ func NewAMQPTarget(id string, args AMQPArgs, doneCh <-chan struct{}, loggerOnce
loggerOnce: loggerOnce,
}

if target.store != nil {
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())

Expand Down
8 changes: 6 additions & 2 deletions pkg/event/target/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ func (target *ElasticsearchTarget) Send(eventKey string) error {

// Close - does nothing and available for interface compatibility.
func (target *ElasticsearchTarget) Close() error {
if target.client != nil {
// Stops the background processes that the client is running.
target.client.Stop()
}
return nil
}

Expand Down Expand Up @@ -242,7 +246,7 @@ func newClient(args ElasticsearchArgs) (*elastic.Client, error) {
}

// NewElasticsearchTarget - creates new Elasticsearch target.
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*ElasticsearchTarget, error) {
func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*ElasticsearchTarget, error) {
var client *elastic.Client
var err error

Expand Down Expand Up @@ -275,7 +279,7 @@ func NewElasticsearchTarget(id string, args ElasticsearchArgs, doneCh <-chan str
store: store,
}

if target.store != nil {
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
Expand Down
4 changes: 2 additions & 2 deletions pkg/event/target/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (k KafkaArgs) pingBrokers() bool {
}

// NewKafkaTarget - creates new Kafka target with auth credentials.
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*KafkaTarget, error) {
func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*KafkaTarget, error) {
config := sarama.NewConfig()

config.Net.SASL.User = args.SASL.User
Expand Down Expand Up @@ -287,7 +287,7 @@ func NewKafkaTarget(id string, args KafkaArgs, doneCh <-chan struct{}, loggerOnc
store: store,
}

if target.store != nil {
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
Expand Down
16 changes: 8 additions & 8 deletions pkg/event/target/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (target *MQTTTarget) Close() error {
}

// NewMQTTTarget - creates new MQTT target.
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*MQTTTarget, error) {
func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MQTTTarget, error) {
if args.MaxReconnectInterval == 0 {
// Default interval
// https://github.com/eclipse/paho.mqtt.golang/blob/master/options.go#L115
Expand Down Expand Up @@ -261,13 +261,13 @@ func NewMQTTTarget(id string, args MQTTArgs, doneCh <-chan struct{}, loggerOnce
return nil, err
}

go retryRegister()

// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())

// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
if !test {
go retryRegister()
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
go sendEvents(target, eventKeyCh, doneCh, loggerOnce)
}
} else {
if token.Wait() && token.Error() != nil {
return nil, token.Error()
Expand Down
4 changes: 2 additions & 2 deletions pkg/event/target/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (target *MySQLTarget) executeStmts() error {
}

// NewMySQLTarget - creates new MySQL target.
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*MySQLTarget, error) {
func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*MySQLTarget, error) {
var firstPing bool
if args.DSN == "" {
config := mysql.Config{
Expand Down Expand Up @@ -395,7 +395,7 @@ func NewMySQLTarget(id string, args MySQLArgs, doneCh <-chan struct{}, loggerOnc
target.firstPing = true
}

if target.store != nil {
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
Expand Down
4 changes: 2 additions & 2 deletions pkg/event/target/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (target *NATSTarget) Close() (err error) {
}

// NewNATSTarget - creates new NATS target.
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*NATSTarget, error) {
func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NATSTarget, error) {
var natsConn *nats.Conn
var stanConn stan.Conn

Expand Down Expand Up @@ -348,7 +348,7 @@ func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}, loggerOnce
store: store,
}

if target.store != nil {
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
Expand Down
10 changes: 6 additions & 4 deletions pkg/event/target/nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,15 @@ func (target *NSQTarget) Send(eventKey string) error {

// Close - closes underneath connections to NSQD server.
func (target *NSQTarget) Close() (err error) {
// this blocks until complete:
target.producer.Stop()
if target.producer != nil {
// this blocks until complete:
target.producer.Stop()
}
return nil
}

// NewNSQTarget - creates new NSQ target.
func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*NSQTarget, error) {
func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*NSQTarget, error) {
config := nsq.NewConfig()
if args.TLS.Enable {
config.TlsV1 = true
Expand Down Expand Up @@ -211,7 +213,7 @@ func NewNSQTarget(id string, args NSQArgs, doneCh <-chan struct{}, loggerOnce fu
}
}

if target.store != nil {
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
Expand Down
4 changes: 2 additions & 2 deletions pkg/event/target/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (target *PostgreSQLTarget) executeStmts() error {
}

// NewPostgreSQLTarget - creates new PostgreSQL target.
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{})) (*PostgreSQLTarget, error) {
func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), test bool) (*PostgreSQLTarget, error) {
var firstPing bool

params := []string{args.ConnectionString}
Expand Down Expand Up @@ -400,7 +400,7 @@ func NewPostgreSQLTarget(id string, args PostgreSQLArgs, doneCh <-chan struct{},
target.firstPing = true
}

if target.store != nil {
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
Expand Down
8 changes: 4 additions & 4 deletions pkg/event/target/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,13 @@ func (target *RedisTarget) Send(eventKey string) error {
return target.store.Del(eventKey)
}

// Close - does nothing and available for interface compatibility.
// Close - releases the resources used by the pool.
func (target *RedisTarget) Close() error {
return nil
return target.pool.Close()
}

// NewRedisTarget - creates new Redis target.
func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{})) (*RedisTarget, error) {
func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, errKind ...interface{}), test bool) (*RedisTarget, error) {
pool := &redis.Pool{
MaxIdle: 3,
IdleTimeout: 2 * 60 * time.Second,
Expand Down Expand Up @@ -314,7 +314,7 @@ func NewRedisTarget(id string, args RedisArgs, doneCh <-chan struct{}, loggerOnc
target.firstPing = true
}

if target.store != nil {
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
Expand Down
6 changes: 4 additions & 2 deletions pkg/event/target/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,13 @@ func (target *WebhookTarget) Send(eventKey string) error {

// Close - does nothing and available for interface compatibility.
func (target *WebhookTarget) Close() error {
// Close idle connection with "keep-alive" states
target.httpClient.CloseIdleConnections()
return nil
}

// NewWebhookTarget - creates new Webhook target.
func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport) (*WebhookTarget, error) {
func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, loggerOnce func(ctx context.Context, err error, id interface{}, kind ...interface{}), transport *http.Transport, test bool) (*WebhookTarget, error) {

var store Store

Expand All @@ -211,7 +213,7 @@ func NewWebhookTarget(id string, args WebhookArgs, doneCh <-chan struct{}, logge
store: store,
}

if target.store != nil {
if target.store != nil && !test {
// Replays the events from the store.
eventKeyCh := replayEvents(target.store, doneCh, loggerOnce, target.ID())
// Start replaying events from the store.
Expand Down

0 comments on commit 4cd1bbb

Please sign in to comment.