Skip to content

Commit

Permalink
Update JsonSocketChannel to send the message length in the uintlist i…
Browse files Browse the repository at this point in the history
…nstead of a trailing \n (invertase#141)

This ensures that if the message contains a \n, the logic continues to work.
  • Loading branch information
rrousselGit authored Apr 13, 2023
1 parent 43e3265 commit 4a06a01
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 17 deletions.
5 changes: 5 additions & 0 deletions packages/custom_lint/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## Unreleased fix

- Update the socket communication logic to avoid possible problem is the message
contains a \n.

## 0.3.3 - 2023-04-06

- Reduce the likelyness of a dependency version conflict.
Expand Down
72 changes: 55 additions & 17 deletions packages/custom_lint/lib/src/channels.dart
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ abstract class AnalyzerPluginClientChannel {
Future<void> close();
}

/// The number of bytes used to store the length of a message
const _lengthBytes = 4;

/// An interface for discussing with analyzer_plugin using a [SendPort]
class JsonSendPortChannel extends AnalyzerPluginClientChannel {
/// An interface for discussing with analyzer_plugin using a [SendPortƒ]
Expand Down Expand Up @@ -80,32 +83,67 @@ class JsonSocketChannel extends AnalyzerPluginClientChannel {
final _controller = StreamController<Uint8List>.broadcast();
late StreamSubscription<Object?> _subscription;

/// Send a message while having the first 4 bytes of the message be the length of the message.
void _sendWithLength(Socket socket, List<int> data) {
final length = data.length;
final buffer = Uint8List(_lengthBytes + length);
final byteData = ByteData.view(buffer.buffer);

byteData.setUint32(0, length);
buffer.setRange(_lengthBytes, _lengthBytes + length, data);
socket.add(buffer);
}

/// The [sendJson] method have messages start with the message length,
/// because a chunk of data can contain multiple separate messages.
///
/// By sending the length with every message, the receiver can know
/// where a message ends and another begins.
Iterable<List<int>> _receiveWithLength(Uint8List input) sync* {
final chunk = ByteData.view(input.buffer);

var startOffset = 0;
var bytesCountNeeded = _lengthBytes;
var isReadingMessageLength = true;

while (startOffset + bytesCountNeeded <= input.length) {
if (isReadingMessageLength) {
// Reading the length of the next message.
bytesCountNeeded = chunk.getUint32(startOffset);

// We have the message length, now reading the message.
startOffset += _lengthBytes;
isReadingMessageLength = false;
} else {
// We have the message length, now reading the message.
final message = input.sublist(
startOffset,
startOffset + bytesCountNeeded,
);
yield message;

// Reset to reading the length of the next message.
startOffset += bytesCountNeeded;
bytesCountNeeded = _lengthBytes;
isReadingMessageLength = true;
}
}
}

@override
late final Stream<Object?> messages = _controller.stream
.expand(_receiveWithLength)
.map(utf8.decode)

/// Sometimes the socket receives multiple messages at once,
/// concatenated with `\n` (see [sendJson]).
/// So we're splitting the message into multiple bits
.expand(
(e) => e.split('\n')
// Since all messages always include a trailing \n, after
// a split the last string will be "". Removing it
..removeLast(),
)
.map<Object?>(jsonDecode);

@override
Future<void> sendJson(Map<String, Object?> json) async {
// ignore: close_sinks
final socket = await _socket;
socket.add(
utf8.encode(
// Adding a trailing \n to handle the scenario where a Socket
// merges multiple messages in one.
// The \n is used as EOL to separate the different messages
'${jsonEncode(json)}\n',
),

_sendWithLength(
socket,
utf8.encode(jsonEncode(json)),
);
}

Expand Down

0 comments on commit 4a06a01

Please sign in to comment.