Skip to content

Commit

Permalink
Change event buffering to work more like the observe package
Browse files Browse the repository at this point in the history
Previously, events were buffered per GeneratedMessage call.
Instead, deliver events in a microtask by default, and
provide a deliverChanges() method to deliver them earlier.

BUG=
[email protected]

Review URL: https://chromiumcodereview.appspot.com//1260473007.
  • Loading branch information
Brian Slesinsky committed Jul 30, 2015
1 parent 59c42de commit caa07c8
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 157 deletions.
2 changes: 1 addition & 1 deletion lib/mixins_meta.dart
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ const _pbMapMixin = const PbMixin._raw("PbMapMixin",

const _pbEventMixin = const PbMixin._raw("PbEventMixin",
importFrom: "package:protobuf/src/protobuf/mixins/event_mixin.dart",
reservedNames: const ["changes"]);
reservedNames: const ["changes", "deliverChanges"]);

const List<String> _reservedNamesForMap = const [
'[]',
Expand Down
35 changes: 6 additions & 29 deletions lib/src/protobuf/event_plugin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,19 @@ part of protobuf;
/// property. The intent is provide mechanism, not policy; each mixin defines
/// its own public API, perhaps using streams.
///
/// Plugins will probably want to delay event delivery until the end of a
/// GeneratedMessage method call. For example, we probably don't want event
/// receivers to see a GeneratedMessage that's only half-built during a merge.
/// To support this, GeneratedMessage uses [startGroup] and [endGroup]
/// to indicate the start and end of the events in a method call.
/// This is a low-level, synchronous API. Event handlers are called in the
/// middle of protobuf changes. To avoid exposing half-finished changes
/// to user code, plugins should buffer events and send them asynchronously.
/// (See event_mixin.dart for an example.)
abstract class EventPlugin {

/// Initializes the plugin.
///
/// GeneratedMessage calls this once in its constructors.
void attach(GeneratedMessage parent);

/// Starts a group of events to be buffered and sent at once.
///
/// The plugin should return true if it's observing events.
/// If it does, GeneratedMessage will call [beforeSetField] or
/// [beforeClearField] for each event and [endGroup] when finished.
///
/// Otherwise, no more methods will be called for this event group,
/// except that startGroup will be called again for each nested group.
///
/// (The [group] should be used only for assertions and debugging.)
bool startGroup(EventGroup group);

/// Ends a group of events to be sent at once.
/// The [group] will match the preceding [startGroup] call.
void endGroup(EventGroup group);
/// If false, GeneratedMessage will skip calls to event handlers.
bool get hasObservers;

/// Called before setting a field if startGroup returned true.
///
Expand All @@ -44,12 +30,3 @@ abstract class EventPlugin {
/// Called before clearing a field if startGroup returned true.
void beforeClearField(int tag);
}

enum EventGroup {
clear,
binaryMerge,
jsonMerge,
messageMerge,
setField,
clearField
}
78 changes: 12 additions & 66 deletions lib/src/protobuf/generated_message.dart
Original file line number Diff line number Diff line change
Expand Up @@ -255,15 +255,8 @@ abstract class GeneratedMessage {
/// to protobuf fields.
EventPlugin get eventPlugin => null;

/// Tells the EventPlugin that we're starting a mutation.
///
/// Returns true if the eventPlugin is observing us.
bool _startEvents(EventGroup group) =>
eventPlugin != null && eventPlugin.startGroup(group);

/// Tells the EventPlugin that we're done with a mutation.
/// This should be called if (and only if) _startEvents returned true.
void _endEvents(EventGroup group) => eventPlugin.endGroup(group);
/// Returns true if we should send events to the plugin.
bool get _hasObservers => eventPlugin != null && eventPlugin.hasObservers;

bool hasRequiredFields() => info_.hasRequiredFields;

Expand All @@ -287,19 +280,12 @@ abstract class GeneratedMessage {
void clear() {
unknownFields.clear();

bool observed = _startEvents(EventGroup.clear);
try {
if (observed) {
// Add an event for each cleared field.
// (They will be buffered until endGroup.)
for (int key in _fieldValues.keys) {
eventPlugin.beforeClearField(key);
}
if (_hasObservers) {
for (int key in _fieldValues.keys) {
eventPlugin.beforeClearField(key);
}
_fieldValues.clear();
} finally {
if (observed) _endEvents(EventGroup.clear);
}
_fieldValues.clear();
}

// TODO(antonm): move to getters.
Expand Down Expand Up @@ -432,17 +418,6 @@ abstract class GeneratedMessage {
CodedBufferReader input,
[ExtensionRegistry extensionRegistry = ExtensionRegistry.EMPTY]) {

bool observed = _startEvents(EventGroup.binaryMerge);
try {
_mergeFromCodedBufferReader(input, extensionRegistry, observed);
} finally {
if (observed) _endEvents(EventGroup.binaryMerge);
}
}

void _mergeFromCodedBufferReader(CodedBufferReader input,
ExtensionRegistry extensionRegistry, bool observed) {

void appendToRepeated(tagNumber, value) {
List list = getField(tagNumber);
list.add(value);
Expand Down Expand Up @@ -770,17 +745,7 @@ abstract class GeneratedMessage {
void _mergeFromJson(
Map<String, dynamic> json,
ExtensionRegistry extensionRegistry) {
bool observed = _startEvents(EventGroup.jsonMerge);
try {
__mergeFromJson(json, extensionRegistry);
} finally {
if (observed) _endEvents(EventGroup.jsonMerge);
}
}

void __mergeFromJson(
Map<String, dynamic> json,
ExtensionRegistry extensionRegistry) {
// Extract a value from its JSON representation.

for (int tagNumber in sorted(json.keys.map(int.parse))) {
Expand Down Expand Up @@ -969,15 +934,10 @@ abstract class GeneratedMessage {

/// Clears the contents of a given field.
void clearField(int tagNumber) {
bool observed = _startEvents(EventGroup.clearField);
try {
if (observed) {
eventPlugin.beforeClearField(tagNumber);
}
_fieldValues.remove(tagNumber);
} finally {
if (observed) _endEvents(EventGroup.clearField);
if (_hasObservers) {
eventPlugin.beforeClearField(tagNumber);
}
_fieldValues.remove(tagNumber);
}

bool extensionsAreInitialized() {
Expand Down Expand Up @@ -1077,15 +1037,6 @@ abstract class GeneratedMessage {
/// in this message. Repeated fields are appended. Singular sub-messages are
/// recursively merged.
void mergeFromMessage(GeneratedMessage other) {
bool observed = _startEvents(EventGroup.messageMerge);
try {
_mergeFromMessage(other);
} finally {
if (observed) _endEvents(EventGroup.messageMerge);
}
}

void _mergeFromMessage(other) {
for (int tagNumber in other._fieldValues.keys) {
var fieldValue = other._fieldValues[tagNumber];

Expand Down Expand Up @@ -1147,15 +1098,10 @@ abstract class GeneratedMessage {
}

void _setField(int tagNumber, value) {
bool observed = _startEvents(EventGroup.setField);
try {
if (observed) {
eventPlugin.beforeSetField(tagNumber, value);
}
_fieldValues[tagNumber] = value;
} finally {
if (observed) _endEvents(EventGroup.setField);
if (_hasObservers) {
eventPlugin.beforeSetField(tagNumber, value);
}
_fieldValues[tagNumber] = value;
}

void _addExtensionToMap(Extension extension) {
Expand Down
88 changes: 28 additions & 60 deletions lib/src/protobuf/mixins/event_mixin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

library protobuf.mixins.event;

import "dart:async" show Stream, StreamController;
import "dart:async" show Stream, StreamController, scheduleMicrotask;
import "dart:collection" show UnmodifiableListView;

import "package:protobuf/protobuf.dart"
show GeneratedMessage, EventPlugin, EventGroup, ListEventPlugin;
show GeneratedMessage, EventPlugin, ListEventPlugin;

/// Provides a stream of changes to fields in a GeneratedMessage.
/// (Experimental.)
Expand All @@ -18,11 +18,17 @@ import "package:protobuf/protobuf.dart"
abstract class PbEventMixin {
final eventPlugin = new EventBuffer();

/// Changes to fields in the GeneratedMessage.
/// A stream of changes to fields in the GeneratedMessage.
///
/// Each item in the stream is a group of related changes made
/// by one GeneratedMessage method call.
/// Events are buffered and delivered via a microtask or in
/// the next call to [deliverChanges], whichever happens first.
Stream<List<PbFieldChange>> get changes => eventPlugin.changes;

/// Delivers buffered field change events synchronously,
/// instead of waiting for the microtask to run.
///
/// Returns false if no events were queued.
bool deliverChanges() => eventPlugin.deliverChanges();
}

/// A change to a field in a GeneratedMessage.
Expand All @@ -44,93 +50,55 @@ class EventBuffer extends EventPlugin {
GeneratedMessage _parent;
StreamController<List<PbFieldChange>> _controller;

// _buffer is non-null when observing field changes.
// It should be non-null only inside a group.
// If _buffer is non-null, at least one event is in the buffer
// and a microtask has been scheduled to empty it.
List<PbFieldChange> _buffer;

// Non-null if we're in a group.
EventGroup _outerGroup;

// Non-empty if we're in a nested group.
List<EventGroup> _groupStack;

@override
void attach(GeneratedMessage newParent) {
assert(_parent == null);
_parent = newParent;
}

// Returns the currently active event groups (for debugging).
List<String> get groupStack {
if (_outerGroup == null) return [];
var result = [_outerGroup];
if (_groupStack != null) result.addAll(_groupStack);
return result;
}

Stream<List<PbFieldChange>> get changes {
if (_controller == null) {
_controller = new StreamController.broadcast(sync: true);
}
return _controller.stream;
}

@override
bool startGroup(EventGroup group) {
assert(group != null);
if (_outerGroup == null) {
if (_controller == null || !_controller.hasListener) {
// skip events for this group (don't enter group)
return false;
}
_outerGroup = group;
_buffer = <PbFieldChange>[];
return true;
} else {
assert(_buffer != null);

// enter nested group
if (_groupStack == null) _groupStack = <EventGroup>[];
_groupStack.add(group);
return true;
}
}
bool get hasObservers => _controller != null && _controller.hasListener;

@override
void endGroup(EventGroup group) {
if (_groupStack != null && _groupStack.isNotEmpty) {
// exit nested group
var startGroup = _groupStack.removeLast();
assert(group == startGroup);
return;
void deliverChanges() {
var records = _buffer;
_buffer = null;
if (records != null && hasObservers) {
_controller.add(new UnmodifiableListView<PbFieldChange>(records));
}
}

assert(_outerGroup == group);
assert(_buffer != null);

// exit outer group
_outerGroup = null;

// send any events
if (_controller != null && _controller.hasListener && _buffer.isNotEmpty) {
_controller.add(new UnmodifiableListView(_buffer));
void addEvent(PbFieldChange change) {
if (!hasObservers) return;
if (_buffer == null) {
_buffer = <PbFieldChange>[];
scheduleMicrotask(deliverChanges);
}
_buffer = null;
_buffer.add(change);
}

@override
void beforeSetField(int tag, newValue) {
var oldValue = _parent.getFieldOrNull(tag);
if (oldValue == null) oldValue = _parent.getDefaultForField(tag);
if (identical(oldValue, newValue)) return;
_buffer.add(new PbFieldChange(_parent, tag, oldValue, newValue));
addEvent(new PbFieldChange(_parent, tag, oldValue, newValue));
}

@override
void beforeClearField(int tag) {
var oldValue = _parent.getFieldOrNull(tag);
if (oldValue == null) return;
var newValue = _parent.getDefaultForField(tag);
_buffer.add(new PbFieldChange(_parent, tag, oldValue, newValue));
addEvent(new PbFieldChange(_parent, tag, oldValue, newValue));
}
}
Loading

0 comments on commit caa07c8

Please sign in to comment.