Skip to content

Commit

Permalink
KAFKA-14658: Do not open broker ports until we are ready to accept tr…
Browse files Browse the repository at this point in the history
…affic (apache#13169)

When we are listening on fixed ports, we should defer opening ports until we're ready to accept
traffic. If we open the broker port too early, it can confuse monitoring and deployment systems.
This is a particular concern when in KRaft mode, since in that mode, we create the SocketServer
object earlier in the startup process than when in ZK mode.

The approach taken in this PR is to defer opening the acceptor port until Acceptor.start is called.
Note that when we are listening on a random port, we continue to open the port "early," in the
SocketServer constructor. The reason for doing this is that there is no other way to find the
random port number the kernel has selected. Since random port assignment is not used in production
deployments, this should be reasonable.

FutureUtils.java: add chainFuture and tests.

SocketServerTest.scala: add timeouts to cases where we call get() on futures.

Reviewers: David Arthur <[email protected]>, Alexandre Dupriez <[email protected]>
  • Loading branch information
cmccabe authored Feb 1, 2023
1 parent 7d61d45 commit 6625214
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 55 deletions.
99 changes: 72 additions & 27 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.nio.channels.{Selector => NSelector, _}
import java.util
import java.util.concurrent._
import java.util.concurrent.atomic._

import kafka.cluster.{BrokerEndPoint, EndPoint}
import kafka.metrics.KafkaMetricsGroup
import kafka.network.ConnectionQuotas._
Expand All @@ -48,6 +47,7 @@ import org.apache.kafka.common.requests.{ApiVersionsRequest, RequestContext, Req
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.apache.kafka.server.util.FutureUtils
import org.slf4j.event.Level

import scala.collection._
Expand Down Expand Up @@ -189,30 +189,51 @@ class SocketServer(val config: KafkaConfig,
* processor corresponding to the [[EndPoint]]. Any endpoint
* that does not appear in this map will be started once all
* authorizerFutures are complete.
*
* @return A future which is completed when all of the acceptor threads have
* successfully started. If any of them do not start, the future will
* be completed with an exception.
*/
def enableRequestProcessing(
authorizerFutures: Map[Endpoint, CompletableFuture[Void]]
): Unit = this.synchronized {
): CompletableFuture[Void] = this.synchronized {
if (stopped) {
throw new RuntimeException("Can't enable request processing: SocketServer is stopped.")
}

def chainAcceptorFuture(acceptor: Acceptor): Unit = {
// Because of ephemeral ports, we need to match acceptors to futures by looking at
// the listener name, rather than the endpoint object.
authorizerFutures.find {
val authorizerFuture = authorizerFutures.find {
case (endpoint, _) => acceptor.endPoint.listenerName.value().equals(endpoint.listenerName().get())
} match {
case None => chainFuture(allAuthorizerFuturesComplete, acceptor.startFuture)
case Some((_, future)) => chainFuture(future, acceptor.startFuture)
case None => allAuthorizerFuturesComplete
case Some((_, future)) => future
}
authorizerFuture.whenComplete((_, e) => {
if (e != null) {
// If the authorizer failed to start, fail the acceptor's startedFuture.
acceptor.startedFuture.completeExceptionally(e)
} else {
// Once the authorizer has started, attempt to start the associated acceptor. The Acceptor.start()
// function will complete the acceptor started future (either successfully or not)
acceptor.start()
}
})
}

info("Enabling request processing.")
controlPlaneAcceptorOpt.foreach(chainAcceptorFuture)
dataPlaneAcceptors.values().forEach(chainAcceptorFuture)
chainFuture(CompletableFuture.allOf(authorizerFutures.values.toArray: _*),
FutureUtils.chainFuture(CompletableFuture.allOf(authorizerFutures.values.toArray: _*),
allAuthorizerFuturesComplete)

// Construct a future that will be completed when all Acceptors have been successfully started.
// Alternately, if any of them fail to start, this future will be completed exceptionally.
val allAcceptors = dataPlaneAcceptors.values().asScala.toSeq ++ controlPlaneAcceptorOpt
val enableFuture = new CompletableFuture[Void]
FutureUtils.chainFuture(CompletableFuture.allOf(allAcceptors.map(_.startedFuture).toArray: _*), enableFuture)
enableFuture
}

def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = synchronized {
Expand Down Expand Up @@ -289,13 +310,13 @@ class SocketServer(val config: KafkaConfig,
try {
val acceptor = dataPlaneAcceptors.get(endpoints(listenerName))
if (acceptor != null) {
acceptor.serverChannel.socket.getLocalPort
acceptor.localPort
} else {
controlPlaneAcceptorOpt.map(_.serverChannel.socket().getLocalPort).getOrElse(throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or control-plane"))
controlPlaneAcceptorOpt.map(_.localPort).getOrElse(throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or control-plane"))
}
} catch {
case e: Exception =>
throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
throw new KafkaException("Tried to check for port of non-existing protocol", e)
}
}

Expand All @@ -312,7 +333,13 @@ class SocketServer(val config: KafkaConfig,
val acceptor = dataPlaneAcceptors.get(endpoint)
// There is no authorizer future for this new listener endpoint. So start the
// listener once all authorizer futures are complete.
chainFuture(allAuthorizerFuturesComplete, acceptor.startFuture)
allAuthorizerFuturesComplete.whenComplete((_, e) => {
if (e != null) {
acceptor.startedFuture.completeExceptionally(e)
} else {
acceptor.start()
}
})
}
}

Expand Down Expand Up @@ -388,15 +415,6 @@ object SocketServer {
CoreUtils.swallow(channel.socket().close(), logging, Level.ERROR)
CoreUtils.swallow(channel.close(), logging, Level.ERROR)
}

def chainFuture(sourceFuture: CompletableFuture[Void],
destinationFuture: CompletableFuture[Void]): Unit = {
sourceFuture.whenComplete((_, t) => if (t != null) {
destinationFuture.completeExceptionally(t)
} else {
destinationFuture.complete(null)
})
}
}

object DataPlaneAcceptor {
Expand Down Expand Up @@ -573,7 +591,21 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private val listenBacklogSize = config.socketListenBacklogSize

private val nioSelector = NSelector.open()
private[network] val serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)

// If the port is configured as 0, we are using a wildcard port, so we need to open the socket
// before we can find out what port we have. If it is set to a nonzero value, defer opening
// the socket until we start the Acceptor. The reason for deferring the socket opening is so
// that systems which assume that the socket being open indicates readiness are not confused.
private[network] var serverChannel: ServerSocketChannel = _
private[network] val localPort: Int = if (endPoint.port != 0) {
endPoint.port
} else {
serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
val newPort = serverChannel.socket().getLocalPort()
info(s"Opened wildcard endpoint ${endPoint.host}:${newPort}")
newPort
}

private[network] val processors = new ArrayBuffer[Processor]()
// Build the metric name explicitly in order to keep the existing name for compatibility
private val blockedPercentMeterMetricName = explicitMetricName(
Expand All @@ -585,23 +617,36 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private var currentProcessorIndex = 0
private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
private var started = false
private[network] val startFuture = new CompletableFuture[Void]()
private[network] val startedFuture = new CompletableFuture[Void]()

val thread = KafkaThread.nonDaemon(
s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
this)

startFuture.thenRun(() => synchronized {
if (!shouldRun.get()) {
debug(s"Ignoring start future for ${endPoint.listenerName} since the acceptor has already been shut down.")
} else {
def start(): Unit = synchronized {
try {
if (!shouldRun.get()) {
throw new ClosedChannelException()
}
if (serverChannel == null) {
serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
debug(s"Opened endpoint ${endPoint.host}:${endPoint.port}")
}
debug(s"Starting processors for listener ${endPoint.listenerName}")
started = true
processors.foreach(_.start())
debug(s"Starting acceptor thread for listener ${endPoint.listenerName}")
thread.start()
startedFuture.complete(null)
started = true
} catch {
case e: ClosedChannelException =>
debug(s"Refusing to start acceptor for ${endPoint.listenerName} since the acceptor has already been shut down.")
startedFuture.completeExceptionally(e)
case t: Throwable =>
error(s"Unable to start acceptor for ${endPoint.listenerName}", t)
startedFuture.completeExceptionally(new RuntimeException(s"Unable to start acceptor for ${endPoint.listenerName}", t))
}
})
}

private[network] case class DelayedCloseSocket(socket: SocketChannel, endThrottleTimeMs: Long) extends Ordered[DelayedCloseSocket] {
override def compare(that: DelayedCloseSocket): Int = endThrottleTimeMs compare that.endThrottleTimeMs
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ class BrokerServer(

// Enable inbound TCP connections. Each endpoint will be started only once its matching
// authorizer future is completed.
socketServer.enableRequestProcessing(authorizerFutures)
val socketServerFuture = socketServer.enableRequestProcessing(authorizerFutures)

// If we are using a ClusterMetadataAuthorizer which stores its ACLs in the metadata log,
// notify it that the loading process is complete.
Expand All @@ -495,6 +495,10 @@ class BrokerServer(
FutureUtils.waitWithLogging(logger.underlying, "all of the authorizer futures to be completed",
CompletableFuture.allOf(authorizerFutures.values.toSeq: _*), startupDeadline, time)

// Wait for all the SocketServer ports to be open, and the Acceptors to be started.
FutureUtils.waitWithLogging(logger.underlying, "all of the SocketServer Acceptors to be started",
socketServerFuture, startupDeadline, time)

maybeChangeStatus(STARTING, STARTED)
} catch {
case e: Throwable =>
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,16 @@ class ControllerServer(
* metadata log. See @link{QuorumController#maybeCompleteAuthorizerInitialLoad}
* and KIP-801 for details.
*/
socketServer.enableRequestProcessing(authorizerFutures)
val socketServerFuture = socketServer.enableRequestProcessing(authorizerFutures)

// Block here until all the authorizer futures are complete
FutureUtils.waitWithLogging(logger.underlying, "all of the authorizer futures to be completed",
CompletableFuture.allOf(authorizerFutures.values.toSeq: _*), startupDeadline, time)

// Wait for all the SocketServer ports to be open, and the Acceptors to be started.
FutureUtils.waitWithLogging(logger.underlying, "all of the SocketServer Acceptors to be started",
socketServerFuture, startupDeadline, time)

} catch {
case e: Throwable =>
maybeChangeStatus(STARTING, STARTED)
Expand Down
Loading

0 comments on commit 6625214

Please sign in to comment.