Skip to content

Commit

Permalink
Fixed ProxyConnection to check for existence of auth_data field (apac…
Browse files Browse the repository at this point in the history
…he#12057)

### Motivation

ProxyConnection is not checking whether the optional auth_data field is set or not. 

Additionally, if there's an error before the client instance is created, we should avoid calling close on it. 

```
2021-09-16T00:49:40,213 [pulsar-proxy-io-2-3] WARN  org.apache.pulsar.proxy.server.ProxyConnection - [/10.199.78.158:7165] Unable to authenticate:                                      
java.lang.IllegalStateException: Field 'auth_data' is not set                                                                                                                           
    at org.apache.pulsar.common.api.proto.CommandConnect.getAuthDataSlice(CommandConnect.java:90) ~[org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]                                    
    at org.apache.pulsar.common.api.proto.CommandConnect.getAuthData(CommandConnect.java:83) ~[org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]                                         
    at org.apache.pulsar.proxy.server.ProxyConnection.handleConnect(ProxyConnection.java:318) [org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]                                          
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:166) [org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]                                            
    at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:192) [org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]                                            
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                  
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]                                  
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]                                      
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                  
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1374) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]                                                              
    at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1248) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]                                              
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1288) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]                                                              
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]                   
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]                                       
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]                                      
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                  
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                       
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                                
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux- 
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]                   
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]                            
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]                               
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]                                                  
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]                                      
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302]
2021-09-16T00:49:40,215 [pulsar-proxy-io-2-3] WARN  io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usua 
java.lang.NullPointerException: null                                                                                                                                                    
    at org.apache.pulsar.proxy.server.ProxyConnection.close(ProxyConnection.java:416) ~[org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]                                                 
    at org.apache.pulsar.proxy.server.ProxyConnection.handleConnect(ProxyConnection.java:356) ~[org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]                                         
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:166) ~[org.apache.pulsar-pulsar-common-2.8.1.jar:2.8.1]                                           
    at org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:192) ~[org.apache.pulsar-pulsar-proxy-2.8.1.jar:2.8.1]                                           
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                  
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]                                  
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]                                      
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                  
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1374) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]                                                              
    at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1248) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]                                              
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1288) [io.netty-netty-handler-4.1.66.Final.jar:4.1.66.Final]                                                              
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]                   
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]                                       
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [io.netty-netty-codec-4.1.66.Final.jar:4.1.66.Final]                                      
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                  
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                       
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.66.Final.jar:4.1.66.Final]                                
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux- 
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]                   
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar:4.1.66.Final]                            
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]                               
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]                                                  
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.66.Final.jar:4.1.66.Final]                                      
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_302] 
```
  • Loading branch information
merlimat authored Sep 16, 2021
1 parent bb80c5b commit 8042512
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
return;
}

AuthData clientData = AuthData.of(connect.getAuthData());
AuthData clientData = AuthData.of(connect.hasAuthData() ? connect.getAuthData() : null);
if (connect.hasAuthMethodName()) {
authMethod = connect.getAuthMethodName();
} else if (connect.hasAuthMethod()) {
Expand Down Expand Up @@ -413,7 +413,9 @@ private void close() {
state = State.Closed;
ctx.close();
try {
client.close();
if (client != null) {
client.close();
}
} catch (PulsarClientException e) {
LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
}
Expand Down

0 comments on commit 8042512

Please sign in to comment.