Skip to content

Commit

Permalink
Merge branch '10.5.x' into 10.6.x
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentJenkins committed Jan 5, 2023
2 parents 2d51b07 + 52e0f1e commit 448ab77
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.connect.jdbc.dialect;

import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig.InsertMode;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig.PrimaryKeyMode;
import io.confluent.connect.jdbc.sink.PreparedStatementBinder;
Expand Down Expand Up @@ -141,7 +142,27 @@ protected boolean maybeBindPrimitive(

if (schema.type() == Type.STRING) {
if (colDef.type() == Types.CLOB) {
statement.setCharacterStream(index, new StringReader((String) value));
final int upsertValueLimit = 4000;
boolean valueBinded = false;
long valueLength = ((String) value).length();
if (this.config instanceof JdbcSinkConfig) {
String insertMode = this.config.getString(JdbcSinkConfig.INSERT_MODE);
if (insertMode != null && !insertMode.isEmpty()) {
// UPSERT mode uses MERGE statement in the query. The oracle driver requires values of
// length more than 4000 to be LOB binded in this case.
if (InsertMode.valueOf(insertMode.toUpperCase()) == InsertMode.UPSERT) {
if (valueLength < upsertValueLimit) {
statement.setCharacterStream(index, new StringReader((String) value), valueLength);
} else {
statement.setCharacterStream(index, new StringReader((String) value));
}
valueBinded = true;
}
}
}
if (!valueBinded) {
statement.setCharacterStream(index, new StringReader((String) value), valueLength);
}
return true;
} else if (colDef.type() == Types.NCLOB) {
statement.setNCharacterStream(index, new StringReader((String) value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ protected OracleDatabaseDialect createDialect() {
@Test
public void bindFieldStringValue() throws SQLException {
int index = ThreadLocalRandom.current().nextInt();
verifyBindField(++index, Schema.STRING_SCHEMA, "yep").setCharacterStream(eq(index), any(StringReader.class));
String value = "yep";
verifyBindField(++index, Schema.STRING_SCHEMA, value).setCharacterStream(eq(index), any(StringReader.class), eq((long)value.length()));
}

@Override
Expand Down Expand Up @@ -344,7 +345,7 @@ public void shouldBindStringAccordingToColumnDef() throws SQLException {
verify(stmtNvarchar, times(1)).setNString(index, value);

dialect.bindField(stmtClob, index, schema, value, colDefClob);
verify(stmtClob, times(1)).setCharacterStream(eq(index), any(StringReader.class));
verify(stmtClob, times(1)).setCharacterStream(eq(index), any(StringReader.class), eq((long) value.length()));
}

@Test
Expand Down

0 comments on commit 448ab77

Please sign in to comment.