Skip to content

Commit

Permalink
Fix for Bug#103878 (32954449), CONNECTOR/J 8 : QUERY WITH 'SHOW XXX'
Browse files Browse the repository at this point in the history
WILL GET EXCEPTION WHEN USE CURSOR.
  • Loading branch information
soklakov committed Jul 16, 2021
1 parent 2ff451f commit 408a1b6
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

Version 8.0.27

- Fix for Bug#103878 (32954449), CONNECTOR/J 8 : QUERY WITH 'SHOW XXX' WILL GET EXCEPTION WHEN USE CURSOR.

- Fix for Bug#103796 (32922715), CONNECTOR/J 8 STMT SETQUERYTIMEOUT CAN NOT WORK.
Thanks to Hong Wang for his contribution.

Expand Down
32 changes: 31 additions & 1 deletion src/main/core-api/java/com/mysql/cj/protocol/MessageReader.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020, Oracle and/or its affiliates.
* Copyright (c) 2018, 2021, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 2.0, as published by the
Expand Down Expand Up @@ -46,6 +46,18 @@ public interface MessageReader<H extends MessageHeader, M extends Message> {
*/
H readHeader() throws IOException;

/**
* Read the next message header from server, possibly blocking indefinitely until the message is received,
* and cache it so that the next {@link #readHeader()} return the same header.
*
* @return {@link MessageHeader} of the next message
* @throws IOException
* if an error occurs
*/
default H probeHeader() throws IOException {
return readHeader();
}

/**
* Read message from server into to the given {@link Message} instance or into the new one if not present.
* For asynchronous channel it synchronously reads the next message in the stream, blocking until the message is read fully.
Expand All @@ -61,6 +73,24 @@ public interface MessageReader<H extends MessageHeader, M extends Message> {
*/
M readMessage(Optional<M> reuse, H header) throws IOException;

/**
* Read message from server into to the given {@link Message} instance or into the new one if not present
* and cache it so that the next {@link #readMessage(Optional, MessageHeader)} return the same message.
* For asynchronous channel it synchronously reads the next message in the stream, blocking until the message is read fully.
* Could throw CJCommunicationsException wrapping an {@link IOException} during read or parse
*
* @param reuse
* {@link Message} object to reuse. May be ignored by implementation.
* @param header
* {@link MessageHeader} instance
* @return {@link Message} instance
* @throws IOException
* if an error occurs
*/
default M probeMessage(Optional<M> reuse, H header) throws IOException {
return readMessage(reuse, header);
}

/**
* Read message from server into to the given {@link Message} instance or into the new one if not present.
* For asynchronous channel it synchronously reads the next message in the stream, blocking until the message is read fully.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,25 @@ public Resultset read(int maxRows, boolean streamResults, NativePacketPayload re
boolean isCursorPossible = this.protocol.getPropertySet().getBooleanProperty(PropertyKey.useCursorFetch).getValue()
&& resultSetFactory.getResultSetType() == Type.FORWARD_ONLY && resultSetFactory.getFetchSize() > 0;

// There is no EOF packet after fields when CLIENT_DEPRECATE_EOF is set;
// if we asked to use cursor then there should be an OK or an ERR packet here
// At this point 3 types of packets are expected:
// 1. If CLIENT_DEPRECATE_EOF is not set then an EOF packet is always expected to be the next one.
// 2. If CLIENT_DEPRECATE_EOF is set and a cursor was created then the next packet is an OK with 0xFE signature.
// 3. If CLIENT_DEPRECATE_EOF is set and a cursor was not created then the next packet is a ProtocolBinary::ResultsetRow.
// If CLIENT_DEPRECATE_EOF is set, there is no way to tell which one, OK or ResultsetRow, is the next packet, so it should be read with a special caching method.
if (isCursorPossible || !this.protocol.getServerSession().isEOFDeprecated()) {
NativePacketPayload rowPacket = this.protocol.readMessage(this.protocol.getReusablePacket());
// Read the next packet but leave it in the reader cache. In case it's not the OK or EOF one it will be read again by ResultSet factories.
NativePacketPayload rowPacket = this.protocol.probeMessage(this.protocol.getReusablePacket());
this.protocol.checkErrorMessage(rowPacket);
this.protocol.readServerStatusForResultSets(rowPacket, true);
if (rowPacket.isResultSetOKPacket() || rowPacket.isEOFPacket()) {
// Consume the OK/EOF packet from the reader cache and read the status flags from it;
// The SERVER_STATUS_CURSOR_EXISTS flag should indicate the cursor state in this case.
rowPacket = this.protocol.readMessage(this.protocol.getReusablePacket());
this.protocol.readServerStatusForResultSets(rowPacket, true);
} else {
// If it's not an OK/EOF then the cursor is not created and this recent packet is a row.
// Retain the packet in the reader cache.
isCursorPossible = false;
}
}

ResultsetRows rows = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2020, Oracle and/or its affiliates.
* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 2.0, as published by the
Expand Down Expand Up @@ -63,11 +63,17 @@ public DebugBufferingPacketReader(MessageReader<NativePacketHeader, NativePacket

@Override
public NativePacketHeader readHeader() throws IOException {

byte prevPacketSeq = this.packetReader.getMessageSequence();
return readHeaderLocal(prevPacketSeq, this.packetReader.readHeader());
}

NativePacketHeader hdr = this.packetReader.readHeader();
@Override
public NativePacketHeader probeHeader() throws IOException {
byte prevPacketSeq = this.packetReader.getMessageSequence();
return readHeaderLocal(prevPacketSeq, this.packetReader.probeHeader());
}

private NativePacketHeader readHeaderLocal(byte prevPacketSeq, NativePacketHeader hdr) throws IOException {
// Normally we shouldn't get into situation of getting packets out of order from server,
// so we do this check only in debug mode.
byte currPacketSeq = hdr.getMessageSequence();
Expand Down Expand Up @@ -124,6 +130,36 @@ public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, Nati
return buf;
}

@Override
public NativePacketPayload probeMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
int packetLength = header.getMessageSize();
NativePacketPayload buf = this.packetReader.probeMessage(reuse, header);

int bytesToDump = Math.min(MAX_PACKET_DUMP_LENGTH, packetLength);
String PacketPayloadImpl = StringUtils.dumpAsHex(buf.getByteBuffer(), bytesToDump);

StringBuilder packetDump = new StringBuilder(DEBUG_MSG_LEN + NativeConstants.HEADER_LENGTH + PacketPayloadImpl.length());
packetDump.append("Server ");
packetDump.append(reuse.isPresent() ? "(re-used) " : "(new) ");
packetDump.append(buf.toString());
packetDump.append(" --------------------> Client\n");
packetDump.append("\nPacket payload:\n\n");
packetDump.append(this.lastHeaderPayload);
packetDump.append(PacketPayloadImpl);

if (bytesToDump == MAX_PACKET_DUMP_LENGTH) {
packetDump.append("\nNote: Packet of " + packetLength + " bytes truncated to " + MAX_PACKET_DUMP_LENGTH + " bytes.\n");
}

if ((this.packetDebugBuffer.size() + 1) > this.packetDebugBufferSize.getValue()) {
this.packetDebugBuffer.removeFirst();
}

this.packetDebugBuffer.addLast(packetDump);

return buf;
}

@Override
public byte getMessageSequence() {
return this.packetReader.getMessageSequence();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2020, Oracle and/or its affiliates.
* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 2.0, as published by the
Expand Down Expand Up @@ -54,6 +54,11 @@ public NativePacketHeader readHeader() throws IOException {
return this.packetReader.readHeader();
}

@Override
public NativePacketHeader probeHeader() throws IOException {
return this.packetReader.probeHeader();
}

@Override
public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {

Expand Down Expand Up @@ -93,6 +98,45 @@ public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, Nati
return buf;
}

@Override
public NativePacketPayload probeMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {

int packetLength = header.getMessageSize();
NativePacketPayload buf = this.packetReader.probeMessage(reuse, header);

if (packetLength == NativeConstants.MAX_PACKET_SIZE) { // it's a multi-packet

buf.setPosition(NativeConstants.MAX_PACKET_SIZE);

NativePacketPayload multiPacket = null;
int multiPacketLength = -1;
byte multiPacketSeq = getMessageSequence();

do {
NativePacketHeader hdr = readHeader();
multiPacketLength = hdr.getMessageSize();

if (multiPacket == null) {
multiPacket = new NativePacketPayload(multiPacketLength);
}

multiPacketSeq++;
if (multiPacketSeq != hdr.getMessageSequence()) {
throw new IOException(Messages.getString("PacketReader.10"));
}

this.packetReader.probeMessage(Optional.of(multiPacket), hdr);

buf.writeBytes(StringLengthDataType.STRING_FIXED, multiPacket.getByteBuffer(), 0, multiPacketLength);

} while (multiPacketLength == NativeConstants.MAX_PACKET_SIZE);

buf.setPosition(0);
}

return buf;
}

@Override
public byte getMessageSequence() {
return this.packetReader.getMessageSequence();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public boolean isErrorPacket() {
* @return true if it is a EOF packet
*/
public final boolean isEOFPacket() {
return (this.byteBuffer[0] & 0xff) == TYPE_ID_EOF && (getPayloadLength() <= 5);
return (this.byteBuffer[0] & 0xff) == TYPE_ID_EOF && (this.payloadLength <= 5);
}

/**
Expand Down Expand Up @@ -247,7 +247,7 @@ public final boolean isOKPacket() {
* @return true if it is an OK packet for ResultSet
*/
public final boolean isResultSetOKPacket() {
return (this.byteBuffer[0] & 0xff) == TYPE_ID_EOF && (getPayloadLength() < 16777215);
return (this.byteBuffer[0] & 0xff) == TYPE_ID_EOF && (this.payloadLength > 5) && (this.payloadLength < 16777215);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,22 @@ public final NativePacketPayload readMessage(NativePacketPayload reuse) {
}
}

public final NativePacketPayload probeMessage(NativePacketPayload reuse) {
try {
NativePacketHeader header = this.packetReader.probeHeader();
NativePacketPayload buf = this.packetReader.probeMessage(Optional.ofNullable(reuse), header);
this.packetSequence = header.getMessageSequence();
return buf;

} catch (IOException ioEx) {
throw ExceptionFactory.createCommunicationsException(this.propertySet, this.serverSession, this.getPacketSentTimeHolder(),
this.getPacketReceivedTimeHolder(), ioEx, getExceptionInterceptor());
} catch (OutOfMemoryError oom) {
throw ExceptionFactory.createException(oom.getMessage(), MysqlErrorNumbers.SQL_STATE_MEMORY_ALLOCATION_ERROR, 0, false, oom,
this.exceptionInterceptor);
}
}

/**
* @param packet
* {@link Message}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2020, Oracle and/or its affiliates.
* Copyright (c) 2016, 2021, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 2.0, as published by the
Expand Down Expand Up @@ -49,25 +49,40 @@ public class SimplePacketReader implements MessageReader<NativePacketHeader, Nat

private byte readPacketSequence = -1;

NativePacketHeader lastHeader = null;
NativePacketPayload lastMessage = null;

public SimplePacketReader(SocketConnection socketConnection, RuntimeProperty<Integer> maxAllowedPacket) {
this.socketConnection = socketConnection;
this.maxAllowedPacket = maxAllowedPacket;
}

@Override
public NativePacketHeader readHeader() throws IOException {
if (this.lastHeader == null) {
return readHeaderLocal();
}
NativePacketHeader hdr = this.lastHeader;
this.lastHeader = null;
this.readPacketSequence = hdr.getMessageSequence();
return hdr;
}

@Override
public NativePacketHeader probeHeader() throws IOException {
this.lastHeader = readHeaderLocal();
return this.lastHeader;
}

private NativePacketHeader readHeaderLocal() throws IOException {
NativePacketHeader hdr = new NativePacketHeader();

try {
this.socketConnection.getMysqlInput().readFully(hdr.getBuffer().array(), 0, NativeConstants.HEADER_LENGTH);

int packetLength = hdr.getMessageSize();

if (packetLength > this.maxAllowedPacket.getValue()) {
throw new CJPacketTooBigException(packetLength, this.maxAllowedPacket.getValue());
}

} catch (IOException | CJPacketTooBigException e) {
try {
this.socketConnection.forceClose();
Expand All @@ -78,38 +93,52 @@ public NativePacketHeader readHeader() throws IOException {
}

this.readPacketSequence = hdr.getMessageSequence();

return hdr;
}

@Override
public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
if (this.lastMessage == null) {
return readMessageLocal(reuse, header);
}
NativePacketPayload buf = this.lastMessage;
this.lastMessage = null;
return buf;
}

@Override
public NativePacketPayload probeMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
this.lastMessage = readMessageLocal(reuse, header);
return this.lastMessage;
}

private NativePacketPayload readMessageLocal(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {
try {
int packetLength = header.getMessageSize();
NativePacketPayload buf;
NativePacketPayload message;
if (reuse.isPresent()) {
buf = reuse.get();
message = reuse.get();
// Set the Buffer to it's original state
buf.setPosition(0);
message.setPosition(0);
// Do we need to re-alloc the byte buffer?
if (buf.getByteBuffer().length < packetLength) {
if (message.getByteBuffer().length < packetLength) {
// Note: We actually check the length of the buffer, rather than getBufLength(), because getBufLength()
// is not necessarily the actual length of the byte array used as the buffer
buf.setByteBuffer(new byte[packetLength]);
message.setByteBuffer(new byte[packetLength]);
}

// Set the new length
buf.setPayloadLength(packetLength);
message.setPayloadLength(packetLength);
} else {
buf = new NativePacketPayload(new byte[packetLength]);
message = new NativePacketPayload(new byte[packetLength]);
}

// Read the data from the server
int numBytesRead = this.socketConnection.getMysqlInput().readFully(buf.getByteBuffer(), 0, packetLength);
int numBytesRead = this.socketConnection.getMysqlInput().readFully(message.getByteBuffer(), 0, packetLength);
if (numBytesRead != packetLength) {
throw new IOException(Messages.getString("PacketReader.1", new Object[] { packetLength, numBytesRead }));
}
return buf;
return message;

} catch (IOException e) {
try {
Expand Down
Loading

0 comments on commit 408a1b6

Please sign in to comment.