Skip to content

Commit 1ee7154

Browse files
KAFKA-18446 Remove MetadataCacheControllerNodeProvider (apache#18437)
Reviewers: PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent d4aee71 commit 1ee7154

File tree

6 files changed

+23
-84
lines changed

6 files changed

+23
-84
lines changed

core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala

+6-65
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,24 @@
1717

1818
package kafka.server
1919

20-
import java.util.concurrent.LinkedBlockingDeque
21-
import java.util.concurrent.atomic.AtomicReference
2220
import kafka.raft.RaftManager
23-
import kafka.server.metadata.ZkMetadataCache
2421
import kafka.utils.Logging
2522
import org.apache.kafka.clients._
26-
import org.apache.kafka.common.{Node, Reconfigurable}
2723
import org.apache.kafka.common.metrics.Metrics
2824
import org.apache.kafka.common.network._
2925
import org.apache.kafka.common.protocol.Errors
3026
import org.apache.kafka.common.requests.AbstractRequest
3127
import org.apache.kafka.common.security.JaasContext
3228
import org.apache.kafka.common.security.auth.SecurityProtocol
3329
import org.apache.kafka.common.utils.{LogContext, Time}
30+
import org.apache.kafka.common.{Node, Reconfigurable}
3431
import org.apache.kafka.server.common.{ApiMessageAndVersion, ControllerRequestCompletionHandler, NodeToControllerChannelManager}
3532
import org.apache.kafka.server.util.{InterBrokerSendThread, RequestAndCompletionHandler}
3633

3734
import java.util
3835
import java.util.Optional
36+
import java.util.concurrent.LinkedBlockingDeque
37+
import java.util.concurrent.atomic.AtomicReference
3938
import scala.collection.Seq
4039
import scala.jdk.CollectionConverters._
4140
import scala.jdk.OptionConverters.{RichOption, RichOptionalInt}
@@ -44,45 +43,13 @@ case class ControllerInformation(
4443
node: Option[Node],
4544
listenerName: ListenerName,
4645
securityProtocol: SecurityProtocol,
47-
saslMechanism: String,
48-
isZkController: Boolean
46+
saslMechanism: String
4947
)
5048

5149
trait ControllerNodeProvider {
5250
def getControllerInfo(): ControllerInformation
5351
}
5452

55-
class MetadataCacheControllerNodeProvider(
56-
val metadataCache: ZkMetadataCache,
57-
val config: KafkaConfig,
58-
val quorumControllerNodeProvider: () => Option[ControllerInformation]
59-
) extends ControllerNodeProvider {
60-
61-
private val zkControllerListenerName = config.interBrokerListenerName
62-
private val zkControllerSecurityProtocol = config.interBrokerSecurityProtocol
63-
private val zkControllerSaslMechanism = config.saslMechanismInterBrokerProtocol
64-
65-
val emptyZkControllerInfo = ControllerInformation(
66-
None,
67-
zkControllerListenerName,
68-
zkControllerSecurityProtocol,
69-
zkControllerSaslMechanism,
70-
isZkController = true)
71-
72-
override def getControllerInfo(): ControllerInformation = {
73-
metadataCache.getControllerId.map {
74-
case ZkCachedControllerId(id) => ControllerInformation(
75-
metadataCache.getAliveBrokerNode(id, zkControllerListenerName),
76-
zkControllerListenerName,
77-
zkControllerSecurityProtocol,
78-
zkControllerSaslMechanism,
79-
isZkController = true)
80-
case KRaftCachedControllerId(_) =>
81-
quorumControllerNodeProvider.apply().getOrElse(emptyZkControllerInfo)
82-
}.getOrElse(emptyZkControllerInfo)
83-
}
84-
}
85-
8653
object RaftControllerNodeProvider {
8754
def apply(
8855
raftManager: RaftManager[ApiMessageAndVersion],
@@ -115,7 +82,7 @@ class RaftControllerNodeProvider(
11582

11683
override def getControllerInfo(): ControllerInformation =
11784
ControllerInformation(raftManager.leaderAndEpoch.leaderId.toScala.flatMap(idToNode),
118-
listenerName, securityProtocol, saslMechanism, isZkController = false)
85+
listenerName, securityProtocol, saslMechanism)
11986
}
12087

12188
/**
@@ -198,8 +165,6 @@ class NodeToControllerChannelManagerImpl(
198165
val controllerInformation = controllerNodeProvider.getControllerInfo()
199166
new NodeToControllerRequestThread(
200167
buildNetworkClient(controllerInformation),
201-
controllerInformation.isZkController,
202-
buildNetworkClient,
203168
manualMetadataUpdater,
204169
controllerNodeProvider,
205170
config,
@@ -243,8 +208,6 @@ case class NodeToControllerQueueItem(
243208

244209
class NodeToControllerRequestThread(
245210
initialNetworkClient: KafkaClient,
246-
var isNetworkClientForZkController: Boolean,
247-
networkClientFactory: ControllerInformation => KafkaClient,
248211
metadataUpdater: ManualMetadataUpdater,
249212
controllerNodeProvider: ControllerNodeProvider,
250213
config: KafkaConfig,
@@ -261,22 +224,6 @@ class NodeToControllerRequestThread(
261224

262225
this.logIdent = logPrefix
263226

264-
private def maybeResetNetworkClient(controllerInformation: ControllerInformation): Unit = {
265-
if (isNetworkClientForZkController != controllerInformation.isZkController) {
266-
debug("Controller changed to " + (if (isNetworkClientForZkController) "kraft" else "zk") + " mode. " +
267-
s"Resetting network client with new controller information : ${controllerInformation}")
268-
// Close existing network client.
269-
val oldClient = networkClient
270-
oldClient.initiateClose()
271-
oldClient.close()
272-
273-
isNetworkClientForZkController = controllerInformation.isZkController
274-
updateControllerAddress(controllerInformation.node.orNull)
275-
controllerInformation.node.foreach(n => metadataUpdater.setNodes(Seq(n).asJava))
276-
networkClient = networkClientFactory(controllerInformation)
277-
}
278-
}
279-
280227
private val requestQueue = new LinkedBlockingDeque[NodeToControllerQueueItem]()
281228
private val activeController = new AtomicReference[Node](null)
282229

@@ -370,19 +317,13 @@ class NodeToControllerRequestThread(
370317

371318
override def doWork(): Unit = {
372319
val controllerInformation = controllerNodeProvider.getControllerInfo()
373-
maybeResetNetworkClient(controllerInformation)
374320
if (activeControllerAddress().isDefined) {
375321
super.pollOnce(Long.MaxValue)
376322
} else {
377323
debug("Controller isn't cached, looking for local metadata changes")
378324
controllerInformation.node match {
379325
case Some(controllerNode) =>
380-
val controllerType = if (controllerInformation.isZkController) {
381-
"ZK"
382-
} else {
383-
"KRaft"
384-
}
385-
info(s"Recorded new $controllerType controller, from now on will use node $controllerNode")
326+
info(s"Recorded new KRaft controller, from now on will use node $controllerNode")
386327
updateControllerAddress(controllerNode)
387328
metadataUpdater.setNodes(Seq(controllerNode).asJava)
388329
case None =>

core/src/test/scala/kafka/server/NodeToControllerRequestThreadTest.scala

+10-10
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.mockito.Mockito._
4040
class NodeToControllerRequestThreadTest {
4141

4242
private def controllerInfo(node: Option[Node]): ControllerInformation = {
43-
ControllerInformation(node, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
43+
ControllerInformation(node, new ListenerName(""), SecurityProtocol.PLAINTEXT, "")
4444
}
4545

4646
private def emptyControllerInfo: ControllerInformation = {
@@ -59,7 +59,7 @@ class NodeToControllerRequestThreadTest {
5959

6060
val retryTimeoutMs = 30000
6161
val testRequestThread = new NodeToControllerRequestThread(
62-
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
62+
mockClient, new ManualMetadataUpdater(),
6363
controllerNodeProvider, config, time, "", retryTimeoutMs)
6464
testRequestThread.started = true
6565

@@ -97,7 +97,7 @@ class NodeToControllerRequestThreadTest {
9797

9898
val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))
9999
val testRequestThread = new NodeToControllerRequestThread(
100-
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
100+
mockClient, new ManualMetadataUpdater(),
101101
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
102102
testRequestThread.started = true
103103
mockClient.prepareResponse(expectedResponse)
@@ -141,7 +141,7 @@ class NodeToControllerRequestThreadTest {
141141

142142
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
143143
val testRequestThread = new NodeToControllerRequestThread(
144-
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
144+
mockClient, new ManualMetadataUpdater(),
145145
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
146146
testRequestThread.started = true
147147

@@ -193,7 +193,7 @@ class NodeToControllerRequestThreadTest {
193193
Collections.singletonMap("a", 2))
194194
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
195195
val testRequestThread = new NodeToControllerRequestThread(
196-
mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
196+
mockClient, new ManualMetadataUpdater(),
197197
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
198198
testRequestThread.started = true
199199

@@ -260,7 +260,7 @@ class NodeToControllerRequestThreadTest {
260260
val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2))
261261

262262
val testRequestThread = new NodeToControllerRequestThread(
263-
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
263+
mockClient, new ManualMetadataUpdater(),
264264
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
265265
testRequestThread.started = true
266266

@@ -323,7 +323,7 @@ class NodeToControllerRequestThreadTest {
323323
Collections.singletonMap("a", Errors.NOT_CONTROLLER),
324324
Collections.singletonMap("a", 2))
325325
val testRequestThread = new NodeToControllerRequestThread(
326-
mockClient, isNetworkClientForZkController = true,_ => mockClient, new ManualMetadataUpdater(),
326+
mockClient, new ManualMetadataUpdater(),
327327
controllerNodeProvider, config, time, "", retryTimeoutMs)
328328
testRequestThread.started = true
329329

@@ -382,7 +382,7 @@ class NodeToControllerRequestThreadTest {
382382
mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == ApiKeys.METADATA)
383383

384384
val testRequestThread = new NodeToControllerRequestThread(
385-
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
385+
mockClient, new ManualMetadataUpdater(),
386386
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
387387
testRequestThread.started = true
388388

@@ -420,7 +420,7 @@ class NodeToControllerRequestThreadTest {
420420
mockClient.createPendingAuthenticationError(activeController, 50)
421421

422422
val testRequestThread = new NodeToControllerRequestThread(
423-
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
423+
mockClient, new ManualMetadataUpdater(),
424424
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
425425
testRequestThread.started = true
426426

@@ -443,7 +443,7 @@ class NodeToControllerRequestThreadTest {
443443
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
444444

445445
val testRequestThread = new NodeToControllerRequestThread(
446-
mockClient, isNetworkClientForZkController = true, _ => mockClient, new ManualMetadataUpdater(),
446+
mockClient, new ManualMetadataUpdater(),
447447
controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
448448

449449
val completionHandler = new TestControllerRequestCompletionHandler(None)

core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala

+1-3
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,8 @@ class BrokerRegistrationRequestTest {
5757

5858
val saslMechanism: String = ""
5959

60-
def isZkController: Boolean = false
61-
6260
override def getControllerInfo(): ControllerInformation =
63-
ControllerInformation(node, listenerName, securityProtocol, saslMechanism, isZkController)
61+
ControllerInformation(node, listenerName, securityProtocol, saslMechanism)
6462
},
6563
Time.SYSTEM,
6664
new Metrics(),

core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,11 @@ class ForwardingManagerTest {
6666
}
6767

6868
private def controllerInfo = {
69-
ControllerInformation(Some(new Node(0, "host", 1234)), new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
69+
ControllerInformation(Some(new Node(0, "host", 1234)), new ListenerName(""), SecurityProtocol.PLAINTEXT, "")
7070
}
7171

7272
private def emptyControllerInfo = {
73-
ControllerInformation(None, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
73+
ControllerInformation(None, new ListenerName(""), SecurityProtocol.PLAINTEXT, "")
7474
}
7575

7676
@Test

core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class SimpleControllerNodeProvider extends ControllerNodeProvider {
4343
def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
4444

4545
override def getControllerInfo(): ControllerInformation = ControllerInformation(Option(node.get()),
46-
listenerName, securityProtocol, saslMechanism, isZkController = false)
46+
listenerName, securityProtocol, saslMechanism)
4747
}
4848

4949
class RegistrationTestContext(

server-common/src/main/java/org/apache/kafka/server/util/InterBrokerSendThread.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@
4343
*/
4444
public abstract class InterBrokerSendThread extends ShutdownableThread {
4545

46-
protected volatile KafkaClient networkClient;
46+
protected final KafkaClient networkClient;
4747

4848
private final int requestTimeoutMs;
4949
private final Time time;
5050
private final UnsentRequests unsentRequests;
5151

52-
public InterBrokerSendThread(
52+
protected InterBrokerSendThread(
5353
String name,
5454
KafkaClient networkClient,
5555
int requestTimeoutMs,
@@ -58,7 +58,7 @@ public InterBrokerSendThread(
5858
this(name, networkClient, requestTimeoutMs, time, true);
5959
}
6060

61-
public InterBrokerSendThread(
61+
protected InterBrokerSendThread(
6262
String name,
6363
KafkaClient networkClient,
6464
int requestTimeoutMs,

0 commit comments

Comments
 (0)