Skip to content

Commit

Permalink
Also maintain the zone on the ChannelBuffers.push callback (flutter#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
Hixie authored Nov 17, 2020
1 parent f3cc39a commit 1a5ec52
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 37 deletions.
26 changes: 16 additions & 10 deletions lib/ui/channel_buffers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ typedef ChannelCallback = void Function(ByteData? data, PlatformMessageResponseC
///
/// This tracks (and applies) the [Zone].
class _ChannelCallbackRecord {
_ChannelCallbackRecord(this.callback) : zone = Zone.current;
final ChannelCallback callback;
final Zone zone;
_ChannelCallbackRecord(this._callback) : _zone = Zone.current;
final ChannelCallback _callback;
final Zone _zone;

/// Call [callback] in [zone], using the given arguments.
void invoke(ByteData? dataArg, PlatformMessageResponseCallback callbackArg) {
_invoke2<ByteData?, PlatformMessageResponseCallback>(callback, zone, dataArg, callbackArg);
_invoke2<ByteData?, PlatformMessageResponseCallback>(_callback, _zone, dataArg, callbackArg);
}
}

Expand All @@ -52,13 +52,19 @@ class _StoredMessage {
/// payload of the message and a [PlatformMessageResponseCallback]
/// that represents the callback that will be called when the message
/// is handled.
const _StoredMessage(this.data, this.callback);
_StoredMessage(this.data, this._callback) : _zone = Zone.current;

/// Representation of the message's payload.
final ByteData? data;

/// Callback to be used when replying to the message.
final PlatformMessageResponseCallback callback;
final PlatformMessageResponseCallback _callback;

final Zone _zone;

void invoke(ByteData? dataArg) {
_invoke1(_callback, _zone, dataArg);
}
}

/// The internal storage for a platform channel.
Expand Down Expand Up @@ -123,7 +129,7 @@ class _Channel {
bool push(_StoredMessage message) {
if (!_draining && _channelCallbackRecord != null) {
assert(_queue.isEmpty);
_channelCallbackRecord!.invoke(message.data, message.callback);
_channelCallbackRecord!.invoke(message.data, message.invoke);
return false;
}
if (_capacity <= 0) {
Expand Down Expand Up @@ -151,7 +157,7 @@ class _Channel {
bool result = false;
while (_queue.length > lengthLimit) {
final _StoredMessage message = _queue.removeFirst();
message.callback(null); // send empty reply to the plugin side
message.invoke(null); // send empty reply to the plugin side
result = true;
}
return result;
Expand Down Expand Up @@ -215,7 +221,7 @@ class _Channel {
assert(_draining);
if (_queue.isNotEmpty && _channelCallbackRecord != null) {
final _StoredMessage message = pop();
_channelCallbackRecord!.invoke(message.data, message.callback);
_channelCallbackRecord!.invoke(message.data, message.invoke);
scheduleMicrotask(_drainStep);
} else {
_draining = false;
Expand Down Expand Up @@ -384,7 +390,7 @@ class ChannelBuffers {
final _Channel? channel = _channels[name];
while (channel != null && !channel._queue.isEmpty) {
final _StoredMessage message = channel.pop();
await callback(message.data, message.callback);
await callback(message.data, message.invoke);
}
}

Expand Down
28 changes: 17 additions & 11 deletions lib/web_ui/lib/src/ui/channel_buffers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// following exceptions:
//
// * All comments except this one are removed.
// * _invoke2 is replaced with engine.invoke2
// * _invokeX is replaced with engine.invokeX (X=1,2)
// * _printDebug is replaced with print in an assert.

part of ui;
Expand All @@ -18,21 +18,27 @@ typedef DrainChannelCallback = Future<void> Function(ByteData? data, PlatformMes
typedef ChannelCallback = void Function(ByteData? data, PlatformMessageResponseCallback callback);

class _ChannelCallbackRecord {
_ChannelCallbackRecord(this.callback) : zone = Zone.current;
final ChannelCallback callback;
final Zone zone;
_ChannelCallbackRecord(this._callback) : _zone = Zone.current;
final ChannelCallback _callback;
final Zone _zone;

void invoke(ByteData? dataArg, PlatformMessageResponseCallback callbackArg) {
engine.invoke2<ByteData?, PlatformMessageResponseCallback>(callback, zone, dataArg, callbackArg);
engine.invoke2<ByteData?, PlatformMessageResponseCallback>(_callback, _zone, dataArg, callbackArg);
}
}

class _StoredMessage {
const _StoredMessage(this.data, this.callback);
_StoredMessage(this.data, this._callback) : _zone = Zone.current;

final ByteData? data;

final PlatformMessageResponseCallback callback;
final PlatformMessageResponseCallback _callback;

final Zone _zone;

void invoke(ByteData? dataArg) {
engine.invoke1(_callback, _zone, dataArg);
}
}

class _Channel {
Expand All @@ -57,7 +63,7 @@ class _Channel {
bool push(_StoredMessage message) {
if (!_draining && _channelCallbackRecord != null) {
assert(_queue.isEmpty);
_channelCallbackRecord!.invoke(message.data, message.callback);
_channelCallbackRecord!.invoke(message.data, message.invoke);
return false;
}
if (_capacity <= 0) {
Expand All @@ -74,7 +80,7 @@ class _Channel {
bool result = false;
while (_queue.length > lengthLimit) {
final _StoredMessage message = _queue.removeFirst();
message.callback(null); // send empty reply to the plugin side
message.invoke(null); // send empty reply to the plugin side
result = true;
}
return result;
Expand Down Expand Up @@ -103,7 +109,7 @@ class _Channel {
assert(_draining);
if (_queue.isNotEmpty && _channelCallbackRecord != null) {
final _StoredMessage message = pop();
_channelCallbackRecord!.invoke(message.data, message.callback);
_channelCallbackRecord!.invoke(message.data, message.invoke);
scheduleMicrotask(_drainStep);
} else {
_draining = false;
Expand Down Expand Up @@ -152,7 +158,7 @@ class ChannelBuffers {
final _Channel? channel = _channels[name];
while (channel != null && !channel._queue.isEmpty) {
final _StoredMessage message = channel.pop();
await callback(message.data, message.callback);
await callback(message.data, message.invoke);
}
}

Expand Down
44 changes: 36 additions & 8 deletions lib/web_ui/test/channel_buffers_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@ void testMain() {
const String channel = 'foo';
final ByteData data = _makeByteData('bar');
final ui.ChannelBuffers buffers = ui.ChannelBuffers();
final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {};
bool called = false;
final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {
called = true;
};
buffers.push(channel, data, callback);
await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) {
expect(drainedData, equals(data));
expect(drainedCallback, equals(callback));
assert(!called);
drainedCallback(drainedData);
assert(called);
return;
});
});
Expand Down Expand Up @@ -119,15 +124,12 @@ void testMain() {
switch (counter) {
case 0:
expect(drainedData, equals(two));
expect(drainedCallback, equals(callback));
break;
case 1:
expect(drainedData, equals(three));
expect(drainedCallback, equals(callback));
break;
case 2:
expect(drainedData, equals(four));
expect(drainedCallback, equals(callback));
break;
}
counter += 1;
Expand All @@ -151,7 +153,6 @@ void testMain() {
switch (counter) {
case 0:
expect(drainedData, equals(two));
expect(drainedCallback, equals(callback));
}
counter += 1;
return;
Expand Down Expand Up @@ -325,7 +326,7 @@ void testMain() {
final ui.ChannelBuffers buffers = _TestChannelBuffers(log);
// Created as follows:
// print(StandardMethodCodec().encodeMethodCall(MethodCall('resize', ['abcdef', 12345])).buffer.asUint8List());
// ...with three 0xFF bytes on either side to ensure the method works with an offer on the underlying buffer.
// ...with three 0xFF bytes on either side to ensure the method works with an offset on the underlying buffer.
buffers.handleMessage(ByteData.sublistView(Uint8List.fromList(<int>[255, 255, 255, 7, 6, 114, 101, 115, 105, 122, 101, 12, 2, 7, 6, 97, 98, 99, 100, 101, 102, 3, 57, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255]), 3, 27));
expect(log, const <String>['resize abcdef 12345']);
});
Expand All @@ -335,10 +336,37 @@ void testMain() {
final ui.ChannelBuffers buffers = _TestChannelBuffers(log);
// Created as follows:
// print(StandardMethodCodec().encodeMethodCall(MethodCall('overflow', ['abcdef', false])).buffer.asUint8List());
// ...with three 0xFF bytes on either side to ensure the method works with an offer on the underlying buffer.
// ...with three 0xFF bytes on either side to ensure the method works with an offset on the underlying buffer.
buffers.handleMessage(ByteData.sublistView(Uint8List.fromList(<int>[255, 255, 255, 7, 8, 111, 118, 101, 114, 102, 108, 111, 119, 12, 2, 7, 6, 97, 98, 99, 100, 101, 102, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255]), 3, 24));
expect(log, const <String>['allowOverflow abcdef false']);
});

test('ChannelBuffers uses the right zones', () async {
final List<String> log = <String>[];
final ui.ChannelBuffers buffers = ui.ChannelBuffers();
final Zone zone1 = Zone.current.fork();
final Zone zone2 = Zone.current.fork();
zone1.run(() {
log.add('first zone run: ${Zone.current == zone1}');
buffers.setListener('a', (ByteData data, ui.PlatformMessageResponseCallback callback) {
log.add('callback1: ${Zone.current == zone1}');
callback(data);
});
});
zone2.run(() {
log.add('second zone run: ${Zone.current == zone2}');
buffers.push('a', ByteData.sublistView(Uint8List.fromList(<int>[]), 0, 0), (ByteData data) {
log.add('callback2: ${Zone.current == zone2}');
});
});
await null;
expect(log, <String>[
'first zone run: true',
'second zone run: true',
'callback1: true',
'callback2: true',
]);
});
}

class _TestChannelBuffers extends ui.ChannelBuffers {
Expand Down
44 changes: 36 additions & 8 deletions testing/dart/channel_buffers_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ void main() {
const String channel = 'foo';
final ByteData data = _makeByteData('bar');
final ui.ChannelBuffers buffers = ui.ChannelBuffers();
final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {};
bool called = false;
final ui.PlatformMessageResponseCallback callback = (ByteData responseData) {
called = true;
};
buffers.push(channel, data, callback);
await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) {
expect(drainedData, equals(data));
expect(drainedCallback, equals(callback));
assert(!called);
drainedCallback(drainedData);
assert(called);
return;
});
});
Expand Down Expand Up @@ -108,15 +113,12 @@ void main() {
switch (counter) {
case 0:
expect(drainedData, equals(two));
expect(drainedCallback, equals(callback));
break;
case 1:
expect(drainedData, equals(three));
expect(drainedCallback, equals(callback));
break;
case 2:
expect(drainedData, equals(four));
expect(drainedCallback, equals(callback));
break;
}
counter += 1;
Expand All @@ -140,7 +142,6 @@ void main() {
switch (counter) {
case 0:
expect(drainedData, equals(two));
expect(drainedCallback, equals(callback));
}
counter += 1;
return;
Expand Down Expand Up @@ -314,7 +315,7 @@ void main() {
final ui.ChannelBuffers buffers = _TestChannelBuffers(log);
// Created as follows:
// print(StandardMethodCodec().encodeMethodCall(MethodCall('resize', ['abcdef', 12345])).buffer.asUint8List());
// ...with three 0xFF bytes on either side to ensure the method works with an offer on the underlying buffer.
// ...with three 0xFF bytes on either side to ensure the method works with an offset on the underlying buffer.
buffers.handleMessage(ByteData.sublistView(Uint8List.fromList(<int>[255, 255, 255, 7, 6, 114, 101, 115, 105, 122, 101, 12, 2, 7, 6, 97, 98, 99, 100, 101, 102, 3, 57, 48, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255]), 3, 27));
expect(log, const <String>['resize abcdef 12345']);
});
Expand All @@ -324,10 +325,37 @@ void main() {
final ui.ChannelBuffers buffers = _TestChannelBuffers(log);
// Created as follows:
// print(StandardMethodCodec().encodeMethodCall(MethodCall('overflow', ['abcdef', false])).buffer.asUint8List());
// ...with three 0xFF bytes on either side to ensure the method works with an offer on the underlying buffer.
// ...with three 0xFF bytes on either side to ensure the method works with an offset on the underlying buffer.
buffers.handleMessage(ByteData.sublistView(Uint8List.fromList(<int>[255, 255, 255, 7, 8, 111, 118, 101, 114, 102, 108, 111, 119, 12, 2, 7, 6, 97, 98, 99, 100, 101, 102, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255]), 3, 24));
expect(log, const <String>['allowOverflow abcdef false']);
});

test('ChannelBuffers uses the right zones', () async {
final List<String> log = <String>[];
final ui.ChannelBuffers buffers = ui.ChannelBuffers();
final Zone zone1 = Zone.current.fork();
final Zone zone2 = Zone.current.fork();
zone1.run(() {
log.add('first zone run: ${Zone.current == zone1}');
buffers.setListener('a', (ByteData data, ui.PlatformMessageResponseCallback callback) {
log.add('callback1: ${Zone.current == zone1}');
callback(data);
});
});
zone2.run(() {
log.add('second zone run: ${Zone.current == zone2}');
buffers.push('a', ByteData.sublistView(Uint8List.fromList(<int>[]), 0, 0), (ByteData data) {
log.add('callback2: ${Zone.current == zone2}');
});
});
await null;
expect(log, <String>[
'first zone run: true',
'second zone run: true',
'callback1: true',
'callback2: true',
]);
});
}

class _TestChannelBuffers extends ui.ChannelBuffers {
Expand Down

0 comments on commit 1a5ec52

Please sign in to comment.