Skip to content

Commit

Permalink
Fix for Bug#97269 (30438500), POSSIBLE BUG IN
Browse files Browse the repository at this point in the history
COM.MYSQL.CJ.XDEVAPI.STREAMINGDOCRESULTBUILDER.
  • Loading branch information
soklakov committed May 12, 2021
1 parent 1198c8c commit 10aa4c9
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

Version 8.0.26

- Fix for Bug#97269 (30438500), POSSIBLE BUG IN COM.MYSQL.CJ.XDEVAPI.STREAMINGDOCRESULTBUILDER.

- Fix for Bug#103303 (32766143), JAVA.LANG.CLASSCASTEXCEPTION WHEN INSERTING BLOB WITH SERVER PREPARED STATEMENT.

- WL#14205, Support query attributes.
Expand Down
34 changes: 16 additions & 18 deletions src/main/protocol-impl/java/com/mysql/cj/protocol/x/Notice.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020, Oracle and/or its affiliates.
* Copyright (c) 2018, 2021, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 2.0, as published by the
Expand Down Expand Up @@ -50,23 +50,21 @@ public class Notice implements ProtocolEntity {

public static Notice getInstance(XMessage message) {
Frame notice = (Frame) message.getMessage();
if (notice.getScope() != Frame.Scope.GLOBAL) { // TODO should we handle global notices somehow? What frame types are applicable there?
switch (notice.getType()) {
case Frame.Type.WARNING_VALUE:
return new XWarning(notice);

case Frame.Type.SESSION_VARIABLE_CHANGED_VALUE:
return new XSessionVariableChanged(notice);

case Frame.Type.SESSION_STATE_CHANGED_VALUE:
return new XSessionStateChanged(notice);

case Frame.Type.GROUP_REPLICATION_STATE_CHANGED_VALUE:
// TODO
break;
default:
break;
}
switch (notice.getType()) {
case Frame.Type.WARNING_VALUE:
return new XWarning(notice);

case Frame.Type.SESSION_VARIABLE_CHANGED_VALUE:
return new XSessionVariableChanged(notice);

case Frame.Type.SESSION_STATE_CHANGED_VALUE:
return new XSessionStateChanged(notice);

case Frame.Type.GROUP_REPLICATION_STATE_CHANGED_VALUE:
// TODO
break;
default:
break;
}
return new Notice(notice);
}
Expand Down
12 changes: 10 additions & 2 deletions src/main/protocol-impl/java/com/mysql/cj/protocol/x/XProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -698,11 +698,19 @@ public void drainRows() {
}

public ColumnDefinition readMetadata() {
return readMetadata(null);
}

public ColumnDefinition readMetadata(Consumer<Notice> noticeConsumer) {
try {
List<Notice> notices;
List<ColumnMetaData> fromServer = new LinkedList<>();
do { // use this construct to read at least one
fromServer.add((ColumnMetaData) this.reader.readMessage(null, ServerMessages.Type.RESULTSET_COLUMN_META_DATA_VALUE).getMessage());
// TODO put notices somewhere like it's done eg. in readStatementExecuteOk(): builder.addNotice(this.reader.read(Frame.class));
XMessage mess = this.reader.readMessage(null, ServerMessages.Type.RESULTSET_COLUMN_META_DATA_VALUE);
if (noticeConsumer != null && (notices = mess.getNotices()) != null) {
notices.stream().forEach(noticeConsumer::accept);
}
fromServer.add((ColumnMetaData) mess.getMessage());
} while (((SyncMessageReader) this.reader).getNextNonNoticeMessageType() == ServerMessages.Type.RESULTSET_COLUMN_META_DATA_VALUE);
ArrayList<Field> metadata = new ArrayList<>(fromServer.size());
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2020, Oracle and/or its affiliates.
* Copyright (c) 2019, 2021, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 2.0, as published by the
Expand Down Expand Up @@ -70,6 +70,7 @@ public boolean addProtocolEntity(ProtocolEntity entity) {

} else if (entity instanceof Notice) {
this.statementExecuteOkBuilder.addProtocolEntity(entity);
return false;
}

if (this.metadata == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2020, Oracle and/or its affiliates.
* Copyright (c) 2019, 2021, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 2.0, as published by the
Expand Down Expand Up @@ -102,7 +102,7 @@ public boolean addProtocolEntity(ProtocolEntity entity) {
});
this.lastEntity = null;
} else {
cd = this.protocol.readMetadata();
cd = this.protocol.readMetadata(this.statementExecuteOkBuilder::addProtocolEntity);
}
return new SqlSingleResult(cd, this.protocol.getServerSession().getDefaultTimeZone(), new XProtocolRowInputStream(cd, this.protocol, (n) -> {
this.statementExecuteOkBuilder.addProtocolEntity(n);
Expand Down
91 changes: 91 additions & 0 deletions src/test/java/testsuite/x/devapi/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@
import com.mysql.cj.xdevapi.ClientFactory;
import com.mysql.cj.xdevapi.ClientImpl;
import com.mysql.cj.xdevapi.ClientImpl.PooledXProtocol;
import com.mysql.cj.xdevapi.Collection;
import com.mysql.cj.xdevapi.DbDoc;
import com.mysql.cj.xdevapi.DocResult;
import com.mysql.cj.xdevapi.FindStatement;
import com.mysql.cj.xdevapi.JsonNumber;
import com.mysql.cj.xdevapi.JsonString;
import com.mysql.cj.xdevapi.Row;
import com.mysql.cj.xdevapi.RowResult;
import com.mysql.cj.xdevapi.Schema;
Expand Down Expand Up @@ -2566,4 +2571,90 @@ public void testExecAsyncNegative() throws Exception {
sqlUpdate("drop table if exists testExecAsyncNegative");
}
}

/**
* Test fix for Bug#97269 (30438500), POSSIBLE BUG IN COM.MYSQL.CJ.XDEVAPI.STREAMINGDOCRESULTBUILDER.
*
* @throws Exception
*/
@Test
public void testBug97269() throws Exception {
assumeTrue(this.isSetForXTests);

Session sess = null;
try {
String message1 = "W1";
String message2 = "W2";

// create notice message buffers
Frame.Builder notice1 = Frame.newBuilder().setScope(Frame.Scope.LOCAL).setType(Frame.Type.WARNING_VALUE)
.setPayload(com.mysql.cj.x.protobuf.MysqlxNotice.Warning.newBuilder().setCode(MysqlErrorNumbers.ER_BAD_DB_ERROR).setMsg(message1).build()
.toByteString());
Frame.Builder notice2 = Frame.newBuilder().setScope(Frame.Scope.GLOBAL).setType(Frame.Type.WARNING_VALUE)
.setPayload(com.mysql.cj.x.protobuf.MysqlxNotice.Warning.newBuilder().setCode(MysqlErrorNumbers.ER_BAD_DB_ERROR).setMsg(message2).build()
.toByteString());

byte[] notice1Bytes = makeNoticeBytes(notice1.build());
byte[] notice2Bytes = makeNoticeBytes(notice2.build());
int size = notice1Bytes.length + notice2Bytes.length;
byte[] noticesBytes = new byte[size];
System.arraycopy(notice1Bytes, 0, noticesBytes, 0, notice1Bytes.length);
System.arraycopy(notice2Bytes, 0, noticesBytes, notice1Bytes.length, notice2Bytes.length);

InjectedSocketFactory.flushAllStaticData();

String url = this.baseUrl + (this.baseUrl.contains("?") ? "" : "?")
+ makeParam(PropertyKey.socketFactory, InjectedSocketFactory.class.getName(), !this.baseUrl.contains("?") || this.baseUrl.endsWith("?"))
+ makeParam(PropertyKey.xdevapiSslMode, XdevapiSslMode.DISABLED.toString())
+ makeParam(PropertyKey.xdevapiCompression, Compression.DISABLED.toString())
// to allow injection between result rows
+ makeParam(PropertyKey.useReadAheadInput, "false");

sess = this.fact.getSession(url);
SocketFactory sf = ((SessionImpl) sess).getSession().getProtocol().getSocketConnection().getSocketFactory();
assertTrue(InjectedSocketFactory.class.isAssignableFrom(sf.getClass()));

Collection collection = sess.getDefaultSchema().createCollection("testBug97269");
collection.add("{\"_id\":\"the_id\",\"g\":1}").execute();

// StreamingDocResultBuilder
InjectedSocketFactory.injectedBuffer = noticesBytes;
DocResult docs = collection.find().fields("$._id as _id, $.g as g, 1 + 1 as q").execute();
DbDoc doc = docs.next();
assertEquals("the_id", ((JsonString) doc.get("_id")).getString());
assertEquals(new Integer(1), ((JsonNumber) doc.get("g")).getInteger());
assertEquals(new Integer(2), ((JsonNumber) doc.get("q")).getInteger());

int cnt = 0;
for (Iterator<Warning> warn = docs.getWarnings(); warn.hasNext();) {
Warning w = warn.next();
if (w.getMessage().equals(message1) || w.getMessage().equals(message2)) {
cnt++;
}
}
assertEquals(2, cnt);

InjectedSocketFactory.flushAllStaticData();
InjectedSocketFactory.injectedBuffer = noticesBytes;

SqlResult rs1 = sess.sql("select 1").execute();
assertEquals(1, rs1.fetchOne().getInt(0));
cnt = 0;
for (Iterator<Warning> warn = rs1.getWarnings(); warn.hasNext();) {
Warning w = warn.next();
if (w.getMessage().equals(message1) || w.getMessage().equals(message2)) {
cnt++;
}
}
assertEquals(2, cnt);

} finally {
InjectedSocketFactory.flushAllStaticData();
dropCollection("testBug97269");
if (sess != null) {
sess.close();
}
}

}
}

0 comments on commit 10aa4c9

Please sign in to comment.