Skip to content

Commit

Permalink
fix(graphql): fix rebroadcasting by refactoring onData callbacks into…
Browse files Browse the repository at this point in the history
… a simpler async function
  • Loading branch information
micimize committed Jun 4, 2020
1 parent 6ba687e commit 9a5fff1
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 60 deletions.
2 changes: 1 addition & 1 deletion examples/flutter_bloc/test/bloc_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void main() {
final results = QueryResult(
data: decodeGithubResponse['data'],
exception: null,
loading: false,
source: QueryResultSource.network,
);

when(
Expand Down
9 changes: 6 additions & 3 deletions packages/graphql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ First, depend on this package:

```yaml
dependencies:
graphql: ^4.0.0-rc1
graphql: ^4.0.0-alpha
```
And then import it inside your dart code:
Expand All @@ -29,7 +29,7 @@ import 'package:graphql/client.dart';

## Migration Guide

Find the migration from version 2 to version 3 [here](./../../changelog-v2-v3.md).
Find the migration from version 3 to version 4 [here](./../../changelog-v3-v4.md).

### Parsing at build-time

Expand Down Expand Up @@ -61,7 +61,10 @@ final AuthLink _authLink = AuthLink(
final Link _link = _authLink.concat(_httpLink);
final GraphQLClient _client = GraphQLClient(
cache: GraphQLCache(),
cache: GraphQLCache(
// The default store is the InMemoryStore, which does NOT persist to disk
store: HiveStore(),
),
link: _link,
);
Expand Down
11 changes: 9 additions & 2 deletions packages/graphql/lib/src/cache/cache.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,26 @@ export 'package:graphql/src/cache/data_proxy.dart';
export 'package:graphql/src/cache/store.dart';
export 'package:graphql/src/cache/hive_store.dart';

/// Optimmistic GraphQL Entity cache with [normalize] [TypePolicy] support
/// and configurable [store].
///
/// **NOTE**: The default [InMemoryStore] does _not_ persist to disk.
/// The recommended store for persistent environments is the [HiveStore].
class GraphQLCache extends NormalizingDataProxy {
GraphQLCache({
Store store,
this.dataIdFromObject,
this.typePolicies = const {},
}) : store = store ?? InMemoryStore();

/// Stores the underlying normalized data
/// Stores the underlying normalized data. Defaults to an [InMemoryStore]
@protected
final Store store;

/// `typePolicies` to pass down to `normalize`
/// `typePolicies` to pass down to [normalize]
final Map<String, TypePolicy> typePolicies;

/// Optional `dataIdFromObject` function to pass through to [normalize]
final DataIdResolver dataIdFromObject;

/// tracks the number of ongoing transactions to prevent
Expand Down
81 changes: 51 additions & 30 deletions packages/graphql/lib/src/core/observable_query.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'package:graphql/client.dart';
import 'package:meta/meta.dart';

import 'package:graphql/src/core/query_manager.dart';
Expand Down Expand Up @@ -87,8 +88,14 @@ class ObservableQuery {
@protected
QueryScheduler get scheduler => queryManager.scheduler;

final Set<StreamSubscription<QueryResult>> _onDataSubscriptions =
<StreamSubscription<QueryResult>>{};
/// callbacks registered with [onData]
List<OnData> _onDataCallbacks = [];

/// call [queryManager.maybeRebroadcastQueries] after all other [_onDataCallbacks]
///
/// Automatically appended as an [OnData]
void _maybeRebroadcast(QueryResult result) =>
queryManager.maybeRebroadcastQueries(exclude: this);

/// The most recently seen result from this operation's stream
QueryResult latestResult;
Expand Down Expand Up @@ -177,7 +184,7 @@ class ObservableQuery {

// if onData callbacks have been registered,
// they are waited on by default
lifecycle = _onDataSubscriptions.isNotEmpty
lifecycle = _onDataCallbacks.isNotEmpty
? QueryLifecycle.sideEffectsPending
: QueryLifecycle.pending;

Expand Down Expand Up @@ -214,7 +221,7 @@ class ObservableQuery {
/// if it is set to `null`.
///
/// Called internally by the [QueryManager]
void addResult(QueryResult result) {
void addResult(QueryResult result, {bool fromRebroadcast = false}) {
// don't overwrite results due to some async/optimism issue
if (latestResult != null &&
latestResult.timestamp.isAfter(result.timestamp)) {
Expand All @@ -231,9 +238,14 @@ class ObservableQuery {

latestResult = result;

// TODO should callbacks be applied before or after streaming
if (!controller.isClosed) {
controller.add(result);
}

if (result.isNotLoading) {
_applyCallbacks(result, fromRebroadcast: fromRebroadcast);
}
}

// most mutation behavior happens here
Expand All @@ -245,31 +257,44 @@ class ObservableQuery {
/// handling the resolution of [lifecycle] from
/// [QueryLifecycle.sideEffectsBlocking] to [QueryLifecycle.completed]
/// as appropriate
void onData(Iterable<OnData> callbacks) {
callbacks ??= const <OnData>[];
StreamSubscription<QueryResult> subscription;

subscription = stream.where((result) => result.isNotLoading).listen(
(QueryResult result) async {
for (final callback in callbacks) {
await callback(result);
}
void onData(Iterable<OnData> callbacks) =>
_onDataCallbacks.addAll(callbacks ?? []);

if (result.isConcrete) {
await subscription.cancel();
_onDataSubscriptions.remove(subscription);
/// Applies [onData] callbacks at the end of [addResult]
///
/// [fromRebroadcast] is used to avoid the super-edge case of infinite rebroadcasts
/// (not sure if it's even possible)
void _applyCallbacks(QueryResult result,
{bool fromRebroadcast = false}) async {
final callbacks = [
..._onDataCallbacks,
if (!fromRebroadcast) _maybeRebroadcast
];
for (final callback in callbacks) {
await callback(result);
}

if (_onDataSubscriptions.isEmpty) {
if (lifecycle == QueryLifecycle.sideEffectsBlocking) {
lifecycle = QueryLifecycle.completed;
close();
}
}
}
},
);
if (this == null || lifecycle == QueryLifecycle.closed) {
// .close(force: true) was called
return;
}

if (result.isConcrete) {
// avoid removing new callbacks
_onDataCallbacks.removeWhere((cb) => callbacks.contains(cb));

_onDataSubscriptions.add(subscription);
// if there are new callbacks, there is maybe another inflight mutation
if (_onDataCallbacks.isEmpty) {
if (lifecycle == QueryLifecycle.sideEffectsBlocking) {
lifecycle = QueryLifecycle.completed;
close();
}
if (lifecycle == QueryLifecycle.sideEffectsPending) {
lifecycle = QueryLifecycle.completed;
close();
}
}
}
}

/// Poll the server periodically for results.
Expand Down Expand Up @@ -333,10 +358,6 @@ class ObservableQuery {
queryManager.closeQuery(this, fromQuery: true);
}

for (StreamSubscription<QueryResult> subscription in _onDataSubscriptions) {
await subscription.cancel();
}

stopPolling();

await controller.close();
Expand Down
16 changes: 9 additions & 7 deletions packages/graphql/lib/src/core/query_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,15 @@ class QueryManager {
Response response;
QueryResult queryResult;

final writeToCache = options.fetchPolicy != FetchPolicy.noCache;

try {
// execute the request through the provided link(s)
response = await link.request(request).first;

// save the data from response to the cache
if (response.data != null && options.fetchPolicy != FetchPolicy.noCache) {
cache.writeQuery(request, data: response.data);
if (response.data != null && writeToCache) {
await cache.writeQuery(request, data: response.data);
}

queryResult = mapFetchResultToQueryResult(
Expand All @@ -188,7 +190,7 @@ class QueryManager {
// cleanup optimistic results
cache.removeOptimisticPatch(queryId);

if (options.fetchPolicy != FetchPolicy.noCache) {
if (writeToCache) {
// normalize results if previously written
queryResult.data = cache.readQuery(request);
}
Expand Down Expand Up @@ -275,7 +277,7 @@ class QueryManager {
}

/// Add a result to the [ObservableQuery] specified by `queryId`, if it exists
/// Will [maybeRebroadcastQueries] if the cache has flagged the need to
/// Will [maybeRebroadcastQueries] from [addResult] if the cache has flagged the need to
///
/// Queries are registered via [setQuery] and [watchQuery]
void addQueryResult(
Expand All @@ -296,8 +298,6 @@ class QueryManager {
if (observableQuery != null && !observableQuery.controller.isClosed) {
observableQuery.addResult(queryResult);
}

maybeRebroadcastQueries(exclude: observableQuery);
}

/// Create an optimstic result for the query specified by `queryId`, if it exists
Expand Down Expand Up @@ -330,9 +330,11 @@ class QueryManager {
/// If there are multiple in-flight cache updates, we wait until they all complete
bool maybeRebroadcastQueries({ObservableQuery exclude}) {
final shouldBroadast = cache.shouldBroadcast(claimExecution: true);

if (!shouldBroadast) {
return false;
}

for (ObservableQuery query in queries.values) {
if (query != exclude && query.isRebroadcastSafe) {
final dynamic cachedData = cache.readQuery(
Expand All @@ -344,9 +346,9 @@ class QueryManager {
mapFetchResultToQueryResult(
Response(data: cachedData),
query.options,
// TODO maybe entirely wrong
source: QueryResultSource.cache,
),
fromRebroadcast: true,
);
}
}
Expand Down
3 changes: 3 additions & 0 deletions packages/graphql/lib/src/exceptions/exceptions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import 'package:graphql/src/exceptions/network.dart'
if (dart.library.io) 'package:graphql/src/exceptions/network_io.dart'
as network;

export 'package:graphql/src/exceptions/network.dart'
if (dart.library.io) 'package:graphql/src/exceptions/network_io.dart';

LinkException translateFailure(dynamic failure) {
if (failure is LinkException) {
return failure;
Expand Down
82 changes: 81 additions & 1 deletion packages/graphql/lib/src/graphql_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ import 'package:graphql/src/cache/cache.dart';

import 'package:graphql/src/core/fetch_more.dart';

/// Universal GraphQL Client with configurable caching and [link][] system.
/// modelled after the [`apollo-client`][ac].
///
/// The link is a [Link] over which GraphQL documents will be resolved into a [Response].
/// The cache is the initial [Cache] to use in the data store.
/// The cache is the [GraphQLCache] to use for caching results and optimistic updates.
///
///
/// [ac]: https://www.apollographql.com/docs/react/v3.0-beta/api/core/ApolloClient/
/// [link]: https://github.com/gql-dart/gql/tree/master/links/gql_link
class GraphQLClient {
/// Constructs a [GraphQLClient] given a [Link] and a [Cache].
GraphQLClient({
Expand Down Expand Up @@ -84,6 +91,44 @@ class GraphQLClient {

/// This resolves a single query according to the [QueryOptions] specified and
/// returns a [Future] which resolves with the [QueryResult] or throws an [Exception].
///
/// {@tool snippet}
/// Basic usage
///
/// ```dart
/// final QueryResult result = await client.query(
/// QueryOptions(
/// document: gql(
/// r'''
/// query ReadRepositories($nRepositories: Int!) {
/// viewer {
/// repositories(last: $nRepositories) {
/// nodes {
/// __typename
/// id
/// name
/// viewerHasStarred
/// }
/// }
/// }
/// }
/// ''',
/// ),
/// variables: {
/// 'nRepositories': 50,
/// },
/// ),
/// );
///
/// if (result.hasException) {
/// print(result.exception.toString());
/// }
///
/// final List<dynamic> repositories =
/// result.data['viewer']['repositories']['nodes'] as List<dynamic>;
/// ```
/// {@end-tool}
Future<QueryResult> query(QueryOptions options) {
options.policies = defaultPolicies.query.withOverrides(options.policies);
return queryManager.query(options);
Expand All @@ -98,6 +143,40 @@ class GraphQLClient {

/// This subscribes to a GraphQL subscription according to the options specified and returns a
/// [Stream] which either emits received data or an error.
///
/// {@tool snippet}
/// Basic usage
///
/// ```dart
/// subscription = client.subscribe(
/// SubscriptionOptions(
/// document: gql(
/// r'''
/// subscription reviewAdded {
/// reviewAdded {
/// stars, commentary, episode
/// }
/// }
/// ''',
/// ),
/// ),
/// );
///
/// subscription.listen((result) {
/// if (result.hasException) {
/// print(result.exception.toString());
/// return;
/// }
///
/// if (result.isLoading) {
/// print('awaiting results');
/// return;
/// }
///
/// print('Rew Review: ${result.data}');
/// });
/// ```
/// {@end-tool}
Stream<QueryResult> subscribe(SubscriptionOptions options) {
options.policies = defaultPolicies.subscribe.withOverrides(
options.policies,
Expand All @@ -107,6 +186,7 @@ class GraphQLClient {

/// Fetch more results and then merge them with the given [previousResult]
/// according to [FetchMoreOptions.updateQuery].
@experimental
Future<QueryResult> fetchMore(
FetchMoreOptions fetchMoreOptions, {
@required QueryOptions originalOptions,
Expand Down
Loading

0 comments on commit 9a5fff1

Please sign in to comment.