Skip to content

Commit

Permalink
enhancement(aws_sqs source): Optimize event send loop (vectordotdev#1…
Browse files Browse the repository at this point in the history
…1925)

* enhancement(aws_sqs source): Optimize event send loop

* Pre-allocate the events vector

* Emit error event
  • Loading branch information
bruceg authored Mar 22, 2022
1 parent dd31447 commit 0575a3a
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions src/sources/aws_sqs/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ use futures::FutureExt;
use tokio::{pin, select, time::Duration};
use tokio_util::codec::Decoder as _;
use vector_common::byte_size_of::ByteSizeOf;
use vector_core::{self, internal_event::EventsReceived};

use crate::{
codecs::Decoder,
config::log_schema,
event::{BatchNotifier, BatchStatus, Event},
internal_events::{AwsSqsBytesReceived, SqsMessageDeleteError},
internal_events::{
AwsSqsBytesReceived, EventsReceived, SqsMessageDeleteError, StreamClosedError,
},
shutdown::ShutdownSignal,
sources::util::StreamDecodingError,
SourceSender,
Expand Down Expand Up @@ -90,35 +91,34 @@ impl SqsSource {
};

if let Some(messages) = receive_message_output.messages {
let mut receipts_to_ack = vec![];
let mut receipts_to_ack = Vec::with_capacity(messages.len());
let mut events = Vec::with_capacity(messages.len());
let mut byte_size = 0;

let (batch, batch_receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements);
for message in messages {
if let Some(body) = message.body {
emit!(AwsSqsBytesReceived {
byte_size: body.len()
});
byte_size += body.len();
// a receipt handle should always exist
if let Some(receipt_handle) = message.receipt_handle {
receipts_to_ack.push(receipt_handle);
}
let timestamp = get_timestamp(&message.attributes);
let events = decode_message(self.decoder.clone(), body.as_bytes(), timestamp);
let send_result = if let Some(batch) = batch.as_ref() {
let events = events.map(|event| event.with_batch_notifier(batch));
out.send_batch(events).await
let decoded = decode_message(self.decoder.clone(), body.as_bytes(), timestamp);
if let Some(batch) = batch.as_ref() {
let decoded = decoded.map(|event| event.with_batch_notifier(batch));
events.extend(decoded);
} else {
out.send_batch(events).await
};

match send_result {
Err(err) => error!(message = "Error sending to sink.", error = %err),
Ok(()) => {
// a receipt handle should always exist
if let Some(receipt_handle) = message.receipt_handle {
receipts_to_ack.push(receipt_handle);
}
}
events.extend(decoded);
}
}
}
drop(batch);
drop(batch); // Drop last reference to batch acknowledgement finalizer
emit!(AwsSqsBytesReceived { byte_size });
let count = events.len();
if let Err(error) = out.send_batch(events).await {
emit!(StreamClosedError { error, count });
}

if let Some(receiver) = batch_receiver {
let client = self.client.clone();
Expand Down

0 comments on commit 0575a3a

Please sign in to comment.