Skip to content

Commit

Permalink
Do billboard rollup in batches (forem#21272)
Browse files Browse the repository at this point in the history
  • Loading branch information
benhalpern authored Sep 9, 2024
1 parent ceacc24 commit 6e66500
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
20 changes: 14 additions & 6 deletions app/services/billboard_event_rollup.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,41 @@ def initialize(relation:)

attr_reader :aggregator, :relation

def rollup(date)
def rollup(date, batch_size: 1000)
created = []

rows = relation.where(created_at: date.all_day)
aggregate_into_groups(rows).each do |compacted_events|
created << compact_records(date, compacted_events)
relation.connection.execute("SET LOCAL statement_timeout = '#{STATEMENT_TIMEOUT}s'") # Set temp timeout

relation.where(created_at: date.all_day).in_batches(of: batch_size) do |rows_batch|
aggregate_into_groups(rows_batch).each do |compacted_events|
created << compact_records(date, compacted_events)
end
end

created
ensure
relation.connection.execute("RESET statement_timeout") # Reset after fetching batches
end

private

def aggregate_into_groups(rows)
relation.connection.execute("SET LOCAL statement_timeout = '#{STATEMENT_TIMEOUT}s'") # Set temp timeout

rows.in_batches.each_record do |event|
aggregator << event
end

aggregator
ensure
relation.connection.execute("RESET statement_timeout") # Reset after aggregation
end

def compact_records(date, compacted)
result = nil

relation.connection.execute("SET LOCAL statement_timeout = '#{STATEMENT_TIMEOUT}s'") # Set temp timeout

relation.transaction do
relation.connection.execute("SET LOCAL statement_timeout = '#{STATEMENT_TIMEOUT}s'") # Set temp timeout
result = relation.create!(compacted.to_h) do |event|
event.created_at = date
end
Expand Down
5 changes: 3 additions & 2 deletions spec/services/billboard_event_rollup_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ def days_ago_as_range(num)
described_class.rollup(Date.current - 2)
end.not_to raise_error

expect(BillboardEvent.connection).to have_received(:execute).with("SET LOCAL statement_timeout = '#{BillboardEventRollup::STATEMENT_TIMEOUT}s'")
expect(BillboardEvent.connection).to have_received(:execute).with("RESET statement_timeout")
expect(BillboardEvent.connection).to have_received(:execute)
.with("SET LOCAL statement_timeout = '#{BillboardEventRollup::STATEMENT_TIMEOUT}s'").thrice
expect(BillboardEvent.connection).to have_received(:execute).with("RESET statement_timeout").thrice
end

it "fails if new attributes would be lost" do
Expand Down

0 comments on commit 6e66500

Please sign in to comment.