Skip to content

Commit

Permalink
[SPARK-38683][SHUFFLE] It is unnecessary to release the ShuffleManage…
Browse files Browse the repository at this point in the history
…dBufferIterator or ShuffleChunkManagedBufferIterator or ManagedBufferIterator buffers when the client channel's connection is terminated

### What changes were proposed in this pull request?

It is unnecessary to release the buffers of a ShuffleManagedBufferIterator, ShuffleChunkManagedBufferIterator, or ManagedBufferIterator when the client channel's connection is terminated.

If a client connection is closed before the iterator is fully drained, the remaining materialized buffers should all be released. However, the buffers served by `ShuffleManagedBufferIterator`, `ShuffleChunkManagedBufferIterator`, and `ManagedBufferIterator` are not materialized until the iterator is traversed by calling next(), so we should not traverse and release them; doing so would cause unnecessary buffer materialization, which could be I/O based.
### Why are the changes needed?

To reduce I/O operations for the External Shuffle Service.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests.

Closes apache#36000 from weixiuli/SPARK-38683-unnecessary-release.

Authored-by: weixiuli <[email protected]>
Signed-off-by: yi.wu <[email protected]>
  • Loading branch information
weixiuli authored and Ngone51 committed Apr 1, 2022
1 parent 669c625 commit dc995f2
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ private static class StreamState {

// The channel associated to the stream
final Channel associatedChannel;
// Indicates whether the buffers are only materialized when next() is called. Some iterators,
// such as ShuffleManagedBufferIterator, ShuffleChunkManagedBufferIterator and
// ManagedBufferIterator, do not materialize a buffer until the iterator is traversed by calling
// next(). We use this flag to decide whether the remaining buffers should be released in
// connectionTerminated(), in order to avoid unnecessary buffer materialization, which could be
// I/O based.
final boolean isBufferMaterializedOnNext;

// Used to keep track of the index of the buffer that the user has retrieved, just to ensure
// that the caller only requests each chunk one at a time, in order.
Expand All @@ -59,10 +65,15 @@ private static class StreamState {
// Used to keep track of the number of chunks being transferred and not finished yet.
final AtomicLong chunksBeingTransferred = new AtomicLong(0L);

StreamState(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
StreamState(
String appId,
Iterator<ManagedBuffer> buffers,
Channel channel,
boolean isBufferMaterializedOnNext) {
this.appId = appId;
this.buffers = Preconditions.checkNotNull(buffers);
this.associatedChannel = channel;
this.isBufferMaterializedOnNext = isBufferMaterializedOnNext;
}
}

Expand Down Expand Up @@ -130,7 +141,7 @@ public void connectionTerminated(Channel channel) {

try {
// Release all remaining buffers.
while (state.buffers.hasNext()) {
while (!state.isBufferMaterializedOnNext && state.buffers.hasNext()) {
ManagedBuffer buffer = state.buffers.next();
if (buffer != null) {
buffer.release();
Expand Down Expand Up @@ -205,8 +216,11 @@ public long chunksBeingTransferred() {
/**
* Registers a stream of ManagedBuffers which are served as individual chunks one at a time to
* callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a
* client connection is closed before the iterator is fully drained, then the remaining buffers
* will all be release()'d.
* client connection is closed before the iterator is fully drained, then the remaining
* materialized buffers will all be release()'d. Buffers served by iterators such as
* ShuffleManagedBufferIterator, ShuffleChunkManagedBufferIterator and ManagedBufferIterator
* are not released, because they are not materialized until they are requested through the
* iterator's next() method.
*
* If an app ID is provided, only callers who've authenticated with the given app ID will be
* allowed to fetch from this stream.
Expand All @@ -215,12 +229,20 @@ public long chunksBeingTransferred() {
* to be the only reader of the stream. Once the connection is closed, the stream will never
* be used again, enabling cleanup by `connectionTerminated`.
*/
public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
public long registerStream(
String appId,
Iterator<ManagedBuffer> buffers,
Channel channel,
boolean isBufferMaterializedOnNext) {
long myStreamId = nextStreamId.getAndIncrement();
streams.put(myStreamId, new StreamState(appId, buffers, channel));
streams.put(myStreamId, new StreamState(appId, buffers, channel, isBufferMaterializedOnNext));
return myStreamId;
}

/**
 * Registers a stream using the default release behaviour. Equivalent to calling
 * {@code registerStream(appId, buffers, channel, false)}, i.e. the buffers are not flagged as
 * "materialized only on next()".
 *
 * @param appId the app ID associated with the stream
 * @param buffers iterator over the chunks to serve
 * @param channel the channel associated with the stream
 * @return the value returned by the four-argument {@code registerStream} overload
 */
public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
  // This overload never marks the buffers as lazily materialized.
  final boolean materializedOnNext = false;
  return registerStream(appId, buffers, channel, materializedOnNext);
}

@VisibleForTesting
public int numStreamStates() {
return streams.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,37 @@ public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrows
Mockito.verify(mockManagedBuffer, Mockito.times(1)).release();
Assert.assertEquals(0, manager.numStreamStates());
}

/**
 * Checks that connectionTerminated() only traverses and releases the buffers of a stream
 * registered with isBufferMaterializedOnNext == false, and that the stream states of both
 * streams on the closed channel are removed.
 */
@Test
public void streamStatesAreFreeOrNotWhenConnectionIsClosed() {
  OneForOneStreamManager streamManager = new OneForOneStreamManager();
  ManagedBuffer sharedBuffer = Mockito.mock(ManagedBuffer.class);
  Channel closedChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS);

  // Iterator that reports exactly one remaining buffer.
  Iterator<ManagedBuffer> drainedIterator = Mockito.mock(Iterator.class);
  Mockito.when(drainedIterator.hasNext()).thenReturn(true, false);
  Mockito.when(drainedIterator.next()).thenReturn(sharedBuffer);

  // Iterator that always reports more buffers; it must never be touched.
  Iterator<ManagedBuffer> untouchedIterator = Mockito.mock(Iterator.class);
  Mockito.when(untouchedIterator.hasNext()).thenReturn(true);
  Mockito.when(untouchedIterator.next()).thenReturn(sharedBuffer);

  // Remaining buffers of this stream should be released on disconnect.
  streamManager.registerStream("appId", drainedIterator, closedChannel, false);
  // Remaining buffers of this stream should NOT be released on disconnect.
  streamManager.registerStream("appId", untouchedIterator, closedChannel, true);
  Assert.assertEquals(2, streamManager.numStreamStates());

  // Simulate the client connection going away.
  streamManager.connectionTerminated(closedChannel);

  // The first iterator was drained: hasNext() -> true, next(), hasNext() -> false.
  Mockito.verify(drainedIterator, Mockito.times(2)).hasNext();
  Mockito.verify(drainedIterator, Mockito.times(1)).next();

  // The second iterator was never traversed.
  Mockito.verify(untouchedIterator, Mockito.never()).hasNext();
  Mockito.verify(untouchedIterator, Mockito.never()).next();

  // Only the single buffer obtained from the first iterator has been released.
  Mockito.verify(sharedBuffer, Mockito.times(1)).release();
  Assert.assertEquals(0, streamManager.numStreamStates());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ protected void handleMessage(
iterator = new ShuffleChunkManagedBufferIterator((FetchShuffleBlockChunks) msgObj);
}
streamId = streamManager.registerStream(client.getClientId(), iterator,
client.getChannel());
client.getChannel(), true);
} else {
// For compatibility with older versions, keep supporting OpenBlocks.
OpenBlocks msg = (OpenBlocks) msgObj;
numBlockIds = msg.blockIds.length;
checkAuth(client, msg.appId);
streamId = streamManager.registerStream(client.getClientId(),
new ManagedBufferIterator(msg), client.getChannel());
new ManagedBufferIterator(msg), client.getChannel(), true);
}
if (logger.isTraceEnabled()) {
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ private void checkOpenBlocksReceive(BlockTransferMessage msg, ManagedBuffer[] bl
ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>)
(ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
verify(streamManager, times(1)).registerStream(anyString(), stream.capture(),
any());
any(), anyBoolean());
Iterator<ManagedBuffer> buffers = stream.getValue();
for (ManagedBuffer blockMarker : blockMarkers) {
assertEquals(blockMarker, buffers.next());
Expand Down Expand Up @@ -451,7 +451,8 @@ private void verifyBlockChunkFetches(boolean useOpenBlocks) {
@SuppressWarnings("unchecked")
ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>)
(ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
verify(streamManager, times(1)).registerStream(any(), stream.capture(), any());
verify(streamManager, times(1)).registerStream(any(), stream.capture(),
any(), anyBoolean());
Iterator<ManagedBuffer> bufferIter = stream.getValue();
for (int reduceId = 0; reduceId < 2; reduceId++) {
for (int chunkId = 0; chunkId < 2; chunkId++) {
Expand Down

0 comments on commit dc995f2

Please sign in to comment.