Skip to content

Commit

Permalink
DOM: Implement the flatMap() Observable operator
Browse files Browse the repository at this point in the history
This CL implements the `flatMap()` Observable operator. See
https://wicg.github.io/observable/#dom-observable-flatmap.

To accomplish this, we introduce a class hierarchy that looks like:

```
// The delegate that the Observable returned from flatMap() uses to
// create subscriptions.
class OperatorFlatMapSubscribeDelegate : Observable::SubscribeDelegate {

  // One of these is created for each subscription created via the
  // delegate above. For example:
  //
  // const mapped = source.flatMap(...);
  // const a = mapped.subscribe(...);
  // const b = mapped.subscribe(...);
  //
  // A `SourceInternalObserver` gets created for each of `a` and `b`.
  class SourceInternalObserver : ObservableInternalObserver {

    // For each value that the source Observable emits, we derive a new
    // "inner" Observable from it and subscribe to it for the rest of
    // its lifetime. This class below is the internal Observer for each
    // of these.
    class InnerFlatMapObserver : ObservableInternalObserver {

    };
  };
};
```

For WPTs:
Co-authored-by: [email protected]

Bug: 40282760
Change-Id: I281ed693908f4b7c3a6d5f1ce8a70f4c5187018b
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5381640
Reviewed-by: Mason Freed <[email protected]>
Commit-Queue: Dominic Farolino <[email protected]>
Cr-Commit-Position: refs/heads/main@{#1279959}
  • Loading branch information
domfarolino authored and Chromium LUCI CQ committed Mar 28, 2024
1 parent 787cec1 commit 60af8e8
Show file tree
Hide file tree
Showing 8 changed files with 579 additions and 0 deletions.
254 changes: 254 additions & 0 deletions third_party/blink/renderer/core/dom/observable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,250 @@ class OperatorFromPromiseSubscribeDelegate final
ScriptPromiseUntyped promise_;
};

// This class is the subscriber delegate for Observables returned by
// `flatMap()`. Flat map is a tricky operator, so here's how the flow works.
// Upon subscription, `this` subscribes to the "source" Observable, that had its
// `flatMap()` method called. All values that the source Observable emits, get
// piped to its subscription's internal observer, which is
// `OperatorFlatMapSubscribeDelegate::SourceInternalObserver`. It is that class
// that is responsible for mapping each of the individual source Observable, via
// `mapper`, to an Observable (that we call the "inner" Observable), which then
// gets subscribed to. Through the remainder the "inner" Observable's lifetime,
// its values are exclusively piped to the "outer" Subscriber — this allows the
// IDL `Observer` handlers associated with the Observable returned from
// `flatMap()` to observe the inner Observable's values.
//
// Once the inner Observable completes, the focus is transferred to the *next*
// value that the outer Observable has emitted, if one such exists. That value
// too gets mapped and converted to an Observable, and subscribed to, and so on.
// See also, the documentation above
// `OperatorFlatMapSubscribeDelegate::SourceInternalObserver::InnerFlatMapObserver`.
class OperatorFlatMapSubscribeDelegate final
: public Observable::SubscribeDelegate {
public:
OperatorFlatMapSubscribeDelegate(Observable* source_observable,
V8Mapper* mapper,
const ExceptionContext& exception_context)
: source_observable_(source_observable),
mapper_(mapper),
exception_context_(exception_context) {}
void OnSubscribe(Subscriber* subscriber, ScriptState* script_state) override {
SubscribeOptions* options = MakeGarbageCollected<SubscribeOptions>();
options->setSignal(subscriber->signal());

source_observable_->SubscribeWithNativeObserver(
script_state,
MakeGarbageCollected<SourceInternalObserver>(
subscriber, script_state, mapper_, exception_context_),
options);
}

void Trace(Visitor* visitor) const override {
visitor->Trace(source_observable_);
visitor->Trace(mapper_);

Observable::SubscribeDelegate::Trace(visitor);
}

private:
class SourceInternalObserver final : public ObservableInternalObserver {
public:
SourceInternalObserver(Subscriber* outer_subscriber,
ScriptState* script_state,
V8Mapper* mapper,
const ExceptionContext& exception_context)
: outer_subscriber_(outer_subscriber),
script_state_(script_state),
mapper_(mapper),
exception_context_(exception_context) {
CHECK(outer_subscriber_);
CHECK(script_state_);
CHECK(mapper_);
}

void Next(ScriptValue value) override {
if (active_inner_subscription_) {
queue_.push_back(std::move(value));
return;
}

active_inner_subscription_ = true;

FlatMapProcessNextValueSteps(value);
}
void Error(ScriptState*, ScriptValue error) override {
outer_subscriber_->error(script_state_, error);
}
void Complete() override {
outer_subscription_has_completed_ = true;

if (!active_inner_subscription_ && queue_.empty()) {
outer_subscriber_->complete(script_state_);
}
}

void Trace(Visitor* visitor) const override {
visitor->Trace(outer_subscriber_);
visitor->Trace(script_state_);
visitor->Trace(mapper_);
visitor->Trace(queue_);

ObservableInternalObserver::Trace(visitor);
}

// Analogous to
// https://wicg.github.io/observable/#flatmap-process-next-value-steps.
//
// This method can be called re-entrantly. Imagine the following:
// 1. The source Observable emits a value that gets passed to this method
// (`value` below).
// 2. `this` derives an Observable from that value, and immediately
// subscribes to it.
// 3. Upon subscription, the Observable synchronously `complete()`s.
// 4. Upon completion, `InnerObservableCompleted()` gets called, which has
// to synchronously process the next value in `queue_`, restarting
// these steps from the top.
void FlatMapProcessNextValueSteps(ScriptValue value) {
// `ScriptState::Scope` can only be created in a valid context, so
// early-return if we're in a detached one.
if (!script_state_->ContextIsValid()) {
return;
}

ScriptState::Scope scope(script_state_);
v8::TryCatch try_catch(script_state_->GetIsolate());
v8::Maybe<ScriptValue> mapped_value =
mapper_->Invoke(nullptr, value, ++idx_);
if (try_catch.HasCaught()) {
outer_subscriber_->error(
script_state_,
ScriptValue(script_state_->GetIsolate(), try_catch.Exception()));
return;
}

// Since we handled the exception case above, `mapped_value` must not be
// `v8::Nothing`.
ExceptionState exception_state(script_state_->GetIsolate(),
exception_context_);
Observable* inner_observable = Observable::from(
script_state_, mapped_value.ToChecked(), exception_state);
if (exception_state.HadException()) {
outer_subscriber_->error(script_state_,
ScriptValue(script_state_->GetIsolate(),
exception_state.GetException()));
exception_state.ClearException();
return;
}

SubscribeOptions* options = MakeGarbageCollected<SubscribeOptions>();
options->setSignal(outer_subscriber_->signal());

inner_observable->SubscribeWithNativeObserver(
script_state_,
MakeGarbageCollected<InnerFlatMapObserver>(outer_subscriber_, this),
options);
}

// This method can be called re-entrantly. See the documentation above
// `FlatMapProcessNextValueSteps()`.
void InnerObservableCompleted() {
if (!queue_.empty()) {
ScriptValue value = queue_.front();
// This is inefficient! See the documentation above `queue_` for more.
queue_.EraseAt(0);
FlatMapProcessNextValueSteps(value);
return;
}

// When the `queue_` is empty and the last "inner" Observable has
// completed, we can finally complete `outer_subscriber_`.
active_inner_subscription_ = false;
if (outer_subscription_has_completed_) {
outer_subscriber_->complete(script_state_);
}
}

private:
// This is the internal observer that manages the subscription for each
// "inner" Observable, that is derived from each `any` value that the
// `V8Mapper` omits for each value that the source Observable. So the flow
// looks like this:
// 1. "source observable" emits `any` values, which get processed by
// `SourceInternalObserver::Next()`.
// 2. It then goes through
// `SourceInternalObserver::FlatMapProcessNextValueSteps()`, which
// calls `V8Mapper` on the `any` value, transforming it into an
// `Observable` (via `Observable::from()` semantics).
// 3. That `Observable` gets subscribed to, via this
// `InnerFlatMapObserver`. `InnerFlatMapObserver` subscribes to the
// given "inner" Observable, piping values/errors it omits to
// `outer_subscriber_`, and upon completion, letting calling back to
// `SourceInternalObserver` to let it know of the most recent "inner"
// subscription completion, so it can process any subsequent ones.
class InnerFlatMapObserver final : public ObservableInternalObserver {
public:
InnerFlatMapObserver(Subscriber* outer_subscriber,
SourceInternalObserver* source_observer)
: outer_subscriber_(outer_subscriber),
source_observer_(source_observer) {}

void Next(ScriptValue value) override { outer_subscriber_->next(value); }
void Error(ScriptState* script_state, ScriptValue value) override {
outer_subscriber_->error(script_state, value);
}
void Complete() override { source_observer_->InnerObservableCompleted(); }

void Trace(Visitor* visitor) const override {
visitor->Trace(source_observer_);
visitor->Trace(outer_subscriber_);

ObservableInternalObserver::Trace(visitor);
}

private:
Member<Subscriber> outer_subscriber_;
Member<SourceInternalObserver> source_observer_;
};

uint64_t idx_ = 0;
Member<Subscriber> outer_subscriber_;
Member<ScriptState> script_state_;
Member<V8Mapper> mapper_;
ExceptionContext exception_context_;

// This queue stores all of the values that the "outer" subscription emits
// while there is an active inner subscription (captured by the member below
// this). These values are queued and processed one-by-one; they each get
// passed into `mapper_`.
//
// TODO(crbug.com/40282760): This should be a `WTF::Deque` or `HeapDeque`,
// but neither support holding a `ScriptValue` type at the moment. This
// needs some investigation, so we can avoid using `HeapVector` here, which
// has O(n) performance when removing values from the front.
HeapVector<ScriptValue> queue_;

bool active_inner_subscription_ = false;

// This member keeps track of whether the "outer" subscription has
// completed. This is relevant because while we're currently processing
// "inner" observable subscriptions (i.e., the subscriptions associated with
// individual Observable values that the "outer" subscriber produces), the
// "outer" subscription may very well complete. This member helps us keep
// track of that so we know to complete our subscription once all "inner"
// values are done being processed.
bool outer_subscription_has_completed_ = false;
};

// The `Observable` which `this` will mirror, when `this` is subscribed to.
//
// All of these members are essentially state-less, and are just held here so
// that we can pass them into the `SourceInternalObserver` above, which gets
// created for each new subscription.
Member<Observable> source_observable_;
Member<V8Mapper> mapper_;
ExceptionContext exception_context_;
};

// This delegate is used by the `Observer#from()` operator, in the case where
// the given `any` value is an iterable. In that case, we store the iterable in
// `this` delegate, and upon subscription, synchronously push to the subscriber
Expand Down Expand Up @@ -1048,6 +1292,16 @@ Observable* Observable::drop(ScriptState*, uint64_t number_to_drop) {
return return_observable;
}

Observable* Observable::flatMap(ScriptState*,
V8Mapper* mapper,
ExceptionState& exception_state) {
Observable* return_observable = MakeGarbageCollected<Observable>(
GetExecutionContext(),
MakeGarbageCollected<OperatorFlatMapSubscribeDelegate>(
this, mapper, exception_state.GetContext()));
return return_observable;
}

ScriptPromise<IDLSequence<IDLAny>> Observable::toArray(
ScriptState* script_state,
SubscribeOptions* options) {
Expand Down
5 changes: 5 additions & 0 deletions third_party/blink/renderer/core/dom/observable.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class CORE_EXPORT Observable final : public ScriptWrappable,
Observable* filter(ScriptState*, V8Predicate*);
Observable* take(ScriptState*, uint64_t);
Observable* drop(ScriptState*, uint64_t);
// Does not actually throw exceptions to script, but we need access to the
// `exception_state` to determine if future calls to `from()` succeeded or
// failed. In the failure case, we clear the exception from the stack and
// report it to the relevant `Subscriber`.
Observable* flatMap(ScriptState*, V8Mapper*, ExceptionState& exception_state);

// Promise-returning operators. See
// https://wicg.github.io/observable/#promise-returning-operators.
Expand Down
1 change: 1 addition & 0 deletions third_party/blink/renderer/core/dom/observable.idl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ interface Observable {
[CallWith=ScriptState] Observable filter(Predicate predicate);
[CallWith=ScriptState] Observable take(unsigned long long number_to_take);
[CallWith=ScriptState] Observable drop(unsigned long long number_to_drop);
[CallWith=ScriptState, RaisesException] Observable flatMap(Mapper mapper);

// Promise-returning operators.
// See https://wicg.github.io/observable/#promise-returning-operators.
Expand Down
Loading

0 comments on commit 60af8e8

Please sign in to comment.