Skip to content

Commit

Permalink
[pulsar-io-jdbc] not set action as insert (apache#4862)
Browse files Browse the repository at this point in the history
### Motivation

jdbc sink treat all record as INSERT before apache#4358 , now it requires an indispensable action property which seems to be a break change, and we can deal records without any action property as INSERT.

### Modifications

treat action not set as INSERT action like before.
  • Loading branch information
yittg authored and sijie committed Aug 2, 2019
1 parent 434c395 commit d8356d8
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,29 @@ private void flush() {
// bind each record value
for (Record<T> record : swapList) {
String action = record.getProperties().get(ACTION);
if (action != null && action.equals(DELETE)) {
bindValue(deleteStatment, record, action);
count += 1;
deleteStatment.execute();
} else if (action != null && action.equals(UPDATE)) {
bindValue(updateStatment, record, action);
count += 1;
updateStatment.execute();
} else if (action != null && action.equals(INSERT)){
bindValue(insertStatement, record, action);
count += 1;
insertStatement.execute();
if (action == null) {
action = INSERT;
}
switch (action) {
case DELETE:
bindValue(deleteStatment, record, action);
count += 1;
deleteStatment.execute();
break;
case UPDATE:
bindValue(updateStatment, record, action);
count += 1;
updateStatment.execute();
break;
case INSERT:
bindValue(insertStatement, record, action);
count += 1;
insertStatement.execute();
break;
default:
String msg = String.format("Unsupported action %s, can be one of %s, or not set which indicate %s",
action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT);
throw new IllegalArgumentException(msg);
}
}
connection.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package org.apache.pulsar.io.jdbc;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
Expand All @@ -40,6 +44,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -95,7 +100,6 @@ public void setUp() throws Exception {
// change batchSize to 1, to flush on each write.
conf.put("batchSize", 1);

jdbcSink = new JdbcAutoSchemaSink();
jdbcSink = new JdbcAutoSchemaSink();

// open should success
Expand All @@ -109,8 +113,7 @@ public void tearDown() throws Exception {
jdbcSink.close();
}

@Test
public void TestOpenAndWriteSink() throws Exception {
private void testOpenAndWriteSink(Map<String, String> actionProperties) throws Exception {
Message<GenericRecord> insertMessage = mock(MessageImpl.class);
GenericSchema<GenericRecord> genericAvroSchema;
// prepare a foo Record
Expand All @@ -121,19 +124,16 @@ public void TestOpenAndWriteSink() throws Exception {
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

byte[] insertBytes = schema.encode(insertObj);

CompletableFuture<Void> future = new CompletableFuture<>();
Record<GenericRecord> insertRecord = PulsarRecord.<GenericRecord>builder()
.message(insertMessage)
.topicName("fake_topic_name")
.ackFunction(() -> {})
.ackFunction(() -> future.complete(null))
.build();

genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());

Map<String, String> insertProperties = Maps.newHashMap();
insertProperties.put("ACTION", "INSERT");
when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
when(insertMessage.getProperties()).thenReturn(insertProperties);
when(insertMessage.getProperties()).thenReturn(actionProperties);
log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
insertObj.toString(),
insertMessage.getValue().toString(),
Expand All @@ -143,7 +143,7 @@ public void TestOpenAndWriteSink() throws Exception {
jdbcSink.write(insertRecord);
log.info("executed write");
// sleep to wait backend flush complete
Thread.sleep(1000);
future.get(1, TimeUnit.SECONDS);

// value has been written to db, read it out and verify.
String querySql = "SELECT * FROM " + tableName + " WHERE field3=3";
Expand All @@ -156,6 +156,25 @@ public void TestOpenAndWriteSink() throws Exception {

}

@Test
public void TestInsertAction() throws Exception {
testOpenAndWriteSink(ImmutableMap.of("ACTION", "INSERT"));
}

@Test
public void TestNoAction() throws Exception {
testOpenAndWriteSink(ImmutableMap.of());
}

@Test
public void TestUnknownAction() throws Exception {
Record<GenericRecord> recordRecord = mock(Record.class);
when(recordRecord.getProperties()).thenReturn(ImmutableMap.of("ACTION", "UNKNOWN"));
CompletableFuture<Void> future = new CompletableFuture<>();
doAnswer(a -> future.complete(null)).when(recordRecord).fail();
jdbcSink.write(recordRecord);
future.get(1, TimeUnit.SECONDS);
}

@Test
public void TestUpdateAction() throws Exception {
Expand All @@ -169,10 +188,11 @@ public void TestUpdateAction() throws Exception {

byte[] updateBytes = schema.encode(updateObj);
Message<GenericRecord> updateMessage = mock(MessageImpl.class);
CompletableFuture<Void> future = new CompletableFuture<>();
Record<GenericRecord> updateRecord = PulsarRecord.<GenericRecord>builder()
.message(updateMessage)
.topicName("fake_topic_name")
.ackFunction(() -> {})
.ackFunction(() -> future.complete(null))
.build();

GenericSchema<GenericRecord> updateGenericAvroSchema;
Expand All @@ -188,8 +208,7 @@ public void TestUpdateAction() throws Exception {
updateRecord.getValue().toString());

jdbcSink.write(updateRecord);

Thread.sleep(1000);
future.get(1, TimeUnit.SECONDS);

// value has been written to db, read it out and verify.
String updateQuerySql = "SELECT * FROM " + tableName + " WHERE field3=4";
Expand All @@ -210,10 +229,11 @@ public void TestDeleteAction() throws Exception {

byte[] deleteBytes = schema.encode(deleteObj);
Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
CompletableFuture<Void> future = new CompletableFuture<>();
Record<GenericRecord> deleteRecord = PulsarRecord.<GenericRecord>builder()
.message(deleteMessage)
.topicName("fake_topic_name")
.ackFunction(() -> {})
.ackFunction(() -> future.complete(null))
.build();

GenericSchema<GenericRecord> deleteGenericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
Expand All @@ -228,8 +248,7 @@ public void TestDeleteAction() throws Exception {
deleteRecord.getValue().toString());

jdbcSink.write(deleteRecord);

Thread.sleep(1000);
future.get(1, TimeUnit.SECONDS);

// value has been written to db, read it out and verify.
String deleteQuerySql = "SELECT * FROM " + tableName + " WHERE field3=5";
Expand Down

0 comments on commit d8356d8

Please sign in to comment.