Skip to content

Commit

Permalink
[java] fixed bug in the connection negotiation code
Browse files Browse the repository at this point in the history
This patch fixes a typo in the connection negotiation code in the Java
client.  Prior to this fix, channel binding information was not verified
during connection negotiation because the peer certificate was not set.

In addition, I modified the error handing code in Negotiator.java to
abort connection negotiation upon receiving SSLPeerUnverifiedException
due to the absence of the channel binding information or the presence
of the invalid one.

I also added a test to verify that Kudu Java client doesn't connect
to a Kudu server which doesn't provide valid channel binding information
during the connection negotiation phase.

Kudos to Andy Singer for pointing to the bug.

Change-Id: I7bfd428128e224f03901a6cd7b33283495a28d54
Reviewed-on: http://gerrit.cloudera.org:8080/14713
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
Reviewed-by: Todd Lipcon <[email protected]>
(cherry picked from commit a0e8964)
Reviewed-on: http://gerrit.cloudera.org:8080/14727
Reviewed-by: Alexey Serbin <[email protected]>
  • Loading branch information
alexeyserbin committed Nov 16, 2019
1 parent 34dfee4 commit 91d196f
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -451,6 +452,11 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEven
// SSLException if we've already attempted to close, otherwise log the error.
error = new RecoverableException(Status.NetworkError(
String.format("%s disconnected from peer", getLogPrefix())));
} else if (e instanceof SSLPeerUnverifiedException) {
String m = String.format("unable to verify identity of peer %s: %s",
serverInfo, e.getMessage());
error = new NonRecoverableException(Status.NetworkError(m), e);
LOG.error(m, e);
} else {
// If the connection was explicitly disconnected via a call to disconnect(), we should
// have either gotten a ClosedChannelException or an SSLException.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,9 @@ private void handleTlsMessage(Channel chan, NegotiatePB response) throws IOExcep
throw new SSLPeerUnverifiedException("no peer cert found");
}

// The first element of the array is the peer's own certificate.
peerCert = certs[0];

// Don't wrap the TLS socket if we are using TLS for authentication only.
boolean isAuthOnly = serverFeatures.contains(RpcFeatureFlag.TLS_AUTHENTICATION_ONLY) &&
isLoopbackConnection(chan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableList;
import com.stumbleupon.async.Deferred;
import org.hamcrest.CoreMatchers;
import org.junit.After;
Expand Down Expand Up @@ -61,8 +62,19 @@ public class TestSecurity {
private enum Option {
LONG_LEADER_ELECTION,
SHORT_TOKENS_AND_TICKETS,
START_TSERVERS
};
START_TSERVERS,
}

static private class KeyValueMessage {
final String key;
final String val;
final String msg;
KeyValueMessage(String k, String v, String m) {
key = k;
val = v;
msg = m;
}
}

private void startCluster(Set<Option> opts) throws IOException {
MiniKuduClusterBuilder mcb = new MiniKuduClusterBuilder();
Expand Down Expand Up @@ -435,4 +447,46 @@ public void testExternallyProvidedSubjectRefreshedExternally() throws Exception
Assert.assertThat(cla.getAppendedText(), CoreMatchers.containsString(
"Using caller-provided subject with Kerberos principal [email protected]."));
}

/**
* Test that if a Kudu server (in this case master) doesn't provide valid
* connection binding information, Java client fails to connect to the server.
*/
@Test(timeout=60000)
public void testNegotiationChannelBindings() throws Exception {
startCluster(ImmutableSet.of(Option.START_TSERVERS));
// Test precondition: all is well with masters -- the client is able
// to connect to the cluster and create a table.
client.createTable("TestSecurity-channel-bindings-0",
getBasicSchema(), getBasicCreateTableOptions());

List<KeyValueMessage> variants = ImmutableList.of(
new KeyValueMessage("rpc_inject_invalid_channel_bindings_ratio", "1.0",
"invalid channel bindings provided by remote peer"),
new KeyValueMessage("rpc_send_channel_bindings", "false",
"no channel bindings provided by remote peer"));

// Make all masters sending invalid channel binding info during connection
// negotiation.
for (KeyValueMessage kvm : variants) {
for (HostAndPort hp : miniCluster.getMasterServers()) {
miniCluster.setMasterFlag(hp, kvm.key, kvm.val);
}

// Now, a client should not be able to connect to any master: negotiation
// fails because client cannot authenticate the servers since it fails
// to verify the connection binding.
try {
KuduClient c = new KuduClient.KuduClientBuilder(
miniCluster.getMasterAddressesAsString()).build();
c.createTable("TestSecurity-channel-bindings-1",
getBasicSchema(), getBasicCreateTableOptions());
Assert.fail("client should not be able to connect to any master");
} catch (NonRecoverableException e) {
Assert.assertThat(e.getMessage(), CoreMatchers.containsString(
"unable to verify identity of peer"));
Assert.assertThat(e.getMessage(), CoreMatchers.containsString(kvm.msg));
}
}
}
}
20 changes: 19 additions & 1 deletion src/kudu/rpc/server_negotiation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ DEFINE_double(rpc_inject_invalid_authn_token_ratio, 0,
TAG_FLAG(rpc_inject_invalid_authn_token_ratio, runtime);
TAG_FLAG(rpc_inject_invalid_authn_token_ratio, unsafe);

DEFINE_double(rpc_inject_invalid_channel_bindings_ratio, 0,
"The ratio of injection of invalid channel bindings during "
"connection negotiation. This is a test-only flag.");
TAG_FLAG(rpc_inject_invalid_channel_bindings_ratio, runtime);
TAG_FLAG(rpc_inject_invalid_channel_bindings_ratio, unsafe);

DEFINE_bool(rpc_send_channel_bindings, true,
"Whether to send channel bindings in NegotiatePB response as "
"prescribed by RFC 5929. This is a test-only flag.");
TAG_FLAG(rpc_send_channel_bindings, runtime);
TAG_FLAG(rpc_send_channel_bindings, unsafe);

DECLARE_bool(rpc_encrypt_loopback_connections);

DEFINE_string(trusted_subnets,
Expand Down Expand Up @@ -879,14 +891,20 @@ Status ServerNegotiation::SendSaslSuccess() {
RETURN_NOT_OK(security::GenerateNonce(nonce_.get_ptr()));
response.set_nonce(*nonce_);

if (tls_negotiated_) {
if (tls_negotiated_ && PREDICT_TRUE(FLAGS_rpc_send_channel_bindings)) {
// Send the channel bindings to the client.
security::Cert cert;
RETURN_NOT_OK(tls_handshake_.GetLocalCert(&cert));

string plaintext_channel_bindings;
RETURN_NOT_OK(cert.GetServerEndPointChannelBindings(&plaintext_channel_bindings));

if (kudu::fault_injection::MaybeTrue(
FLAGS_rpc_inject_invalid_channel_bindings_ratio)) {
DCHECK_GT(plaintext_channel_bindings.size(), 0);
plaintext_channel_bindings[0] += 1;
}

Slice ciphertext;
RETURN_NOT_OK(SaslEncode(sasl_conn_.get(),
plaintext_channel_bindings,
Expand Down

0 comments on commit 91d196f

Please sign in to comment.