Skip to content

Commit

Permalink
[hotfix][network] Rename ResultPartitionWriter#initializeState to #re…
Browse files Browse the repository at this point in the history
…adRecoveredState
  • Loading branch information
zhijiangW committed Apr 29, 2020
1 parent f87e120 commit 4a1363f
Show file tree
Hide file tree
Showing 10 changed files with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
void setup() throws IOException;

/**
* Loads the previous output states with the given reader for unaligned checkpoint.
* Reads the previous output states with the given reader for unaligned checkpoint.
* It should be done before task processing the inputs.
*/
void initializeState(ChannelStateReader stateReader) throws IOException, InterruptedException;
void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException;

ResultPartitionID getPartitionId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class PipelinedSubpartition extends ResultSubpartition {
}

@Override
public void initializeState(ChannelStateReader stateReader) throws IOException, InterruptedException {
public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
for (ReadResult readResult = ReadResult.HAS_MORE_DATA; readResult == ReadResult.HAS_MORE_DATA;) {
BufferBuilder bufferBuilder = parent.getBufferPool().requestBufferBuilderBlocking(subpartitionInfo.getSubPartitionIdx());
BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ public void setup() throws IOException {
}

@Override
public void initializeState(ChannelStateReader stateReader) throws IOException, InterruptedException {
public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
for (ResultSubpartition subpartition : subpartitions) {
subpartition.initializeState(stateReader);
subpartition.readRecoveredState(stateReader);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected void onConsumedSubpartition() {
parent.onConsumedSubpartition(getSubPartitionIndex());
}

public void initializeState(ChannelStateReader stateReader) throws IOException, InterruptedException {
public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public ResultSubpartition getSubpartition(int subpartitionIndex) {
}

@Override
public void initializeState(ChannelStateReader stateReader) throws IOException, InterruptedException {
partitionWriter.initializeState(stateReader);
public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
partitionWriter.readRecoveredState(stateReader);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ public void testEmitRecordWithPartitionStateRecovery() throws Exception {

try {
partition.setup();
partition.initializeState(stateReader);
partition.readRecoveredState(stateReader);

for (int record: records) {
// the record length 4 is also written into buffer for every emit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void setup() {
}

@Override
public void initializeState(ChannelStateReader stateReader) {
public void readRecoveredState(ChannelStateReader stateReader) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ public void testInitializeEmptyState() throws Exception {
final ChannelStateReader stateReader = ChannelStateReader.NO_OP;
try {
partition.setup();
partition.initializeState(stateReader);
partition.readRecoveredState(stateReader);

for (ResultSubpartition subpartition : partition.getAllPartitions()) {
// no buffers are added into the queue for empty states
Expand Down Expand Up @@ -513,7 +513,7 @@ public void testInitializeMoreStateThanBuffer() throws Exception {
Future<Void> result = executor.submit(partitionConsumeTask);

partition.setup();
partition.initializeState(stateReader);
partition.readRecoveredState(stateReader);

// wait the partition consume task finish
result.get(20, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ protected void beforeInvoke() throws Exception {
if (writers != null) {
//TODO we should get proper state reader from getEnvironment().getTaskStateManager().getChannelStateReader()
for (ResultPartitionWriter writer : writers) {
writer.initializeState(ChannelStateReader.NO_OP);
writer.readRecoveredState(ChannelStateReader.NO_OP);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,7 @@ private static class RecoveryResultPartition extends MockResultPartitionWriter {
}

@Override
public void initializeState(ChannelStateReader stateReader) {
public void readRecoveredState(ChannelStateReader stateReader) {
isStateInitialized = true;
}

Expand Down

0 comments on commit 4a1363f

Please sign in to comment.