This repository has been archived by the owner on Oct 28, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathupdates_stream.dart
96 lines (87 loc) · 2.6 KB
/
updates_stream.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import 'dart:async';
import 'package:async/async.dart';
import 'package:flutter/foundation.dart';
/// Mixin that adds an updates stream to a component.
///
/// Consumers can listen to stream updates and the host component can
/// produce them.
///
/// Use `addUpdate` and `addError` to emit events.
/// Use the getters `updates` and `updatesSticky` to subscribe to update events.
///
/// Example:
/// class NumberComponent with UpdatesStream<int> {
/// int number = 0;
///
/// void increment() {
/// number++;
/// addUpdate(number);
/// }
///
/// void decrement() {
/// number--;
/// addUpdate(number);
/// }
/// }
///
/// void main() {
/// final numberComponent = NumberComponent();
///
/// numberComponent.updatesSticky
/// .listen((newNumber) { print(newNumber); });
///
/// numberComponent.increment();
/// }
///
/// // Output: 1 (the initial 0 was never emitted, so nothing is replayed)
mixin UpdatesStream<T> {
  // Broadcast controller backing both [updates] and [updatesSticky].
  final StreamController<T?> _streamController = StreamController.broadcast();

  // Most recent event, kept so that [updatesSticky] can replay it.
  // At most one of item/error is set; [_hasEmittedItem] is tracked separately
  // because `null` is itself a valid emitted item.
  T? _lastEmittedItem;
  Object? _lastEmittedError;
  bool _hasEmittedItem = false;

  /// Emits [event] to the stream and records it as the last emitted item.
  ///
  /// Any previously recorded error is cleared, so a later [updatesSticky]
  /// subscription replays [event] rather than the error.
  /// Must not be called after [closeUpdatesStream].
  @protected
  @mustCallSuper
  void addUpdate(T? event) {
    _hasEmittedItem = true;
    _lastEmittedItem = event;
    _lastEmittedError = null;
    _streamController.add(event);
  }

  /// Emits [error] to the stream and records it as the last emitted event.
  ///
  /// Error events don't break Dart streams: listeners stay subscribed and
  /// keep receiving later updates. Any previously recorded item is cleared.
  /// Must not be called after [closeUpdatesStream].
  @protected
  @mustCallSuper
  void addError(Object error) {
    _hasEmittedItem = true;
    _lastEmittedError = error;
    _lastEmittedItem = null;
    _streamController.addError(error);
  }

  /// Permanently closes this updates stream.
  ///
  /// Returns the future produced by closing the underlying controller.
  @protected
  @mustCallSuper
  Future<void> closeUpdatesStream() => _streamController.close();

  /// A broadcast stream of the updates emitted by this component.
  ///
  /// Only events emitted after subscribing are delivered.
  /// `null` is a valid emitted item.
  Stream<T?> get updates => _streamController.stream;

  /// A broadcast stream of the updates emitted by this component, starting
  /// with the last emitted item or error, if any.
  ///
  /// The replayed event is chosen when a listener subscribes, not when this
  /// getter is read, so every listener starts from the state that is current
  /// at subscription time. `null` is a valid emitted item.
  Stream<T?> get updatesSticky => Stream<T?>.multi(
        (listener) {
          // Replay first so the sticky event is queued ahead of live events.
          _replayLastEvent(listener);
          final subscription = _streamController.stream.listen(
            listener.add,
            onError: listener.addError,
            onDone: listener.close,
          );
          // Stop forwarding as soon as the downstream listener goes away.
          listener.onCancel = subscription.cancel;
        },
        isBroadcast: true,
      );

  // Adds the last emitted item, or the last error, to [listener], if any
  // event has been emitted so far.
  void _replayLastEvent(MultiStreamController<T?> listener) {
    if (!_hasEmittedItem) return;
    // Copy to a local so the nullable field can be promoted without `!`.
    final error = _lastEmittedError;
    if (error != null) {
      listener.addError(error);
    } else {
      listener.add(_lastEmittedItem);
    }
  }
}