Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[api][pulsar-client] make AuthenticationToken serializable (apache#10574
) ### Motivation When trying to authenticate Pulsar in distributed systems (a very good example of this is the Spark connector, see here: https://pulsar.apache.org/docs/en/adaptors-spark/), the following exception is thrown (this is taken from the log of one of the Spark executors): ``` 21/05/13 11:59:34 WARN PulsarClientImpl: [topic: persistent://tenant/namespace/topic] Could not get connection while getPartitionedTopicMetadata -- Will try again in 380 ms 21/05/13 11:59:34 INFO ConnectionPool: [[id: 0x4d13ed61, L:/1.2.3.4:43624 - R:broker.svc.cluster.local/1.2.3.4:6650]] Connected to server 21/05/13 11:59:34 WARN ClientCnx: [broker.svc.cluster.local/1.2.3.4:6650] Got exception java.lang.RuntimeException: failed to get client token at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getToken(AuthenticationDataToken.java:62) at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getCommandData(AuthenticationDataToken.java:55) at org.apache.pulsar.client.api.AuthenticationDataProvider.authenticate(AuthenticationDataProvider.java:133) at org.apache.pulsar.client.impl.ClientCnx.newConnectCommand(ClientCnx.java:218) at org.apache.pulsar.client.impl.ClientCnx.channelActive(ClientCnx.java:199) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209) at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.fulfillConnectPromise(AbstractEpollChannel.java:620) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:653) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getToken(AuthenticationDataToken.java:60) ... 20 more ``` This happens since the token supplier function become null when it gets transferred to eg. a Spark executor. Refer to this issue (which is roughly the same): https://github.com/streamnative/pulsar/issues/905 . ### Modifications Added a new token storage class, `SerializableAuthenticationToken`, which can be serialized properly (this is tested in its corresponding unit test as well). On the other hand, this class does not use supplier functions to fetch token data, it stores the token as a plain string instead. For use-cases requiring a token supplier, this cannot be used, but can load its configuration from file (using the `file://` URL prefix), like the original `AuthenticationToken` class. Corresponding unit test were added to the class. Will also add proper documentation prior to merging once the change can be accepted - if that is required.
- Loading branch information