Skip to content

Commit

Permalink
[Issue 12314][Go Functions] Remove extra call to gi.stats.incrTotalPr…
Browse files Browse the repository at this point in the history
…ocessedSuccessfully() (apache#12316)

* Remove extra call to gi.stats.incrTotalProcessedSuccessfully()

- This call will be made by gi.processResult if
    - there is a result
    - there were no system exceptions
- Fixes apache#12314

* Mirror Java function SDK handling logic
  • Loading branch information
flowchartsman authored Mar 1, 2022
1 parent fd9e639 commit 2c9e498
Showing 1 changed file with 25 additions and 16 deletions.
41 changes: 25 additions & 16 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ CLOSE:

gi.stats.processTimeEnd()
gi.processResult(msgInput, output)
gi.stats.incrTotalProcessedSuccessfully()
case <-idleTimer.C:
close(channel)
break CLOSE
Expand Down Expand Up @@ -353,32 +352,42 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) {
atMostOnce := gi.context.instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_ATMOST_ONCE
autoAck := gi.context.instanceConf.funcDetails.AutoAck

// If the function had an output and the user has specified an output topic, the output needs to be sent to the
// assigned output topic.
if output != nil && gi.context.instanceConf.funcDetails.Sink.Topic != "" {
asyncMsg := pulsar.ProducerMessage{
Payload: output,
}
// Attempt to send the message and handle the response
gi.producer.SendAsync(context.Background(), &asyncMsg, func(messageID pulsar.MessageID,
message *pulsar.ProducerMessage, err error) {
if err != nil {
if autoAck && atLeastOnce {
gi.nackInputMessage(msgInput)
// Dispatch an async send for the message with callback in case of error.
gi.producer.SendAsync(context.Background(), &asyncMsg,
func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
// Callback after message async send:
// If there was an error, the SDK is entrusted with responding, and we have at-least-once delivery
// semantics, ensure we nack so someone else can get it, in case we are the only handler. Then mark
// exception and fail out.
if err != nil {
if autoAck && atLeastOnce {
gi.nackInputMessage(msgInput)
}
gi.stats.incrTotalSysExceptions(err)
log.Fatal(err)
}
gi.stats.incrTotalSysExceptions(err)
log.Fatal(err)
} else {
// Otherwise the message succeeded. If the SDK is entrusted with responding and we are not using
// at-most-once delivery semantics, ack the message.
if autoAck && !atMostOnce {
gi.ackInputMessage(msgInput)
}
gi.stats.incrTotalProcessedSuccessfully()
}
})
} else if autoAck && atLeastOnce {
},
)
return
}

// No output from the function or no output topic. Ack if we need to and mark the success before rturning.
if autoAck && atLeastOnce {
gi.ackInputMessage(msgInput)
// Report that we processed successfully even though it's not going to an output topic?
// We probably shouldn't...
// gi.stats.incrTotalProcessedSuccessfully()
}
gi.stats.incrTotalProcessedSuccessfully()
}

// ackInputMessage doesn't produce any result, or the user doesn't want the result.
Expand Down

0 comments on commit 2c9e498

Please sign in to comment.