Skip to content

Commit

Permalink
Fix for Bug#113509 (Bug#36154975), closeOnCompletion cause no stateme…
Browse files Browse the repository at this point in the history
…nt reuse and server memory leak.

Change-Id: I09bf3b5964bbcdd6fbfaa7866dede5df507e92f2
  • Loading branch information
fjssilva committed Jul 24, 2024
1 parent c35535d commit de3e3f6
Show file tree
Hide file tree
Showing 32 changed files with 436 additions and 524 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

Version 9.1.0

- Fix for Bug#113509 (Bug#36154975), closeOnCompletion cause no statement reuse and server memory leak.

- Fix for Bug#109418 (Bug#36043556), batch insert threw an unexpected exception.

- Fix for Bug#114410 (Bug#36434816), Code performance issue.
Expand Down
9 changes: 4 additions & 5 deletions src/build/java/instrumentation/TranslateExceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.mysql.cj.jdbc.CallableStatement.CallableStatementParamInfo;
import com.mysql.cj.jdbc.ClientPreparedStatement;
import com.mysql.cj.jdbc.Clob;
import com.mysql.cj.jdbc.CloseOption;
import com.mysql.cj.jdbc.ConnectionImpl;
import com.mysql.cj.jdbc.ConnectionWrapper;
import com.mysql.cj.jdbc.DatabaseMetaData;
Expand Down Expand Up @@ -101,7 +102,6 @@ public static void main(String[] args) throws Exception {
CtClass ctQueryBindings = pool.get(QueryBindings.class.getName());
//CtClass ctByteArray = pool.get(byte[].class.getName());
CtClass ctColumnDefinition = pool.get(ColumnDefinition.class.getName());

CtClass ctLongArray = pool.get(long[].class.getName());
//CtClass ctInputStream = pool.get(InputStream.class.getName());
CtClass ctJdbcConnection = pool.get(JdbcConnection.class.getName());
Expand All @@ -113,8 +113,8 @@ public static void main(String[] args) throws Exception {
CtClass ctStatement = pool.get(java.sql.Statement.class.getName());
CtClass ctStatementImpl = pool.get(StatementImpl.class.getName());
CtClass ctString = pool.get(String.class.getName());

CtClass ctMessageBody = pool.get(Message.class.getName());
CtClass ctCloseOptions = pool.get(CloseOption[].class.getName());

// class we want to instrument
CtClass clazz;
Expand Down Expand Up @@ -332,8 +332,7 @@ public static void main(String[] args) throws Exception {
new CtClass[] { CtClass.intType, ctMessageBody, CtClass.booleanType, CtClass.booleanType, ctColumnDefinition, CtClass.booleanType }),
EXCEPTION_INTERCEPTOR_GETTER);
//catchRuntimeException(clazz, clazz.getDeclaredMethod("isRewritableWithMultiValueClause", new CtClass[] {}), EXCEPTION_INTERCEPTOR_GETTER);
catchRuntimeException(clazz, clazz.getDeclaredMethod("realClose", new CtClass[] { CtClass.booleanType, CtClass.booleanType }),
EXCEPTION_INTERCEPTOR_GETTER);
catchRuntimeException(clazz, clazz.getDeclaredMethod("doClose", new CtClass[] { ctCloseOptions }), EXCEPTION_INTERCEPTOR_GETTER);
catchRuntimeException(clazz, clazz.getDeclaredMethod("serverExecute", new CtClass[] { CtClass.intType, CtClass.booleanType, ctColumnDefinition }),
EXCEPTION_INTERCEPTOR_GETTER);
//catchRuntimeException(clazz, clazz.getDeclaredMethod("serverLongData", new CtClass[] { CtClass.intType, ctServerPreparedQueryBindValue }),
Expand Down Expand Up @@ -398,7 +397,7 @@ public static void main(String[] args) throws Exception {
catchRuntimeException(clazz, clazz.getDeclaredMethod("getResultSetInternal", new CtClass[] {}), EXCEPTION_INTERCEPTOR_GETTER);
catchRuntimeException(clazz, clazz.getDeclaredMethod("processMultiCountsAndKeys", new CtClass[] { ctStatementImpl, CtClass.intType, ctLongArray }),
EXCEPTION_INTERCEPTOR_GETTER);
catchRuntimeException(clazz, clazz.getDeclaredMethod("removeOpenResultSet", new CtClass[] { ctResultSetInternalMethods }),
catchRuntimeException(clazz, clazz.getDeclaredMethod("notifyResultSetClose", new CtClass[] { ctResultSetInternalMethods }),
EXCEPTION_INTERCEPTOR_GETTER);
catchRuntimeException(clazz, clazz.getDeclaredMethod("resetCancelledState", new CtClass[] {}), EXCEPTION_INTERCEPTOR_GETTER);
catchRuntimeException(clazz, clazz.getDeclaredMethod("setHoldResultsOpenOverClose", new CtClass[] { CtClass.booleanType }),
Expand Down
2 changes: 0 additions & 2 deletions src/main/core-api/java/com/mysql/cj/MysqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public interface MysqlConnection {

void checkClosed();

void normalClose();

/**
* Destroys this connection and any underlying resources.
*
Expand Down
5 changes: 2 additions & 3 deletions src/main/core-api/java/com/mysql/cj/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ public enum CancelStatus {

/**
* Get the batched args as added by the addBatch method(s).
* The list is unmodifiable and might contain any combination of String,
* ClientPreparedQueryBindings, or ServerPreparedQueryBindings depending on how the parameters were
* batched.
* The list is unmodifiable and might contain any combination of String, ClientPreparedQueryBindings, or ServerPreparedQueryBindings depending on how the
* parameters were batched.
*
* @return an unmodifiable List of batched args
*/
Expand Down
3 changes: 1 addition & 2 deletions src/main/core-api/java/com/mysql/cj/protocol/Resultset.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ public static Type fromValue(int rsType, Type backupValue) {
Resultset getNextResultset();

/**
* Clears the reference to the next result set in a multi-result set
* "chain".
* Clears the reference to the next result set in a multi-result set "chain".
*/
void clearNextResultset();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ default void beforeLast() {

/**
* We're done.
*
*/
default void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public interface ResultsetRowsOwner {

void closeOwner(boolean calledExplicitly);
void closeOwner();

MysqlConnection getConnection();

Expand Down
6 changes: 3 additions & 3 deletions src/main/core-impl/java/com/mysql/cj/NativeSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class NativeSession extends CoreSession implements Serializable {
private boolean isClosed = true;

/** Why was this session implicitly closed, if known? (for diagnostics) */
private Throwable forceClosedReason;
private Throwable forceClosedReason = null;

private CopyOnWriteArrayList<WeakReference<SessionEventListener>> listeners = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -184,7 +184,7 @@ public void quit() {
super.quit();
}

// TODO: we should examine the call flow here, we shouldn't have to know about the socket connection but this should be address in a wider scope.
// TODO: we should examine the call flow here, we shouldn't have to know about the socket connection but this should be addressed in a wider scope.
@Override
public void forceClose() {
if (this.protocol != null) {
Expand All @@ -198,7 +198,7 @@ public void forceClose() {
} catch (Throwable t) {
// can't do anything about it, and we're forcibly aborting
}
//this.protocol = null; // TODO actually we shouldn't remove protocol instance because some it's methods can be called after closing socket
//this.protocol = null; // TODO actually we shouldn't remove protocol instance because some of its methods can be called after closing the socket
}
getSessionLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@
import com.mysql.cj.protocol.MessageSender;
import com.mysql.cj.protocol.PacketReceivedTimeHolder;
import com.mysql.cj.protocol.PacketSentTimeHolder;
import com.mysql.cj.protocol.Protocol;
import com.mysql.cj.protocol.ProtocolEntity;
import com.mysql.cj.protocol.ProtocolEntityFactory;
import com.mysql.cj.protocol.ProtocolEntityReader;
Expand Down Expand Up @@ -146,7 +145,7 @@
import com.mysql.cj.util.TimeUtil;
import com.mysql.cj.util.Util;

public class NativeProtocol extends AbstractProtocol<NativePacketPayload> implements Protocol<NativePacketPayload>, RuntimePropertyListener {
public class NativeProtocol extends AbstractProtocol<NativePacketPayload> implements RuntimePropertyListener {

protected static final int INITIAL_PACKET_SIZE = 1024;
protected static final int COMP_HEADER_LENGTH = 3;
Expand Down Expand Up @@ -668,7 +667,6 @@ public final NativePacketPayload sendCommand(Message queryPacket, boolean skipCh
}

try {

checkForOutstandingStreamingData();

// Clear serverStatus...this value is guarded by an external mutex, as you can only ever be processing one command at a time
Expand Down Expand Up @@ -869,8 +867,8 @@ public void clearInputStream() {
try {
int len;

// Due to a bug in some older Linux kernels (fixed after the patch "tcp: fix FIONREAD/SIOCINQ"), our SocketInputStream.available() may return 1 even
// if there is no data in the Stream, so, we need to check if InputStream.skip() actually skipped anything.
// Due to a bug in some older Linux kernels (fixed after the patch "tcp: fix FIONREAD/SIOCINQ"), SocketInputStream.available() may return 1 even
// if there is no data in the Stream. Checking the result of InputStream.skip() confirms if it actually skipped anything.
while ((len = this.socketConnection.getMysqlInput().available()) > 0 && this.socketConnection.getMysqlInput().skip(len) > 0) {
continue;
}
Expand Down Expand Up @@ -1213,7 +1211,6 @@ public final void skipPacket() {

/**
* Log-off of the MySQL server and close the socket.
*
*/
public final void quit() {
try {
Expand Down Expand Up @@ -1876,19 +1873,17 @@ public void setStreamingData(ResultsetRows streamingData) {
this.streamingData = streamingData;
}

public void checkForOutstandingStreamingData() {
private void checkForOutstandingStreamingData() {
if (this.streamingData != null) {
boolean shouldClobber = this.propertySet.getBooleanProperty(PropertyKey.clobberStreamingResults).getValue();

if (!shouldClobber) {
throw ExceptionFactory.createException(Messages.getString("MysqlIO.39") + this.streamingData + Messages.getString("MysqlIO.40")
+ Messages.getString("MysqlIO.41") + Messages.getString("MysqlIO.42"), this.exceptionInterceptor);
throw ExceptionFactory.createException(Messages.getString("MysqlIO.39", new Object[] { this.streamingData }), this.exceptionInterceptor);
}

// Close the result set
this.streamingData.getOwner().closeOwner(false);
// Close the result set.
this.streamingData.getOwner().closeOwner();

// clear any pending data....
// Clear any pending data.
clearInputStream();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,8 @@ public Resultset getNextResultset() {
}
}

/**
* We can't do this ourselves, otherwise the contract for
* Statement.getMoreResults() won't work correctly.
/*
* We can't do this ourselves, otherwise the contract for Statement.getMoreResults() won't work correctly.
*/
@Override
public void clearNextResultset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,39 +50,33 @@
import com.mysql.cj.util.Util;

/**
* Provides streaming of Resultset rows. Each next row is consumed from the
* input stream only on {@link #next()} call. Consumed rows are not cached thus
* we only stream result sets when they are forward-only, read-only, and the
* fetch size has been set to Integer.MIN_VALUE (rows are read one by one).
* Provides streaming of Resultset rows. Each next row is consumed from the input stream only on {@link #next()} call. Consumed rows are not cached thus result
* sets are streamed only when they are forward-only, read-only, and the fetch size has been set to Integer.MIN_VALUE (rows are read one by one).
*
* @param <T>
* ProtocolEntity type
*/
public class ResultsetRowsStreaming<T extends ProtocolEntity> extends AbstractResultsetRows implements ResultsetRows {

private NativeProtocol protocol;
private NativeMessageBuilder commandBuilder = null;
private ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory;

private boolean isAfterEnd = false;

private boolean noMoreRows = false;

private boolean isBinaryEncoded = false;
private final Lock lock = new ReentrantLock();

private Row nextRow;

private boolean isAfterEnd = false;
private boolean noMoreRows = false;
private boolean isBinaryEncoded = false;
private boolean streamerClosed = false;

private ExceptionInterceptor exceptionInterceptor;

private ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory;

private NativeMessageBuilder commandBuilder = null;
private final Lock lock = new ReentrantLock();

/**
* Creates a new RowDataDynamic object.
* Creates a new ResultsetRowsStreaming object.
*
* @param io
* @param protocol
* the connection to MySQL that this data is coming from
* @param columnDefinition
* the metadata that describe this data
Expand All @@ -91,9 +85,9 @@ public class ResultsetRowsStreaming<T extends ProtocolEntity> extends AbstractRe
* @param resultSetFactory
* {@link ProtocolEntityFactory}
*/
public ResultsetRowsStreaming(NativeProtocol io, ColumnDefinition columnDefinition, boolean isBinaryEncoded,
public ResultsetRowsStreaming(NativeProtocol protocol, ColumnDefinition columnDefinition, boolean isBinaryEncoded,
ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) {
this.protocol = io;
this.protocol = protocol;
this.isBinaryEncoded = isBinaryEncoded;
this.metadata = columnDefinition;
this.exceptionInterceptor = this.protocol.getExceptionInterceptor();
Expand All @@ -115,14 +109,12 @@ public void close() {
while (next() != null) {
hadMore = true;
howMuchMore++;

if (howMuchMore % 100 == 0) {
Thread.yield();
}
}

if (!this.protocol.getPropertySet().getBooleanProperty(PropertyKey.clobberStreamingResults).getValue()
&& this.protocol.getPropertySet().getIntegerProperty(PropertyKey.netTimeoutForStreamingResults).getValue() > 0) {
if (this.protocol.getPropertySet().getIntegerProperty(PropertyKey.netTimeoutForStreamingResults).getValue() > 0) {
Session session = this.owner.getSession();
TelemetrySpan span = session.getTelemetryHandler().startSpan(TelemetrySpanName.SET_VARIABLE, "net_write_timeout");
try (TelemetryScope scope = span.makeCurrent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,7 @@ MysqlIO.17=Attempt to close streaming result set
MysqlIO.18=\ when no streaming result set was registered. This is an internal error.
MysqlIO.23=Can not use streaming results with multiple result statements
MysqlIO.25=\ ... (truncated)
MysqlIO.39=Streaming result set
MysqlIO.40=\ is still active.
MysqlIO.41=\ No statements may be issued when any streaming result sets are open and in use on a given connection.
MysqlIO.42=\ Ensure that you have called .close() on any active streaming result sets before attempting more queries.
MysqlIO.39=Streaming result set {0} is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
MysqlIO.43=Unexpected end of input stream
MysqlIO.48=Unexpected end of input stream
MysqlIO.57=send() compressed packet:\n
Expand Down
41 changes: 41 additions & 0 deletions src/main/user-api/java/com/mysql/cj/jdbc/CloseOption.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2024, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by
* the Free Software Foundation.
*
* This program is designed to work with certain software that is licensed under separate terms, as designated in a particular file or component or in
* included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the
* separately licensed software that they have either included with the program or referenced in the documentation.
*
* Without limiting anything contained in the foregoing, this file, which is part of MySQL Connector/J, is also subject to the Universal FOSS Exception,
* version 1.0, a copy of which can be found at http://oss.oracle.com/licenses/universal-foss-exception.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details.
*
* You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/

package com.mysql.cj.jdbc;

import java.util.Arrays;

public enum CloseOption {

IMPLICIT, // Close operation initiated internally by a clean up routine.
FORCED, // A forced, hard close.
ROLLBACK, // Allow rollback during the close operation.
PROPAGATE, // Allow propagating the close operation to dependents and owner objects.
NO_CACHE; // Does not allow caching the closing object.

public boolean in(CloseOption... options) {
return Arrays.stream(options).anyMatch(this::equals);
}

public boolean notIn(CloseOption... options) {
return Arrays.stream(options).noneMatch(this::equals);
}

}
Loading

0 comments on commit de3e3f6

Please sign in to comment.