Skip to content

Commit

Permalink
[Java] Fix nullref when Stop is called during connect (dotnet#34046)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy authored Jul 27, 2021
1 parent f5d6f84 commit bad82b8
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,28 +411,37 @@ private Single<NegotiateResponse> startNegotiate(String url, int negotiateAttemp
* @return A Completable that completes when the connection has been stopped.
*/
private Completable stop(String errorMessage) {
Transport transport;
ConnectionState connectionState;
Completable startTask;
this.state.lock();
try {
if (this.state.getHubConnectionState() == HubConnectionState.DISCONNECTED) {
return Completable.complete();
}

connectionState = this.state.getConnectionStateUnsynchronized(false);

if (errorMessage != null) {
this.state.getConnectionStateUnsynchronized(false).stopError = errorMessage;
connectionState.stopError = errorMessage;
logger.error("HubConnection disconnected with an error: {}.", errorMessage);
} else {
logger.debug("Stopping HubConnection.");
}

transport = this.state.getConnectionStateUnsynchronized(false).transport;
startTask = connectionState.startTask;
} finally {
this.state.unlock();
}

Completable stop = transport.stop();
stop.onErrorComplete().subscribe();
return stop;
Completable stopTask = startTask.onErrorComplete().andThen(Completable.defer(() ->
{
Completable stop = connectionState.transport.stop();
stop.onErrorComplete().subscribe();
return stop;
}));
stopTask.onErrorComplete().subscribe();

return stopTask;
}

private void ReceiveLoop(ByteBuffer payload)
Expand Down Expand Up @@ -1396,7 +1405,7 @@ public void handleHandshake(ByteBuffer payload) {
if (!handshakeReceived) {
List<Byte> handshakeByteList = new ArrayList<Byte>();
byte curr = payload.get();
// Add the handshake to handshakeBytes, but not the record separator
// Add the handshake to handshakeBytes, but not the record separator
while (curr != RECORD_SEPARATOR) {
handshakeByteList.add(curr);
curr = payload.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import ch.qos.logback.classic.spi.ILoggingEvent;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
Expand All @@ -44,7 +45,7 @@ public void checkHubConnectionState() {
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());

hubConnection.stop();
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}

Expand Down Expand Up @@ -99,7 +100,7 @@ public void constructHubConnectionWithHttpConnectionOptions() {
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.CONNECTED, hubConnection.getConnectionState());

hubConnection.stop();
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
}

Expand Down Expand Up @@ -2564,7 +2565,7 @@ public void onClosedCallbackRunsWhenStopIsCalled() {
assertNull(value1.get());
value1.set("Closed callback ran.");
});
hubConnection.stop();
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();

assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
assertEquals(value1.get(), "Closed callback ran.");
Expand All @@ -2589,7 +2590,7 @@ public void multipleOnClosedCallbacksRunWhenStopIsCalled() {

assertNull(value1.get());
assertNull(value2.get());
hubConnection.stop();
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();

assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
assertEquals("Closed callback ran.",value1.get());
Expand Down Expand Up @@ -3891,4 +3892,40 @@ public void hubConnectionCloseCallsStop() {

assertTrue(close.blockingAwait(30, TimeUnit.SECONDS));
}

@Test
public void hubConnectionStopDuringConnecting() {
MockTransport mockTransport = new MockTransport();
CompletableSubject waitForStop = CompletableSubject.create();
TestHttpClient client = new TestHttpClient()
.on("POST", "http://example.com/negotiate?negotiateVersion=1", (req) ->
{
return Single.defer(() -> {
waitForStop.blockingAwait();
return Single.just(new HttpResponse(200, "",
TestUtils.stringToByteBuffer("{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\"availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
}).subscribeOn(Schedulers.computation());
});

CompletableSubject close = CompletableSubject.create();

HubConnection hubConnection = HubConnectionBuilder
.create("http://example.com")
.withTransportImplementation(mockTransport)
.withHttpClient(client)
.build();

hubConnection.onClosed(e -> {
close.onComplete();
});
hubConnection.start();
assertEquals(HubConnectionState.CONNECTING, hubConnection.getConnectionState());

Completable stopTask = hubConnection.stop();
waitForStop.onComplete();
stopTask.timeout(30, TimeUnit.SECONDS).blockingAwait();
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());

assertTrue(close.blockingAwait(30, TimeUnit.SECONDS));
}
}

0 comments on commit bad82b8

Please sign in to comment.