Skip to content

Commit 8a1ec2d

Browse files
GEODE-4132: Sending handshake response from locator for new protocol
Changing the locator to send a response to the NewConnectionVersion message to be consistent with the server. This change also allows the locator to handle an authentication message, which means that GetAvailableServers will work if a security manager is enabled and the client sends a validate authentication message. Signed-off-by: Sarge <[email protected]>
1 parent 04f1419 commit 8a1ec2d

File tree

14 files changed

+334
-162
lines changed

14 files changed

+334
-162
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the License
11+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
* or implied. See the License for the specific language governing permissions and limitations under
13+
* the License.
14+
*/
15+
16+
package org.apache.geode.internal.protocol;
17+
18+
19+
import org.apache.geode.annotations.Experimental;
20+
import org.apache.geode.cache.Cache;
21+
import org.apache.geode.distributed.Locator;
22+
import org.apache.geode.internal.exception.InvalidExecutionContextException;
23+
import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
24+
import org.apache.geode.internal.protocol.state.ConnectionTerminatingStateProcessor;
25+
import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
26+
27+
@Experimental
28+
public class LocatorMessageExecutionContext extends MessageExecutionContext {
29+
private final Locator locator;
30+
31+
public LocatorMessageExecutionContext(Locator locator, ProtocolClientStatistics statistics,
32+
ConnectionStateProcessor initialConnectionStateProcessor) {
33+
super(statistics, initialConnectionStateProcessor);
34+
this.locator = locator;
35+
}
36+
37+
/**
38+
* Returns the cache associated with this execution
39+
* <p>
40+
*
41+
* @throws InvalidExecutionContextException if there is no cache available
42+
*/
43+
@Override
44+
public Cache getCache() throws InvalidExecutionContextException {
45+
setConnectionStateProcessor(new ConnectionTerminatingStateProcessor());
46+
throw new InvalidExecutionContextException(
47+
"Operations on the locator should not to try to operate on a server");
48+
}
49+
50+
/**
51+
* Returns the locator associated with this execution
52+
* <p>
53+
*
54+
* @throws InvalidExecutionContextException if there is no locator available
55+
*/
56+
@Override
57+
public Locator getLocator() throws InvalidExecutionContextException {
58+
return locator;
59+
}
60+
61+
}

geode-client-protocol/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java

+8-47
Original file line numberDiff line numberDiff line change
@@ -12,72 +12,33 @@
1212
* or implied. See the License for the specific language governing permissions and limitations under
1313
* the License.
1414
*/
15-
1615
package org.apache.geode.internal.protocol;
1716

18-
1917
import org.apache.geode.annotations.Experimental;
2018
import org.apache.geode.cache.Cache;
2119
import org.apache.geode.distributed.Locator;
22-
import org.apache.geode.distributed.internal.InternalLocator;
2320
import org.apache.geode.internal.exception.InvalidExecutionContextException;
2421
import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
2522
import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
2623

2724
@Experimental
28-
public class MessageExecutionContext {
29-
private final Cache cache;
30-
private final Locator locator;
31-
private final ProtocolClientStatistics statistics;
32-
private ConnectionStateProcessor connectionStateProcessor;
33-
34-
public MessageExecutionContext(Cache cache, ProtocolClientStatistics statistics,
35-
ConnectionStateProcessor initialConnectionStateProcessor) {
36-
this.cache = cache;
37-
this.locator = null;
38-
this.statistics = statistics;
39-
this.connectionStateProcessor = initialConnectionStateProcessor;
40-
}
25+
public abstract class MessageExecutionContext {
26+
protected final ProtocolClientStatistics statistics;
27+
protected ConnectionStateProcessor connectionStateProcessor;
4128

42-
public MessageExecutionContext(InternalLocator locator, ProtocolClientStatistics statistics,
43-
ConnectionStateProcessor initialConnectionStateProcessor) {
44-
this.locator = locator;
45-
this.cache = null;
29+
public MessageExecutionContext(ProtocolClientStatistics statistics,
30+
ConnectionStateProcessor connectionStateProcessor) {
4631
this.statistics = statistics;
47-
connectionStateProcessor = initialConnectionStateProcessor;
32+
this.connectionStateProcessor = connectionStateProcessor;
4833
}
4934

5035
public ConnectionStateProcessor getConnectionStateProcessor() {
5136
return connectionStateProcessor;
5237
}
5338

54-
/**
55-
* Returns the cache associated with this execution
56-
* <p>
57-
*
58-
* @throws InvalidExecutionContextException if there is no cache available
59-
*/
60-
public Cache getCache() throws InvalidExecutionContextException {
61-
if (cache != null) {
62-
return cache;
63-
}
64-
throw new InvalidExecutionContextException(
65-
"Operations on the locator should not to try to operate on a cache");
66-
}
39+
public abstract Cache getCache() throws InvalidExecutionContextException;
6740

68-
/**
69-
* Returns the locator associated with this execution
70-
* <p>
71-
*
72-
* @throws InvalidExecutionContextException if there is no locator available
73-
*/
74-
public Locator getLocator() throws InvalidExecutionContextException {
75-
if (locator != null) {
76-
return locator;
77-
}
78-
throw new InvalidExecutionContextException(
79-
"Operations on the server should not to try to operate on a locator");
80-
}
41+
public abstract Locator getLocator() throws InvalidExecutionContextException;
8142

8243
/**
8344
* Returns the statistics for recording operation stats. In a unit test environment this may not
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3+
* agreements. See the NOTICE file distributed with this work for additional information regarding
4+
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
5+
* "License"); you may not use this file except in compliance with the License. You may obtain a
6+
* copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software distributed under the License
11+
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12+
* or implied. See the License for the specific language governing permissions and limitations under
13+
* the License.
14+
*/
15+
16+
package org.apache.geode.internal.protocol;
17+
18+
19+
import org.apache.geode.annotations.Experimental;
20+
import org.apache.geode.cache.Cache;
21+
import org.apache.geode.distributed.Locator;
22+
import org.apache.geode.distributed.internal.InternalLocator;
23+
import org.apache.geode.internal.exception.InvalidExecutionContextException;
24+
import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
25+
import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
26+
27+
@Experimental
28+
public class ServerMessageExecutionContext extends MessageExecutionContext {
29+
private final Cache cache;
30+
31+
public ServerMessageExecutionContext(Cache cache, ProtocolClientStatistics statistics,
32+
ConnectionStateProcessor initialConnectionStateProcessor) {
33+
super(statistics, initialConnectionStateProcessor);
34+
this.cache = cache;
35+
}
36+
37+
/**
38+
* Returns the cache associated with this execution
39+
* <p>
40+
*
41+
* @throws InvalidExecutionContextException if there is no cache available
42+
*/
43+
@Override
44+
public Cache getCache() throws InvalidExecutionContextException {
45+
return cache;
46+
}
47+
48+
/**
49+
* Returns the locator associated with this execution
50+
* <p>
51+
*
52+
* @throws InvalidExecutionContextException if there is no locator available
53+
*/
54+
@Override
55+
public Locator getLocator() throws InvalidExecutionContextException {
56+
throw new InvalidExecutionContextException(
57+
"Operations on the server should not to try to operate on a locator");
58+
}
59+
60+
}

geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,9 @@ public DistributionConfigImpl getConfig() {
523523
}
524524

525525
public InternalCache getCache() {
526+
if (myCache == null) {
527+
return GemFireCacheImpl.getInstance();
528+
}
526529
return myCache;
527530
}
528531

@@ -1082,7 +1085,11 @@ private void restartWithDS(InternalDistributedSystem newSystem, InternalCache ne
10821085

10831086
@Override
10841087
public DistributedSystem getDistributedSystem() {
1085-
return this.myDs;
1088+
if (myDs == null) {
1089+
return InternalDistributedSystem.getAnyInstance();
1090+
}
1091+
1092+
return myDs;
10861093
}
10871094

10881095
@Override

geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -534,9 +534,11 @@ private void handleProtobufConnection(Socket socket, DataInputStream input) thro
534534
ClientProtocolService clientProtocolService = clientProtocolServiceLoader.lookupService();
535535
clientProtocolService.initializeStatistics("LocatorStats",
536536
internalLocator.getDistributedSystem());
537-
try (ClientProtocolProcessor pipeline =
538-
clientProtocolService.createProcessorForLocator(internalLocator)) {
539-
pipeline.processMessage(input, socket.getOutputStream());
537+
try (ClientProtocolProcessor pipeline = clientProtocolService.createProcessorForLocator(
538+
internalLocator, internalLocator.getCache().getSecurityService())) {
539+
while (!pipeline.socketProcessingIsFinished()) {
540+
pipeline.processMessage(input, socket.getOutputStream());
541+
}
540542
} catch (IncompatibleVersionException e) {
541543
// should not happen on the locator as there is no handshake.
542544
log.error("Unexpected exception in client message processing", e);

geode-core/src/main/java/org/apache/geode/internal/cache/client/protocol/ClientProtocolService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public interface ClientProtocolService {
3737
/**
3838
* Create a locator processor. The locator does not currently provide any authentication.
3939
*/
40-
ClientProtocolProcessor createProcessorForLocator(InternalLocator locator);
40+
ClientProtocolProcessor createProcessorForLocator(InternalLocator locator,
41+
SecurityService securityService);
4142

4243
int getServiceProtocolVersion();
4344
}

geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -133,24 +133,23 @@ private Collection<InetSocketAddress> getAvailableServers() throws IOException {
133133
final Socket locatorSocket = new Socket(locator.getAddress(), locator.getPort());
134134

135135
final OutputStream outputStream = locatorSocket.getOutputStream();
136+
final InputStream inputStream = locatorSocket.getInputStream();
136137
ProtocolVersion.NewConnectionClientVersion.newBuilder()
137138
.setMajorVersion(ProtocolVersion.MajorVersions.CURRENT_MAJOR_VERSION_VALUE)
138139
.setMinorVersion(ProtocolVersion.MinorVersions.CURRENT_MINOR_VERSION_VALUE).build()
139140
.writeDelimitedTo(outputStream);
140141

141142
// The locator does not currently send a reply to the ProtocolVersion...
142-
// if
143-
// (!ProtocolVersion.HandshakeAcknowledgement.parseDelimitedFrom(inputStream).getHandshakePassed())
144-
// {
145-
// throw new IOException("Failed ProtocolVersion.");
146-
// }
143+
if (!ProtocolVersion.VersionAcknowledgement.parseDelimitedFrom(inputStream)
144+
.getVersionAccepted()) {
145+
throw new IOException("Failed ProtocolVersion.");
146+
}
147147

148148
ClientProtocol.Message.newBuilder()
149149
.setRequest(ClientProtocol.Request.newBuilder()
150150
.setGetAvailableServersRequest(LocatorAPI.GetAvailableServersRequest.newBuilder()))
151151
.build().writeDelimitedTo(outputStream);
152152

153-
final InputStream inputStream = locatorSocket.getInputStream();
154153
LocatorAPI.GetAvailableServersResponse getAvailableServersResponse = ClientProtocol.Message
155154
.parseDelimitedFrom(inputStream).getResponse().getGetAvailableServersResponse();
156155
if (getAvailableServersResponse.getServersCount() < 1) {

geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufCachePipeline.java

+3-11
Original file line numberDiff line numberDiff line change
@@ -20,31 +20,23 @@
2020
import java.io.OutputStream;
2121

2222
import org.apache.geode.annotations.Experimental;
23-
import org.apache.geode.cache.Cache;
2423
import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
2524
import org.apache.geode.internal.protocol.MessageExecutionContext;
26-
import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor;
27-
import org.apache.geode.internal.protocol.state.ConnectionStateProcessor;
2825
import org.apache.geode.internal.protocol.statistics.ProtocolClientStatistics;
29-
import org.apache.geode.internal.security.SecurityService;
3026

3127

3228
@Experimental
3329
public final class ProtobufCachePipeline implements ClientProtocolProcessor {
3430
private final ProtocolClientStatistics statistics;
3531
private final ProtobufStreamProcessor streamProcessor;
36-
private final ConnectionStateProcessor initialCacheConnectionStateProcessor;
3732
private final MessageExecutionContext messageExecutionContext;
3833

3934
ProtobufCachePipeline(ProtobufStreamProcessor protobufStreamProcessor,
40-
ProtocolClientStatistics statistics, Cache cache, SecurityService securityService) {
35+
MessageExecutionContext context) {
4136
this.streamProcessor = protobufStreamProcessor;
42-
this.statistics = statistics;
37+
this.statistics = context.getStatistics();
4338
this.statistics.clientConnected();
44-
this.initialCacheConnectionStateProcessor =
45-
new ProtobufConnectionHandshakeStateProcessor(securityService);
46-
this.messageExecutionContext =
47-
new MessageExecutionContext(cache, statistics, initialCacheConnectionStateProcessor);
39+
this.messageExecutionContext = context;
4840
}
4941

5042
@Override

0 commit comments

Comments
 (0)