Skip to content

Commit

Permalink
Merge pull request zino-hofmann#1366 from vytautas-pranskunas-/refres…
Browse files Browse the repository at this point in the history
…h-token-and-auto-reconnect

feat(graphql): added WebSocket token refresh and autoReconnect toggling
  • Loading branch information
vincenzopalazzo authored Jul 24, 2023
2 parents 71d7502 + e1c6d54 commit d41edf5
Show file tree
Hide file tree
Showing 4 changed files with 325 additions and 27 deletions.
110 changes: 96 additions & 14 deletions packages/graphql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,87 @@ subscription = client.subscribe(
subscription.listen(reactToAddedReview)
```

#### Adding headers (including auth) to WebSocket

In order to add auth header or any other header to websocket connection use `initialPayload` property

```dart
initialPayload: () {
var headers = <String, String>{};
headers.putIfAbsent(HttpHeaders.authorizationHeader, () => token);
return headers;
},
```

#### Refreshing headers (including auth)

In order to refresh auth header you need to setup `onConnectionLost` function

```dart
onConnectionLost: (int? code, String? reason) async {
if (code == 4001) {
await authTokenService.issueToken(refresh: true);
return Duration.zero;
}
return null;
}
```

Where `code` and `reason` are values returned from the server on connection close. There is no such code like 401 in WebSockets so you can use your custom and server code could look similar:

```typescript
subscriptions: {
'graphql-ws': {
onConnect: async (context: any) => {
const { connectionParams } = context;

if (!connectionParams) {
throw new Error('Connection params are missing');
}

const authToken = connectionParams.authorization;

if (authToken) {
const isValid await authService.isTokenValid(authToken);

if (!isValid) {
context.extra.socket.close(4001, 'Unauthorized');
}

return;
}
},
},
},
```

`onConnectionLost` function returns `Duration` which is basically `delayBetweenReconnectionAttempts` for current reconnect attempt. If duration is `null` then default `delayBetweenReconnectionAttempts` will be used. Otherwise returned value. For example upon expired auth token there is not much sense to wait after token is refreshed.

#### Handling connection manually

`toggleConnection` stream was introduced to allow connect or disconnect manually.

```dart
var toggleConnection = PublishSubject<ToggleConnectionState>;
SocketClientConfig(
toggleConnection: toggleConnection,
),
```

later from your code call

```dart
toggleConnection.add(ToggleConnectionState.disconnect);
//OR
toggleConnection.add(ToggleConnectionState.connect);
```

When `disconnect` event is called `autoReconnect` stops. When `connect` is called `autoReconnect` resumes.
this is useful when for some reason you want to stop reconnection. For example when user logouts from the system and reconnection would cause auth error from server causing infinite loop.

#### Customizing WebSocket Connections

`WebSocketLink` now has an experimental `connect` parameter that can be
Expand Down Expand Up @@ -427,15 +508,15 @@ class _Connection {
```

2- if you need to update your socket just cancel your subscription and resubscribe again using usual way
2- if you need to update your socket just cancel your subscription and resubscribe again using usual way
and if the token changed it will be reconnect with the new token otherwise it will use the same client



### `client.watchQuery` and `ObservableQuery`

[`client.watchQuery`](https://pub.dev/documentation/graphql/latest/graphql/GraphQLClient/watchQuery.html)
can be used to execute both queries and mutations, then reactively listen to changes to the underlying data in the cache.
can be used to execute both queries and mutations, then reactively listen to changes to the underlying data in the cache.

```dart
final observableQuery = client.watchQuery(
Expand Down Expand Up @@ -506,7 +587,7 @@ To disable cache normalization entirely, you could pass `(data) => null`.
If you only cared about `nodeId`, you could pass `(data) => data['nodeId']`.

Here's a more detailed example where the system involved contains versioned entities you don't want to clobber:
```dart
```dart
String customDataIdFromObject(Map<String, Object> data) {
final typeName = data['__typename'];
final entityId = data['entityId'];
Expand Down Expand Up @@ -589,17 +670,17 @@ query {

```

if you're not providing the possible type map and introspecting the typename, the cache can't be updated.
if you're not providing the possible type map and introspecting the typename, the cache can't be updated.

## Direct Cache Access API

The [`GraphQLCache`](https://pub.dev/documentation/graphql/latest/graphql/GraphQLCache-class.html)
leverages [`normalize`] to give us a fairly apollo-ish [direct cache access] API, which is also available on `GraphQLClient`.
This means we can do [local state management] in a similar fashion as well.

The cache access methods are available on any cache proxy, which includes the `GraphQLCache` the `OptimisticProxy` passed to `update` in the `graphql_flutter` `Mutation` widget, and the `client` itself.
The cache access methods are available on any cache proxy, which includes the `GraphQLCache` the `OptimisticProxy` passed to `update` in the `graphql_flutter` `Mutation` widget, and the `client` itself.
> **NB** counter-intuitively, you likely never want to use use direct cache access methods directly on the `cache`,
> as they will not be rebroadcast automatically.
> as they will not be rebroadcast automatically.
> **Prefer `client.writeQuery` and `client.writeFragment` to those on the `client.cache` for automatic rebroadcasting**
In addition to this overview, a complete and well-commented rundown of can be found in the
Expand Down Expand Up @@ -641,10 +722,10 @@ final data = client.readQuery(queryRequest);
client.writeQuery(queryRequest, data);
```

The cache access methods are available on any cache proxy, which includes the `GraphQLCache` the `OptimisticProxy` passed to `update` in the `graphql_flutter` `Mutation` widget, and the `client` itself.
> **NB** counter-intuitively, you likely never want to use use direct cache access methods on the cache
The cache access methods are available on any cache proxy, which includes the `GraphQLCache` the `OptimisticProxy` passed to `update` in the `graphql_flutter` `Mutation` widget, and the `client` itself.
> **NB** counter-intuitively, you likely never want to use use direct cache access methods on the cache
cache.readQuery(queryRequest);
client.readQuery(queryRequest); //
client.readQuery(queryRequest); //

### `FragmentRequest`, `readFragment`, and `writeFragment`
`FragmentRequest` has almost the same api as `Request`, but is provided directly from `graphql` for consistency.
Expand Down Expand Up @@ -710,7 +791,7 @@ client.query(QueryOptions(
errorPolicy: ErrorPolicy.ignore,
// ignore cache data.
cacheRereadPolicy: CacheRereadPolicy.ignore,
// ...
// ...
));
```
Defaults can also be overridden via `defaultPolices` on the client itself:
Expand All @@ -724,11 +805,11 @@ GraphQLClient(
CacheRereadPolicy.mergeOptimistic,
),
),
// ...
// ...
)
```

**[`FetchPolicy`](https://pub.dev/documentation/graphql/latest/graphql/FetchPolicy-class.html):** determines where the client may return a result from, and whether that result will be saved to the cache.
**[`FetchPolicy`](https://pub.dev/documentation/graphql/latest/graphql/FetchPolicy-class.html):** determines where the client may return a result from, and whether that result will be saved to the cache.
Possible options:

- cacheFirst: return result from cache. Only fetch from network if cached result is not available.
Expand All @@ -737,7 +818,7 @@ Possible options:
- noCache: return result from network, fail if network call doesn't succeed, don't save to cache.
- networkOnly: return result from network, fail if network call doesn't succeed, save to cache.

**[`ErrorPolicy`](https://pub.dev/documentation/graphql/latest/graphql/ErrorPolicy-class.html):** determines the level of events for errors in the execution result.
**[`ErrorPolicy`](https://pub.dev/documentation/graphql/latest/graphql/ErrorPolicy-class.html):** determines the level of events for errors in the execution result.
Possible options:

- none (default): Any GraphQL Errors are treated the same as network errors and any data is ignored from the response.
Expand Down Expand Up @@ -869,7 +950,7 @@ API key, IAM, and Federated provider authorization could be accomplished through

This package does not support code-generation out of the box, but [graphql_codegen](https://pub.dev/packages/graphql_codegen) does!

This package extensions on the client which takes away the struggle of serialization and gives you confidence through type-safety.
This package extensions on the client which takes away the struggle of serialization and gives you confidence through type-safety.
It is also more performant than parsing GraphQL queries at runtime.

For example, by creating the `.graphql` file
Expand Down Expand Up @@ -966,3 +1047,4 @@ HttpLink httpLink = HttpLink('https://api.url/graphql', defaultHeaders: {
[local state management]: https://www.apollographql.com/docs/tutorial/local-state/#update-local-data
[`typepolicies`]: https://www.apollographql.com/docs/react/caching/cache-configuration/#the-typepolicy-type
[direct cache access]: https://www.apollographql.com/docs/react/caching/cache-interaction/

69 changes: 56 additions & 13 deletions packages/graphql/lib/src/links/websocket_link/websocket_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class SubscriptionListener {

enum SocketConnectionState { notConnected, handshake, connecting, connected }

enum ToggleConnectionState { disconnect, connect }

class SocketClientConfig {
const SocketClientConfig({
this.serializer = const RequestSerializer(),
Expand All @@ -48,6 +50,8 @@ class SocketClientConfig {
this.initialPayload,
this.headers,
this.connectFn,
this.onConnectionLost,
this.toggleConnection,
});

/// Serializer used to serialize request
Expand Down Expand Up @@ -98,6 +102,11 @@ class SocketClientConfig {
/// Custom header to add inside the client
final Map<String, dynamic>? headers;

final Future<Duration?>? Function(int? code, String? reason)?
onConnectionLost;

final Stream<ToggleConnectionState>? toggleConnection;

/// Function to define another connection without call directly
/// the connection function
FutureOr<WebSocketChannel> connect(
Expand Down Expand Up @@ -192,6 +201,7 @@ class SocketClient {
@visibleForTesting this.onMessage,
@visibleForTesting this.onStreamError = _defaultOnStreamError,
}) {
_listenToToggleConnection();
_connect();
}

Expand Down Expand Up @@ -232,6 +242,30 @@ class SocketClient {
Response Function(Map<String, dynamic>) get parse =>
config.parser.parseResponse;

bool _isReconnectionPaused = false;
final _unsubscriber = PublishSubject<void>();

void _listenToToggleConnection() {
if (config.toggleConnection != null) {
config.toggleConnection!
.where((_) => !_connectionStateController.isClosed)
.takeUntil(_unsubscriber)
.listen((event) {
if (event == ToggleConnectionState.disconnect &&
_connectionStateController.value ==
SocketConnectionState.connected) {
_isReconnectionPaused = true;
onConnectionLost();
} else if (event == ToggleConnectionState.connect &&
_connectionStateController.value ==
SocketConnectionState.notConnected) {
_isReconnectionPaused = false;
_connect();
}
});
}
}

void _disconnectOnKeepAliveTimeout(Stream<GraphQLSocketMessage> messages) {
_keepAliveSubscription = messages.whereType<ConnectionKeepAlive>().timeout(
config.inactivityTimeout!,
Expand Down Expand Up @@ -334,6 +368,9 @@ class SocketClient {
}

void onConnectionLost([Object? e]) async {
var code = socketChannel!.closeCode;
var reason = socketChannel!.closeReason;

await _closeSocketChannel();
if (e != null) {
print('There was an error causing connection lost: $e');
Expand All @@ -344,27 +381,32 @@ class SocketClient {
_keepAliveSubscription?.cancel();
_messageSubscription?.cancel();

//TODO: do we really need this check here because few lines bellow there is another check
if (_connectionStateController.isClosed || _wasDisposed) {
return;
}

_connectionWasLost = true;
_subscriptionInitializers.values.forEach((s) => s.hasBeenTriggered = false);

if (config.autoReconnect &&
!_connectionStateController.isClosed &&
!_wasDisposed) {
if (config.delayBetweenReconnectionAttempts != null) {
_reconnectTimer = Timer(
config.delayBetweenReconnectionAttempts!,
() {
_connect();
},
);
} else {
Timer.run(() => _connect());
}
if (_isReconnectionPaused ||
!config.autoReconnect ||
_connectionStateController.isClosed ||
_wasDisposed) {
return;
}

var duration = config.delayBetweenReconnectionAttempts ?? Duration.zero;
if (config.onConnectionLost != null) {
duration = (await config.onConnectionLost!(code, reason)) ?? duration;
}

_reconnectTimer = Timer(
duration,
() async {
_connect();
},
);
}

void _enqueuePing() {
Expand All @@ -389,6 +431,7 @@ class SocketClient {
_reconnectTimer?.cancel();
_pingTimer?.cancel();
_keepAliveSubscription?.cancel();
_unsubscriber.close();

await Future.wait([
_closeSocketChannel(),
Expand Down
3 changes: 3 additions & 0 deletions packages/graphql/test/mock_server/ws_echo_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'dart:convert';
import 'dart:io';

const String forceDisconnectCommand = '___force_disconnect___';
const String forceAuthDisconnectCommand = '___force_auth_disconnect___';

/// Main function to create and run the echo server over the web socket.
Future<String> runWebSocketServer(
Expand All @@ -20,6 +21,8 @@ void onWebSocketData(WebSocket client) {
client.listen((data) async {
if (data == forceDisconnectCommand) {
client.close(WebSocketStatus.normalClosure, 'shutting down');
} else if (data == forceAuthDisconnectCommand) {
client.close(4001, 'Unauthorized');
} else {
final message = json.decode(data.toString());
if (message['type'] == 'connection_init' &&
Expand Down
Loading

0 comments on commit d41edf5

Please sign in to comment.