Skip to content

Commit

Permalink
Implement message attribute parsing for PublishBatch
Browse files Browse the repository at this point in the history
Factor out attribute parsing for SendMessage, SendMessageBatch, and Publish in the process.

I believe this change is adequately covered by the smoke tests, but am
happy to be convinced otherwise
  • Loading branch information
toastwaffle authored and Admiral-Piett committed Dec 18, 2024
1 parent 1bdee6e commit 93f2751
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 89 deletions.
116 changes: 29 additions & 87 deletions app/models/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (
"fmt"
"net/url"
"strconv"
"strings"

"golang.org/x/text/cases"
"golang.org/x/text/language"

log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -187,30 +183,41 @@ type SendMessageRequest struct {
QueueUrl string `json:"QueueUrl" schema:"QueueUrl"`
}

func (r *SendMessageRequest) SetAttributesFromForm(values url.Values) {
func parseMessageAttributes(values url.Values, keyPrefix string) map[string]MessageAttribute {
result := map[string]MessageAttribute{}

for i := 1; true; i++ {
nameKey := fmt.Sprintf("MessageAttribute.%d.Name", i)
nameKey := fmt.Sprintf("%s.%d.Name", keyPrefix, i)
name := values.Get(nameKey)
if name == "" {
break
}

dataTypeKey := fmt.Sprintf("MessageAttribute.%d.Value.DataType", i)
dataTypeKey := fmt.Sprintf("%s.%d.Value.DataType", keyPrefix, i)
dataType := values.Get(dataTypeKey)
if dataType == "" {
log.Warnf("DataType of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
log.Warnf("DataType of message attribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
continue
}

stringValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.StringValue", i))
binaryValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.BinaryValue", i))
stringValue := values.Get(fmt.Sprintf("%s.%d.Value.StringValue", keyPrefix, i))
binaryValue := values.Get(fmt.Sprintf("%s.%d.Value.BinaryValue", keyPrefix, i))

r.MessageAttributes[name] = MessageAttribute{
result[name] = MessageAttribute{
DataType: dataType,
StringValue: stringValue,
BinaryValue: []byte(binaryValue),
}
}

if len(result) > 0 {
return result
}
return nil
}

func (r *SendMessageRequest) SetAttributesFromForm(values url.Values) {
r.MessageAttributes = parseMessageAttributes(values, "MessageAttribute")
}

func NewSendMessageBatchRequest() *SendMessageBatchRequest {
Expand All @@ -223,53 +230,8 @@ type SendMessageBatchRequest struct {
}

func (r *SendMessageBatchRequest) SetAttributesFromForm(values url.Values) {
for key := range values {

keySegments := strings.Split(key, ".")
//If index value size is 3 or less, there is no attribute value
if len(keySegments) <= 3 {
continue
}

// Both patterns below are supported here.
// strconv.Atoi(keySegments[1] - targets the index value in pattern: `Entries.1.MessageBody`
// strconv.Atoi(keySegments[3] - targets the index value in pattern: `Entries.1.MessageAttributes.1.Name`
entryIndex, err1 := strconv.Atoi(keySegments[1])
attributeIndex, err2 := strconv.Atoi(keySegments[3])

// If the entry index and attribute index cannot be obtained, the attribute will not be set, so skip
if err1 != nil || err2 != nil {
continue
}

nameKey := fmt.Sprintf("Entries.%d.MessageAttributes.%d.Name", entryIndex, attributeIndex)
if key != nameKey {
continue
}
name := values.Get(nameKey)
dataTypeKey := fmt.Sprintf("Entries.%d.MessageAttributes.%d.Value.DataType", entryIndex, attributeIndex)
dataType := values.Get(dataTypeKey)
if dataType == "" {
log.Warnf("DataType of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
continue
}

stringValue := values.Get(fmt.Sprintf("Entries.%d.MessageAttributes.%d.Value.StringValue", entryIndex, attributeIndex))
binaryValue := values.Get(fmt.Sprintf("Entries.%d.MessageAttributes.%d.Value.BinaryValue", entryIndex, attributeIndex))

if r.Entries[entryIndex].MessageAttributes == nil {
r.Entries[entryIndex].MessageAttributes = make(map[string]MessageAttribute)
}

r.Entries[entryIndex].MessageAttributes[name] = MessageAttribute{
DataType: dataType,
StringValue: stringValue,
BinaryValue: []byte(binaryValue),
}

if _, ok := r.Entries[entryIndex].MessageAttributes[name]; !ok {
log.Warnf("StringValue or BinaryValue of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
}
for entryIndex := range r.Entries {
r.Entries[entryIndex].MessageAttributes = parseMessageAttributes(values, fmt.Sprintf("Entries.%d.MessageAttributes", entryIndex))
}
}

Expand Down Expand Up @@ -742,34 +704,7 @@ type PublishRequest struct {
}

func (r *PublishRequest) SetAttributesFromForm(values url.Values) {
attributes := map[string]MessageAttribute{}
for i := 1; true; i++ {
nameKey := fmt.Sprintf("MessageAttributes.entry.%d.Name", i)
name := values.Get(nameKey)
if name == "" {
break
}

dataTypeKey := fmt.Sprintf("MessageAttributes.entry.%d.Value.DataType", i)
dataType := values.Get(dataTypeKey)
if dataType == "" {
log.Warnf("DataType of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
continue
}

stringValue := values.Get(fmt.Sprintf("MessageAttributes.entry.%d.Value.StringValue", i))
binaryValue := values.Get(fmt.Sprintf("MessageAttributes.entry.%d.Value.BinaryValue", i))

if r.MessageAttributes == nil {
r.MessageAttributes = make(map[string]MessageAttribute)
}
attributes[name] = MessageAttribute{
DataType: cases.Title(language.AmericanEnglish).String(dataType), // capitalize
StringValue: stringValue,
BinaryValue: []byte(binaryValue),
}
}
r.MessageAttributes = attributes
r.MessageAttributes = parseMessageAttributes(values, "MessageAttributes.entry")
}

// Satisfy the AbstractPublishEntry interface
Expand Down Expand Up @@ -888,7 +823,14 @@ type PublishBatchRequest struct {
}

func (r *PublishBatchRequest) SetAttributesFromForm(values url.Values) {
// TAG - Implement me
for entryIndex, entry := range r.PublishBatchRequestEntries.Member {
if entry == nil {
// The form values are 1-indexed; at the point that this is called, the first element in the
// list of requests will be nil.
continue
}
entry.MessageAttributes = parseMessageAttributes(values, fmt.Sprintf("PublishBatchRequestEntries.member.%d.MessageAttributes.entry", entryIndex))
}
}

type PublishBatchRequestEntries struct {
Expand Down
78 changes: 78 additions & 0 deletions app/models/requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,3 +659,81 @@ func TestPublishRequest_SetAttributesFromForm_success_concurrent(t *testing.T) {
wg.Wait()
}
}

func TestParseMessageAttributes(t *testing.T) {
for _, tc := range []struct {
description string
values url.Values
keyPrefix string
want map[string]MessageAttribute
}{
{
description: "empty",
values: url.Values{},
keyPrefix: "foo",
want: nil,
},
{
description: "simple",
values: url.Values{
"MessageAttribute.1.Name": []string{"Attr1"},
"MessageAttribute.1.Value.DataType": []string{"String"},
"MessageAttribute.1.Value.StringValue": []string{"Value1"},
"MessageAttribute.2.Name": []string{"Attr2"},
"MessageAttribute.2.Value.DataType": []string{"Binary"},
"MessageAttribute.2.Value.BinaryValue": []string{"VmFsdWUy"},
},
keyPrefix: "MessageAttribute",
want: map[string]MessageAttribute{
"Attr1": {
DataType: "String",
StringValue: "Value1",
BinaryValue: []byte{},
},
"Attr2": {
DataType: "Binary",
BinaryValue: []byte("VmFsdWUy"),
},
},
},
{
description: "attributes after empty name ignored",
values: url.Values{
"MessageAttribute.1.Name": []string{""},
"MessageAttribute.1.Value.DataType": []string{"String"},
"MessageAttribute.1.Value.StringValue": []string{"Value4"},
"MessageAttribute.2.Name": []string{"Attr2"},
"MessageAttribute.2.Value.DataType": []string{"Binary"},
"MessageAttribute.2.Value.BinaryValue": []string{"VmFsdWUy"},
},
keyPrefix: "MessageAttribute",
want: nil,
},
{
description: "attributes after missing number ignored",
values: url.Values{
// Note starting from 2
"MessageAttribute.2.Name": []string{"Attr2"},
"MessageAttribute.2.Value.DataType": []string{"Binary"},
"MessageAttribute.2.Value.BinaryValue": []string{"VmFsdWUy"},
},
keyPrefix: "MessageAttribute",
want: nil,
},
{
description: "empty DataType ignored",
values: url.Values{
"MessageAttribute.1.Name": []string{"Attr4"},
"MessageAttribute.1.Value.DataType": []string{""},
"MessageAttribute.1.Value.StringValue": []string{"Value4"},
},
keyPrefix: "MessageAttribute",
want: nil,
},
} {
t.Run(tc.description, func(t *testing.T) {
got := parseMessageAttributes(tc.values, tc.keyPrefix)
assert.Equal(t, tc.want, got)
})
}
}
5 changes: 3 additions & 2 deletions smoke_tests/sns_publish_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,9 @@ func Test_Publish_batch_sqs_json_raw_with_optional_fields(t *testing.T) {

assert.Equal(t, message, *receivedMessage.Messages[0].Body)
assert.Equal(t, "649b2c548f103e499304eda4d6d4c5a2", *receivedMessage.Messages[0].MD5OfBody)
assert.Equal(t, "d41d8cd98f00b204e9800998ecf8427e", *receivedMessage.Messages[0].MD5OfMessageAttributes)
assert.Len(t, receivedMessage.Messages[0].MessageAttributes, 0)
assert.Equal(t, "45a48b32ccd821cc81a8c28fbac4cd97", *receivedMessage.Messages[0].MD5OfMessageAttributes)
assert.Equal(t, *receivedMessage.Messages[0].MessageAttributes["test"].DataType, "String")
assert.Equal(t, *receivedMessage.Messages[0].MessageAttributes["test"].StringValue, "string-value")
assert.NotNil(t, receivedMessage.Messages[0].MessageId)
assert.NotNil(t, receivedMessage.Messages[0].ReceiptHandle)

Expand Down

0 comments on commit 93f2751

Please sign in to comment.