Skip to content

Commit

Permalink
Merge pull request zino-hofmann#481 from EugenePisotsky/fix-subscript…
Browse files Browse the repository at this point in the history
…ions-reconnect

fix: subscriptions reconnect
  • Loading branch information
mainawycliffe authored Dec 10, 2019
2 parents f2cbf18 + 616a877 commit 4d835ea
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 3 deletions.
22 changes: 21 additions & 1 deletion packages/graphql/lib/src/socket_client.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:typed_data';
import 'package:meta/meta.dart';
Expand Down Expand Up @@ -74,6 +75,9 @@ class SocketClient {
final BehaviorSubject<SocketConnectionState> _connectionStateController =
BehaviorSubject<SocketConnectionState>();

final HashMap<String, Function> _subscriptionInitializers = HashMap();
bool _connectionWasLost = false;

Timer _reconnectTimer;
WebSocket _socket;
@visibleForTesting
Expand Down Expand Up @@ -132,6 +136,14 @@ class SocketClient {
onError: (dynamic e) {
print('error: $e');
});

if (_connectionWasLost) {
for (Function callback in _subscriptionInitializers.values) {
callback();
}

_connectionWasLost = false;
}
} catch (e) {
onConnectionLost(e);
}
Expand All @@ -150,6 +162,8 @@ class SocketClient {
return;
}

_connectionWasLost = true;

if (_connectionStateController.value !=
SocketConnectionState.NOT_CONNECTED) {
_connectionStateController.value = SocketConnectionState.NOT_CONNECTED;
Expand Down Expand Up @@ -244,7 +258,7 @@ class SocketClient {
final bool addTimeout = !payload.operation.isSubscription &&
config.queryAndMutationTimeout != null;

response.onListen = () {
final onListen = () {
final Observable<SocketConnectionState>
waitForConnectedStateWithoutTimeout = _connectionStateController
.startWith(
Expand Down Expand Up @@ -322,14 +336,20 @@ class SocketClient {
});
};

response.onListen = onListen;

response.onCancel = () {
_subscriptionInitializers.remove(id);

sub?.cancel();
if (_connectionStateController.value == SocketConnectionState.CONNECTED &&
_socket != null) {
_write(StopOperation(id));
}
};

_subscriptionInitializers[id] = onListen;

return response.stream;
}

Expand Down
47 changes: 45 additions & 2 deletions packages/graphql/test/socket_client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import 'dart:convert';
import 'dart:typed_data';

import 'package:gql/language.dart';
import 'package:graphql/client.dart';
import 'package:graphql/src/link/operation.dart';
import 'package:graphql/src/websocket/messages.dart';
import 'package:test/test.dart';
import 'package:graphql/src/socket_client.dart'
show SocketClient, SocketConnectionState;
import 'package:graphql/src/websocket/messages.dart';
import 'package:test/test.dart';

import 'helpers.dart';

Expand Down Expand Up @@ -63,6 +64,48 @@ void main() {
}));
});

await expectLater(
subscriptionDataStream,
emits(
SubscriptionData(
'01020304-0506-4708-890a-0b0c0d0e0f10',
{'foo': 'bar'},
['error and data can coexist'],
),
),
);
});
test('resubscribe', () async {
final payload = SubscriptionRequest(
Operation(documentNode: gql('subscription {}')),
);
final waitForConnection = true;
final subscriptionDataStream =
socketClient.subscribe(payload, waitForConnection);

socketClient.onConnectionLost();

await socketClient.connectionState
.where((state) => state == SocketConnectionState.CONNECTED)
.first;

// ignore: unawaited_futures
socketClient.socket.stream
.where((message) =>
message ==
r'{"type":"start","id":"01020304-0506-4708-890a-0b0c0d0e0f10","payload":{"operationName":null,"query":"subscription {\n \n}","variables":{}}}')
.first
.then((_) {
socketClient.socket.add(jsonEncode({
'type': 'data',
'id': '01020304-0506-4708-890a-0b0c0d0e0f10',
'payload': {
'data': {'foo': 'bar'},
'errors': ['error and data can coexist']
}
}));
});

await expectLater(
subscriptionDataStream,
emits(
Expand Down

0 comments on commit 4d835ea

Please sign in to comment.