Skip to content

Commit

Permalink
DOM: Implement inspect() Observable operator
Browse files Browse the repository at this point in the history
This CL implements the Observable#inspect() operator. Inspect gives a
way to tap into an Observable's outputs. To inspect(), you can provide
the following callbacks:

```
source
  .inspect({
    next: v => {},
    error: e => {},
    complete: () => {}

    subscribe: () => {}
    abort: reason => {}
  })
```

These are mostly self-explanatory. The subscribe() callback gets called
just before the source Observable gets subscribed to, and the abort()
handler gets called whenever the Subscriber's signal gets aborted
specifically for consumer-initiated unsubscriptions. That is, the abort
handler will not be invoked when the source Observable calls its
`Subscriber#{error(), complete()}` methods. It only gets called for
consumer-initiated subscriptions. This is accomplished by adding an
abort algorithm to the inspect() Subscriber's signal, and removing that
algorithm immediately whenever the
`SourceInternalObserver#{Error(), Complete()}` methods are invoked, so
that the `abort()` handler callback does not get invoked when these
methods error()/complete() the associated Subscriber.

For WPTs:
Co-authored-by: [email protected]

[email protected]

Bug: 40282760
Change-Id: Ia9dfc1291c62e6e665656c09499243ea9930c28a
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/5435614
Commit-Queue: Dominic Farolino <[email protected]>
Reviewed-by: Mason Freed <[email protected]>
Cr-Commit-Position: refs/heads/main@{#1291316}
  • Loading branch information
domfarolino authored and Chromium LUCI CQ committed Apr 23, 2024
1 parent e17dc44 commit 19a2b8a
Show file tree
Hide file tree
Showing 9 changed files with 752 additions and 0 deletions.
6 changes: 6 additions & 0 deletions third_party/blink/renderer/bindings/generated_in_core.gni
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ generated_callback_function_sources_in_core = [
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_navigation_intercept_handler.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_no_argument_constructor.cc",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_no_argument_constructor.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observable_inspector_abort_handler.cc",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observable_inspector_abort_handler.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observer_complete_callback.cc",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observer_complete_callback.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observer_callback.cc",
Expand Down Expand Up @@ -306,6 +308,8 @@ generated_dictionary_sources_in_core = [
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_navigator_ua_brand_version.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observable_event_listener_options.cc",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observable_event_listener_options.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observable_inspector.cc",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observable_inspector.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observer.cc",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_observer.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_optional_effect_timing.cc",
Expand Down Expand Up @@ -1844,6 +1848,8 @@ generated_union_sources_in_core = [
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_union_object_objectarray_string.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_union_object_string.cc",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_union_object_string.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_union_observableinspector_observercallback.cc",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_union_observableinspector_observercallback.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_union_observer_observercallback.cc",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_union_observer_observercallback.h",
"$root_gen_dir/third_party/blink/renderer/bindings/core/v8/v8_union_opaqueproperty_unsignedlong.cc",
Expand Down
314 changes: 314 additions & 0 deletions third_party/blink/renderer/core/dom/observable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@
#include "third_party/blink/renderer/bindings/core/v8/script_promise.h"
#include "third_party/blink/renderer/bindings/core/v8/script_promise_resolver.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_mapper.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_observable_inspector.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_observable_inspector_abort_handler.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_observer.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_observer_callback.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_observer_complete_callback.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_predicate.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_subscribe_callback.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_subscribe_options.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_union_observableinspector_observercallback.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_union_observer_observercallback.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_visitor.h"
#include "third_party/blink/renderer/bindings/core/v8/v8_void_function.h"
#include "third_party/blink/renderer/core/dom/abort_controller.h"
#include "third_party/blink/renderer/core/dom/dom_exception.h"
#include "third_party/blink/renderer/core/dom/observable_internal_observer.h"
Expand Down Expand Up @@ -382,6 +386,269 @@ class OperatorFromPromiseSubscribeDelegate final
ScriptPromiseUntyped promise_;
};

// This is the subscribe delegate for the `inspect()` operator. It allows one to
// supply a pseudo "Observer" dictionary, specifically an `ObservableInspector`,
// which can tap into the direct outputs of a source Observable. It mirrors its
// `next()`, `error()`, and `complete()` handlers, as well as letting you pass
// in two supplemental callbacks:
// 1. A `subscribe()` callback, which runs immediately when the
// `Observable`-returned-from-`inspect()` is subscribed to, and just before
// *it* subscribes to its source Observable. Errors from this callback are
// piped to the consumer Subscriber's `error()` handler, and the
// subscription is promptly closed.
// 2. An `abort()` callback, which is run specifically for consumer-initiated
// unsubscriptions/aborts, NOT producer (source-Observable-initiated)
// unsubscriptions (via `complete()` or `error()`). See the documentation
// in `OperatorInspectSubscribeDelegate::SourceInternalObserver::Error()`.
class OperatorInspectSubscribeDelegate final
: public Observable::SubscribeDelegate {
public:
OperatorInspectSubscribeDelegate(
Observable* source_observable,
V8ObserverCallback* next_callback,
V8ObserverCallback* error_callback,
V8ObserverCompleteCallback* complete_callback,
V8VoidFunction* subscribe_callback,
V8ObservableInspectorAbortHandler* abort_callback)
: source_observable_(source_observable),
next_callback_(next_callback),
error_callback_(error_callback),
complete_callback_(complete_callback),
subscribe_callback_(subscribe_callback),
abort_callback_(abort_callback) {}
void OnSubscribe(Subscriber* subscriber, ScriptState* script_state) override {
if (subscribe_callback_) {
// `ScriptState::Scope` can only be created in a valid context, so
// early-return if we're in a detached one.
if (!script_state->ContextIsValid()) {
return;
}

ScriptState::Scope scope(script_state);
v8::TryCatch try_catch(script_state->GetIsolate());
std::ignore = subscribe_callback_->Invoke(nullptr);
if (try_catch.HasCaught()) {
ScriptValue exception(script_state->GetIsolate(),
try_catch.Exception());
subscriber->error(script_state, exception);
return;
}
}

AbortSignal::AlgorithmHandle* abort_algorithm_handle = nullptr;
if (abort_callback_) {
abort_algorithm_handle = subscriber->signal()->AddAlgorithm(
MakeGarbageCollected<InspectorAbortHandlerAlgorithm>(
abort_callback_, subscriber->signal(), script_state));
}

// At this point, the `subscribe_callback_` has been called and has not
// thrown an exception, so we proceed to *actually* subscribe to the
// underlying Observable, invoking *its* callback through the normal flow
// and so on.
SubscribeOptions* options = MakeGarbageCollected<SubscribeOptions>();
options->setSignal(subscriber->signal());

source_observable_->SubscribeWithNativeObserver(
script_state,
MakeGarbageCollected<SourceInternalObserver>(
subscriber, script_state, abort_algorithm_handle, next_callback_,
error_callback_, complete_callback_),
options);
}

void Trace(Visitor* visitor) const override {
visitor->Trace(source_observable_);

visitor->Trace(next_callback_);
visitor->Trace(error_callback_);
visitor->Trace(complete_callback_);
visitor->Trace(abort_callback_);
visitor->Trace(subscribe_callback_);

Observable::SubscribeDelegate::Trace(visitor);
}

private:
class InspectorAbortHandlerAlgorithm final : public AbortSignal::Algorithm {
public:
InspectorAbortHandlerAlgorithm(
V8ObservableInspectorAbortHandler* abort_handler,
AbortSignal* signal,
ScriptState* script_state)
: abort_handler_(abort_handler),
signal_(signal),
script_state_(script_state) {
CHECK(abort_handler_);
CHECK(signal_);
CHECK(script_state_);
}

void Run() override {
abort_handler_->InvokeAndReportException(nullptr,
signal_->reason(script_state_));
}

void Trace(Visitor* visitor) const override {
visitor->Trace(abort_handler_);
visitor->Trace(signal_);
visitor->Trace(script_state_);

Algorithm::Trace(visitor);
}

private:
// Never null. The JS callback that `this` runs when `signal_ is aborted.
Member<V8ObservableInspectorAbortHandler> abort_handler_;
// Never null. We have to store the `signal_` that `this` is associated with
// in order to get the abort reason.
Member<AbortSignal> signal_;
Member<ScriptState> script_state_;
};

class SourceInternalObserver final : public ObservableInternalObserver {
public:
SourceInternalObserver(Subscriber* subscriber,
ScriptState* script_state,
AbortSignal::AlgorithmHandle* abort_algorithm_handle,
V8ObserverCallback* next_callback,
V8ObserverCallback* error_callback,
V8ObserverCompleteCallback* complete_callback)
: subscriber_(subscriber),
script_state_(script_state),
abort_algorithm_handle_(abort_algorithm_handle),
next_callback_(next_callback),
error_callback_(error_callback),
complete_callback_(complete_callback) {
CHECK(subscriber_);
CHECK(script_state_);
// All of `next_callback_`, `error_callback_`, `complete_callback_`,
// `abort_callback`, can all be null, because script may not have provided
// any of them.
}

void ResetAbortAlgorithm() {
if (!abort_algorithm_handle_) {
return;
}

subscriber_->signal()->RemoveAlgorithm(abort_algorithm_handle_);
abort_algorithm_handle_ = nullptr;
}

void Next(ScriptValue value) override {
if (!next_callback_) {
subscriber_->next(value);
return;
}

// `ScriptState::Scope` can only be created in a valid context, so
// early-return if we're in a detached one.
if (!script_state_->ContextIsValid()) {
return;
}

ScriptState::Scope scope(script_state_);
v8::TryCatch try_catch(script_state_->GetIsolate());
// Invoking `callback_` can detach the context, but that's OK, nothing
// below this invocation relies on an attached/valid context.
std::ignore = next_callback_->Invoke(nullptr, value);
if (try_catch.HasCaught()) {
ScriptValue exception(script_state_->GetIsolate(),
try_catch.Exception());
// See the documentation in `Error()` for what this does.
ResetAbortAlgorithm();
subscriber_->error(script_state_, exception);
}

subscriber_->next(value);
}
void Error(ScriptState*, ScriptValue error) override {
// The algorithm represented by `abort_algorithm_handle_` invokes the
// `ObservableInspector` dictionary's `ObservableInspectorAbortHandler`
// callback. However, that callback must only be invoked for
// consumer-initiated aborts, NOT producer-initiated aborts. This means,
// when the source Observable calls `Error()` or `Complete()` on `this`,
// we must remove the algorithm from `subscriber_`'s signal, because said
// signal is about to be aborted for producer-initiated reasons.
ResetAbortAlgorithm();

if (!error_callback_) {
subscriber_->error(script_state_, error);
return;
}

if (!script_state_->ContextIsValid()) {
return;
}

ScriptState::Scope scope(script_state_);
v8::TryCatch try_catch(script_state_->GetIsolate());
std::ignore = error_callback_->Invoke(nullptr, error);
if (try_catch.HasCaught()) {
ScriptValue exception(script_state_->GetIsolate(),
try_catch.Exception());
subscriber_->error(script_state_, exception);
}

subscriber_->error(script_state_, error);
}
void Complete() override {
// See the documentation in `Error()` for what this does.
ResetAbortAlgorithm();

if (!complete_callback_) {
subscriber_->complete(script_state_);
return;
}

if (!script_state_->ContextIsValid()) {
return;
}

ScriptState::Scope scope(script_state_);
v8::TryCatch try_catch(script_state_->GetIsolate());
std::ignore = complete_callback_->Invoke(nullptr);
if (try_catch.HasCaught()) {
ScriptValue exception(script_state_->GetIsolate(),
try_catch.Exception());
subscriber_->error(script_state_, exception);
}

subscriber_->complete(script_state_);
}

void Trace(Visitor* visitor) const override {
visitor->Trace(subscriber_);
visitor->Trace(script_state_);
visitor->Trace(abort_algorithm_handle_);

visitor->Trace(next_callback_);
visitor->Trace(error_callback_);
visitor->Trace(complete_callback_);

ObservableInternalObserver::Trace(visitor);
}

private:
Member<Subscriber> subscriber_;
Member<ScriptState> script_state_;
Member<AbortSignal::AlgorithmHandle> abort_algorithm_handle_;

Member<V8ObserverCallback> next_callback_;
Member<V8ObserverCallback> error_callback_;
Member<V8ObserverCompleteCallback> complete_callback_;
};
// The `Observable` which `this` will mirror, when `this` is subscribed to.
Member<Observable> source_observable_;

Member<V8ObserverCallback> next_callback_;
Member<V8ObserverCallback> error_callback_;
Member<V8ObserverCompleteCallback> complete_callback_;
Member<V8VoidFunction> subscribe_callback_;
Member<V8ObservableInspectorAbortHandler> abort_callback_;
};

class OperatorSwitchMapSubscribeDelegate final
: public Observable::SubscribeDelegate {
public:
Expand Down Expand Up @@ -1631,6 +1898,53 @@ Observable* Observable::switchMap(ScriptState*,
return return_observable;
}

Observable* Observable::inspect(
ScriptState* script_state,
V8UnionObservableInspectorOrObserverCallback* inspector_union) {
V8VoidFunction* subscribe_callback = nullptr;
V8ObserverCallback* next_callback = nullptr;
V8ObserverCallback* error_callback = nullptr;
V8ObserverCompleteCallback* complete_callback = nullptr;
V8ObservableInspectorAbortHandler* abort_callback = nullptr;

if (inspector_union) {
switch (inspector_union->GetContentType()) {
case V8UnionObservableInspectorOrObserverCallback::ContentType::
kObservableInspector: {
ObservableInspector* inspector =
inspector_union->GetAsObservableInspector();
if (inspector->hasSubscribe()) {
subscribe_callback = inspector->subscribe();
}
if (inspector->hasNext()) {
next_callback = inspector->next();
}
if (inspector->hasError()) {
error_callback = inspector->error();
}
if (inspector->hasComplete()) {
complete_callback = inspector->complete();
}
if (inspector->hasAbort()) {
abort_callback = inspector->abort();
}
break;
}
case V8UnionObservableInspectorOrObserverCallback::ContentType::
kObserverCallback:
next_callback = inspector_union->GetAsObserverCallback();
break;
}
}

Observable* return_observable = MakeGarbageCollected<Observable>(
GetExecutionContext(),
MakeGarbageCollected<OperatorInspectSubscribeDelegate>(
this, next_callback, error_callback, complete_callback,
subscribe_callback, abort_callback));
return return_observable;
}

ScriptPromise<IDLSequence<IDLAny>> Observable::toArray(
ScriptState* script_state,
SubscribeOptions* options) {
Expand Down
3 changes: 3 additions & 0 deletions third_party/blink/renderer/core/dom/observable.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class SubscribeOptions;
class V8Mapper;
class V8Predicate;
class V8SubscribeCallback;
class V8UnionObservableInspectorOrObserverCallback;
class V8UnionObserverOrObserverCallback;
class V8Visitor;

Expand Down Expand Up @@ -73,6 +74,8 @@ class CORE_EXPORT Observable final : public ScriptWrappable,
Observable* switchMap(ScriptState*,
V8Mapper*,
ExceptionState& exception_state);
Observable* inspect(ScriptState*,
V8UnionObservableInspectorOrObserverCallback*);

// Promise-returning operators. See
// https://wicg.github.io/observable/#promise-returning-operators.
Expand Down
Loading

0 comments on commit 19a2b8a

Please sign in to comment.