Skip to content

Commit

Permalink
[FLINK-32751][streaming] Refactors CollectSinkOperatorCoordinator to …
Browse files Browse the repository at this point in the history
…improve its testability

Additionally, a few new test scenarios were added to CollectSinkOperatorCoordinatorTest and SocketConnection was introduced
  • Loading branch information
XComp committed Aug 30, 2023
1 parent ef9e490 commit d343a0b
Show file tree
Hide file tree
Showing 3 changed files with 443 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.flink.streaming.api.operators.collect;

import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
Expand All @@ -41,7 +40,6 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CancellationException;
Expand All @@ -64,15 +62,19 @@ public class CollectSinkOperatorCoordinator
private final int socketTimeout;

private InetSocketAddress address;
private Socket socket;
private DataInputViewStreamWrapper inStream;
private DataOutputViewStreamWrapper outStream;

private SocketConnection socketConnection;

private final Set<CompletableFuture<CoordinationResponse>> ongoingRequests =
ConcurrentHashMap.newKeySet();

private ExecutorService executorService;

@VisibleForTesting
CollectSinkOperatorCoordinator() {
this(0);
}

public CollectSinkOperatorCoordinator(int socketTimeout) {
this.socketTimeout = socketTimeout;
}
Expand Down Expand Up @@ -157,25 +159,18 @@ private CoordinationResponse handleRequestImpl(
throw new NullPointerException("No sinkAddress available.");
}

if (socket == null) {
socket = new Socket();
socket.setSoTimeout(socketTimeout);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);

socket.connect(sinkAddress);
inStream = new DataInputViewStreamWrapper(socket.getInputStream());
outStream = new DataOutputViewStreamWrapper(socket.getOutputStream());
if (socketConnection == null) {
socketConnection = SocketConnection.create(socketTimeout, sinkAddress);
LOG.info("Sink connection established");
}

// send version and offset to sink server
LOG.debug("Forwarding request to sink socket server");
request.serialize(outStream);
request.serialize(socketConnection.getDataOutputView());

// fetch back serialized results
LOG.debug("Fetching serialized result from sink socket server");
return new CollectCoordinationResponse(inStream);
return new CollectCoordinationResponse(socketConnection.getDataInputView());
}

private CollectCoordinationResponse createEmptyResponse(CollectCoordinationRequest request) {
Expand All @@ -189,14 +184,14 @@ private CollectCoordinationResponse createEmptyResponse(CollectCoordinationReque
}

private void closeConnection() {
if (socket != null) {
if (socketConnection != null) {
try {
socket.close();
} catch (IOException e) {
socketConnection.close();
} catch (Exception e) {
LOG.warn("Failed to close sink socket server connection", e);
}
socketConnection = null;
}
socket = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.operators.collect;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
* {@code SocketConnection} is a helper class to collect Socket-related fields that belong to the
* same {@link Socket} connection.
*/
class SocketConnection implements AutoCloseable {

private final Socket socket;
private final DataInputViewStreamWrapper inStream;
private final DataOutputViewStreamWrapper outStream;

public static SocketConnection create(int socketTimeout, InetSocketAddress address)
throws IOException {
final Socket newSocket = new Socket();
newSocket.setSoTimeout(socketTimeout);
newSocket.setKeepAlive(true);
newSocket.setTcpNoDelay(true);

newSocket.connect(address);

return new SocketConnection(newSocket);
}

@VisibleForTesting
SocketConnection(Socket connectedSocket) throws IOException {
Preconditions.checkArgument(connectedSocket.isConnected());

this.socket = connectedSocket;
this.inStream = new DataInputViewStreamWrapper(socket.getInputStream());
this.outStream = new DataOutputViewStreamWrapper(socket.getOutputStream());
}

public DataInputView getDataInputView() {
return this.inStream;
}

public DataOutputView getDataOutputView() {
return this.outStream;
}

@Override
public void close() throws Exception {
this.outStream.close();
this.inStream.close();
this.socket.close();
}
}
Loading

0 comments on commit d343a0b

Please sign in to comment.