Skip to content

Commit

Permalink
HEIMDALL-4809 add Opsgenie Action support to OEC
Browse files Browse the repository at this point in the history
  • Loading branch information
MeralBusraTekinsen committed Sep 30, 2019
1 parent ae3f27e commit 1715997
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 104 deletions.
12 changes: 6 additions & 6 deletions queue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@ type SqsJob struct {
queueProvider QueueProvider
queueMessage QueueMessage

integrationId string
apiKey string
baseUrl string
ownerId string
apiKey string
baseUrl string

state int32
executeMutex *sync.Mutex
}

func NewSqsJob(queueMessage QueueMessage, queueProvider QueueProvider, apiKey, baseUrl, integrationId string) Job {
func NewSqsJob(queueMessage QueueMessage, queueProvider QueueProvider, apiKey, baseUrl, ownerId string) Job {
return &SqsJob{
queueProvider: queueProvider,
queueMessage: queueMessage,
executeMutex: &sync.Mutex{},
apiKey: apiKey,
baseUrl: baseUrl,
integrationId: integrationId,
ownerId: ownerId,
state: JobInitial,
}
}
Expand Down Expand Up @@ -77,7 +77,7 @@ func (j *SqsJob) Execute() error {
messageAttr := j.SqsMessage().MessageAttributes

if messageAttr == nil ||
*messageAttr[integrationId].StringValue != j.integrationId {
*messageAttr[ownerId].StringValue != j.ownerId {
j.state = JobError
return errors.Errorf("Message[%s] is invalid, will not be processed.", messageId)
}
Expand Down
4 changes: 2 additions & 2 deletions queue/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func newJobTest() *SqsJob {
executeMutex: &sync.Mutex{},
apiKey: mockApiKey,
baseUrl: mockBaseUrl,
integrationId: mockIntegrationId,
ownerId: mockOwnerId,
state: JobInitial,
}
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestExecuteWithInvalidQueueMessage(t *testing.T) {

sqsJob.queueMessage.(*MockQueueMessage).MessageFunc = func() *sqs.Message {
falseIntegrationId := "falseIntegrationId"
messageAttr := map[string]*sqs.MessageAttributeValue{integrationId: {StringValue: &falseIntegrationId}}
messageAttr := map[string]*sqs.MessageAttributeValue{ownerId: {StringValue: &falseIntegrationId}}
return &sqs.Message{MessageAttributes: messageAttr, MessageId: &mockMessageId}
}

Expand Down
2 changes: 1 addition & 1 deletion queue/marid_token.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package queue

type OECToken struct {
IntegrationId string `json:"integrationId,omitempty"`
OwnerId string `json:"ownerId,omitempty"`
OECMetadataList []OECMetadata `json:"queueProperties,omitempty"`
}

Expand Down
8 changes: 4 additions & 4 deletions queue/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type OECPoller struct {
workerPool WorkerPool
queueProvider QueueProvider

integrationId string
ownerId string
conf *conf.Configuration
repositories git.Repositories
queueMessageLogrus *logrus.Logger
Expand All @@ -40,7 +40,7 @@ type OECPoller struct {
}

func NewPoller(workerPool WorkerPool, queueProvider QueueProvider,
conf *conf.Configuration, integrationId string,
conf *conf.Configuration, ownerId string,
repositories git.Repositories) Poller {

return &OECPoller{
Expand All @@ -51,7 +51,7 @@ func NewPoller(workerPool WorkerPool, queueProvider QueueProvider,
startStopMu: &sync.Mutex{},
conf: conf,
repositories: repositories,
integrationId: integrationId,
ownerId: ownerId,
workerPool: workerPool,
queueProvider: queueProvider,
queueMessageLogrus: newQueueMessageLogrus(queueProvider.OECMetadata().Region()),
Expand Down Expand Up @@ -155,7 +155,7 @@ func (p *OECPoller) poll() (shouldWait bool) {
p.queueProvider,
p.conf.ApiKey,
p.conf.BaseUrl,
p.integrationId,
p.ownerId,
)

isSubmitted, err := p.workerPool.Submit(job)
Expand Down
2 changes: 1 addition & 1 deletion queue/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func NewMockPoller() Poller {
}

func NewMockPollerForQueueProcessor(workerPool WorkerPool, queueProvider QueueProvider,
conf *conf.Configuration, integrationId string,
conf *conf.Configuration, ownerId string,
repositories git.Repositories) Poller {
return NewMockPoller()
}
Expand Down
16 changes: 9 additions & 7 deletions queue/queue_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,26 @@ func (qm *OECQueueMessage) Process() (*runbook.ActionResultPayload, error) {
return nil, err
}

alertId := queuePayload.Alert.AlertId
entityId := queuePayload.Entity.Id
entityType := queuePayload.Entity.Type
action := queuePayload.MappedAction.Name
if action == "" {
action = queuePayload.Action
}

if action == "" {
return nil, errors.Errorf("SQS message with alertId[%s] does not contain action property.", alertId)
return nil, errors.Errorf("SQS message with entityId[%s] does not contain action property.", entityId)
}

mappedAction, ok := qm.actionSpecs.ActionMappings[conf.ActionName(action)]
if !ok {
return nil, errors.Errorf("There is no mapped action found for action[%s]. SQS message with alertId[%s] will be ignored.", action, alertId)
return nil, errors.Errorf("There is no mapped action found for action[%s]. SQS message with entityId[%s] will be ignored.", action, entityId)
}

result := &runbook.ActionResultPayload{
AlertId: alertId,
Action: action,
EntityId: entityId,
EntityType: entityType,
Action: action,
}

start := time.Now()
Expand All @@ -72,11 +74,11 @@ func (qm *OECQueueMessage) Process() (*runbook.ActionResultPayload, error) {
switch err := err.(type) {
case *runbook.ExecError:
result.FailureMessage = fmt.Sprintf("Err: %s, Stderr: %s", err.Error(), err.Stderr)
logrus.Debugf("Action[%s] execution of message[%s] with alertId[%s] failed: %s Stderr: %s", action, *qm.message.MessageId, alertId, err.Error(), err.Stderr)
logrus.Debugf("Action[%s] execution of message[%s] with entityId[%s] failed: %s Stderr: %s", action, *qm.message.MessageId, entityId, err.Error(), err.Stderr)

case nil:
result.IsSuccessful = true
logrus.Debugf("Action[%s] execution of message[%s] with alertId[%s] has been completed and it took %f seconds.", action, *qm.message.MessageId, alertId, took.Seconds())
logrus.Debugf("Action[%s] execution of message[%s] with entityId[%s] has been completed and it took %f seconds.", action, *qm.message.MessageId, entityId, took.Seconds())

default:
return nil, err
Expand Down
14 changes: 7 additions & 7 deletions queue/queue_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
)

var (
mockMessageId = "mockMessageId"
mockApiKey = "mockApiKey"
mockBaseUrl = "mockBaseUrl"
mockIntegrationId = "mockIntegrationId"
mockMessageId = "mockMessageId"
mockApiKey = "mockApiKey"
mockBaseUrl = "mockBaseUrl"
mockOwnerId = "mockOwnerId"
)

var mockActionSpecs = &conf.ActionSpecifications{
Expand Down Expand Up @@ -90,7 +90,7 @@ func testProcessMappedActionNotFound(t *testing.T) {
queueMessage := NewOECMessage(message, nil, mockActionSpecs)

_, err := queueMessage.Process()
expectedErr := errors.New("There is no mapped action found for action[Ack]. SQS message with alertId[] will be ignored.")
expectedErr := errors.New("There is no mapped action found for action[Ack]. SQS message with entityId[] will be ignored.")
assert.EqualError(t, err, expectedErr.Error())
}

Expand All @@ -103,7 +103,7 @@ func testProcessFieldMissing(t *testing.T) {
queueMessage := NewOECMessage(message, nil, mockActionSpecs)

_, err := queueMessage.Process()
expectedErr := errors.New("SQS message with alertId[] does not contain action property.")
expectedErr := errors.New("SQS message with entityId[] does not contain action property.")
assert.EqualError(t, err, expectedErr.Error())
}

Expand All @@ -119,7 +119,7 @@ func (mqm *MockQueueMessage) Message() *sqs.Message {
}

body := "mockBody"
messageAttr := map[string]*sqs.MessageAttributeValue{integrationId: {StringValue: &mockIntegrationId}}
messageAttr := map[string]*sqs.MessageAttributeValue{ownerId: {StringValue: &mockOwnerId}}

return &sqs.Message{
MessageId: &mockMessageId,
Expand Down
67 changes: 4 additions & 63 deletions queue/queue_payload.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,17 @@
package queue

type QueuePayload struct {
Alert Alert `json:"alert"`
Entity Entity `json:"entity"`
Action string `json:"action"`
MappedAction MappedAction `json:"mappedActionV2"`
}

type Alert struct {
AlertId string `json:"alertId"`
type Entity struct {
Id string `json:"id"`
Type string `json:"type"`
}

type MappedAction struct {
Name string `json:"name"`
ExtraField string `json:"extraField"`
}

/* Unmarshaling of full payload is not necessary
type QueuePayload struct {
Source Source `json:"source,omitempty"`
Alert Alert `json:"alert,omitempty"`
Action string `json:"action,omitempty"`
MappedAction MappedAction `json:"mappedAction,omitempty"`
IntegrationId string `json:"integrationName,omitempty"`
IntegrationName string `json:"integrationId,omitempty"`
EscalationId string `json:"escalationId,omitempty"`
EscalationName string `json:"escalationName,omitempty"`
EscalationNotify EscalationNotify `json:"escalationNotify,omitempty"`
EscalationTime int64 `json:"escalationTime,omitempty"`
RepeatCount int `json:"repeatCount,omitempty"`
}
type Source struct {
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
}
type Alert struct {
AlertId string `json:"alertId,omitempty"`
Message string `json:"message,omitempty"`
Tags []string `json:"tags,omitempty"`
TinyId string `json:"tinyId,omitempty"`
Entity string `json:"entity,omitempty"`
Alias string `json:"alias,omitempty"`
CreatedAt int64 `json:"createdAt,omitempty"`
UpdatedAt int64 `json:"updatedAt,omitempty"`
Username string `json:"username,omitempty"`
UserId string `json:"userId,omitempty"`
Recipient string `json:"recipient,omitempty"`
Team string `json:"team,omitempty"`
Owner string `json:"owner,omitempty"`
Recipients []string `json:"recipients,omitempty"`
Teams []string `json:"teams,omitempty"`
Actions []string `json:"actions,omitempty"`
SnoozeEndDate string `json:"snoozeEndDate,omitempty"`
SnoozedUntil int64 `json:"snoozedUntil,omitempty"`
AddedTags string `json:"addedTags,omitempty"`
RemovedTags string `json:"removedTags,omitempty"`
Priority string `json:"priority,omitempty"`
OldPriority string `json:"oldPriority,omitempty"`
Source string `json:"source,omitempty"`
}
type MappedAction struct {
Name string `json:"name,omitempty"`
ExtraField string `json:"extraField,omitempty"`
}
type EscalationNotify struct {
Entity string `json:"entity,omitempty"`
Id string `json:"id,omitempty"`
Type string `json:"type,omitempty"`
Name string `json:"name,omitempty"`
}
*/
6 changes: 3 additions & 3 deletions queue/queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,12 @@ func (qp *OECQueueProcessor) receiveToken() (*OECToken, error) {
return token, nil
}

func (qp *OECQueueProcessor) addPoller(queueProvider QueueProvider, integrationId string) Poller {
func (qp *OECQueueProcessor) addPoller(queueProvider QueueProvider, ownerId string) Poller {
poller := newPollerFunc(
qp.workerPool,
queueProvider,
qp.configuration,
integrationId,
ownerId,
qp.repositories,
)
qp.pollers[queueProvider.OECMetadata().QueueUrl()] = poller
Expand Down Expand Up @@ -249,7 +249,7 @@ func (qp *OECQueueProcessor) refreshPollers(token *OECToken) {
logrus.Errorf("Poller[%s] could not be added: %s.", queueUrl, err)
continue
}
qp.addPoller(queueProvider, token.IntegrationId).StartPolling()
qp.addPoller(queueProvider, token.OwnerId).StartPolling()
logrus.Debugf("Poller[%s] is added.", queueUrl)
}
}
Expand Down
4 changes: 2 additions & 2 deletions queue/queue_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,14 @@ func TestAddTwoDifferentPollersTest(t *testing.T) {

processor := newQueueProcessorTest()

poller := processor.addPoller(NewMockQueueProvider(), mockIntegrationId).(*OECPoller)
poller := processor.addPoller(NewMockQueueProvider(), mockOwnerId).(*OECPoller)

mockQueueProvider2 := NewMockQueueProvider().(*MockQueueProvider)
mockQueueProvider2.OECMetadataFunc = func() OECMetadata {
return mockOECMetadata2
}

processor.addPoller(mockQueueProvider2, mockIntegrationId)
processor.addPoller(mockQueueProvider2, mockOwnerId)

assert.Equal(t, mockOECMetadata1, poller.QueueProvider().OECMetadata())
assert.Equal(t, processor.configuration.PollerConf, poller.conf.PollerConf)
Expand Down
4 changes: 2 additions & 2 deletions queue/queue_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"sync"
)

const integrationId = "integrationId"
const ownerId = "ownerId"

type SQS interface {
ChangeMessageVisibility(input *sqs.ChangeMessageVisibilityInput) (*sqs.ChangeMessageVisibilityOutput, error)
Expand Down Expand Up @@ -112,7 +112,7 @@ func (qp *OECQueueProvider) ReceiveMessage(maxNumOfMessage int64, visibilityTime

request := &sqs.ReceiveMessageInput{
MessageAttributeNames: []*string{
aws.String(integrationId),
aws.String(ownerId),
},
QueueUrl: &queueUrl,
MaxNumberOfMessages: aws.Int64(maxNumOfMessage),
Expand Down
4 changes: 2 additions & 2 deletions queue/queue_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestReceiveMessage(t *testing.T) {
assert.Equal(t, int64(0), *capturedInput.WaitTimeSeconds) // because of short polling
assert.Equal(t, int64(10), *capturedInput.MaxNumberOfMessages)
assert.Equal(t, 1, len(capturedInput.MessageAttributeNames))
assert.Equal(t, "integrationId", *capturedInput.MessageAttributeNames[0])
assert.Equal(t, "ownerId", *capturedInput.MessageAttributeNames[0])
}

func TestReceiveMessageWithError(t *testing.T) {
Expand Down Expand Up @@ -238,7 +238,7 @@ var mockSuccessReceiveFunc = func(numOfMessage int64, visibilityTimeout int64) (
messages := make([]*sqs.Message, 0)
for i := int64(0); i < numOfMessage; i++ {
id := strconv.FormatInt(i, 10)
messageAttr := map[string]*sqs.MessageAttributeValue{"integrationId": {StringValue: &mockIntegrationId}}
messageAttr := map[string]*sqs.MessageAttributeValue{"ownerId": {StringValue: &mockOwnerId}}
messages = append(messages, &sqs.Message{MessageId: &id, MessageAttributes: messageAttr, Body: &body})
}

Expand Down
3 changes: 2 additions & 1 deletion runbook/action_result_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package runbook

type ActionResultPayload struct {
IsSuccessful bool `json:"isSuccessful,omitempty"`
AlertId string `json:"alertId,omitempty"`
EntityId string `json:"entityId,omitempty"`
EntityType string `json:"entityType,omitempty"`
Action string `json:"action,omitempty"`
FailureMessage string `json:"failureMessage,omitempty"`
}
6 changes: 3 additions & 3 deletions runbook/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestSendResultToOpsGenie(t *testing.T) {
t.Errorf("Expected request to have ‘mappedAction=testAction’, got: ‘%s’", actionResult.Action)
}

if actionResult.AlertId != "testAlert" {
t.Errorf("Expected request to have ‘alertId=testAlert’, got: ‘%s’", actionResult.AlertId)
if actionResult.EntityId != "testAlert" {
t.Errorf("Expected request to have ‘entityId=testAlert’, got: ‘%s’", actionResult.EntityId)
}

if actionResult.IsSuccessful != true {
Expand All @@ -56,7 +56,7 @@ func TestSendResultToOpsGenie(t *testing.T) {

actionResult := &ActionResultPayload{
Action: "testAction",
AlertId: "testAlert",
EntityId: "testAlert",
IsSuccessful: true,
FailureMessage: "fail",
}
Expand Down

0 comments on commit 1715997

Please sign in to comment.