Skip to content

Commit

Permalink
GEODE-6010: change create jdbc-mapping to alter region and create asy…
Browse files Browse the repository at this point in the history
…nc-queue (apache#2836)

create jdbc-mapping now also does the following:
-- alters the region to have a JdbcLoader as its cache-loader
-- if synchronous, alters the region to have a JdbcWriter as its cache-writer
-- if asynchronous, creates an async-event-queue and alters regions to have it
The async-event-queue is created parallel for partitioned regions, and serial for all other regions.

create jdbc-mapping now requires cluster configuration and uses it to check
all required preconditions before the command starts changing things.
 
Co-authored-by: Darrel Schneider <[email protected]>
Co-authored-by: Jianxia Chen <[email protected]>
Co-authored-by: Scott Jewell <[email protected]>
  • Loading branch information
dschneider-pivotal authored Nov 14, 2018
1 parent 09dd194 commit 0dc46ea
Show file tree
Hide file tree
Showing 9 changed files with 1,242 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.connectors.jdbc.internal.cli.CreateMappingCommand;
import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.pdx.ReflectionBasedAutoSerializer;
import org.apache.geode.pdx.internal.AutoSerializableManager;
Expand Down Expand Up @@ -179,8 +180,11 @@ private void closeDB() throws SQLException {
@Test
public void throwsExceptionWhenNoMappingExistsUsingWriter() throws Exception {
createTable();
createRegionUsingGfsh(true, false, false);
createJdbcConnection();
StringBuffer createRegionCmd = new StringBuffer();
createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE"
+ " --cache-writer=" + JdbcWriter.class.getName());
gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
createJdbcDataSource();

server.invoke(() -> {
PdxInstance pdxEmployee1 =
Expand All @@ -197,8 +201,12 @@ public void throwsExceptionWhenNoMappingExistsUsingWriter() throws Exception {
public void throwsExceptionWhenNoMappingExistsUsingAsyncWriter() throws Exception {
createTable();
IgnoredException.addIgnoredException("JdbcConnectorException");
createRegionUsingGfsh(false, true, false);
createJdbcConnection();
StringBuffer createRegionCmd = new StringBuffer();
createAsyncListener("JAW");
createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE"
+ " --async-event-queue-id=JAW");
gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
createJdbcDataSource();

server.invoke(() -> {
PdxInstance pdxEmployee1 =
Expand All @@ -212,32 +220,14 @@ public void throwsExceptionWhenNoMappingExistsUsingAsyncWriter() throws Exceptio
await().untilAsserted(() -> {
assertThat(asyncWriter.getFailedEvents()).isEqualTo(1);
});

});
}

@Test
public void throwsExceptionWhenNoMappingMatches() throws Exception {
createTable();
createRegionUsingGfsh(true, false, false);
createJdbcConnection();

server.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("name", "Emp1").writeInt("age", 55).create();
Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
assertThatThrownBy(() -> region.put("key1", pdxEmployee1))
.isExactlyInstanceOf(JdbcConnectorException.class).hasMessage(
"JDBC mapping for region employees not found. Create the mapping with the gfsh command 'create jdbc-mapping'.");
});
}

@Test
public void throwsExceptionWhenNoDataSourceExists() throws Exception {
createTable();
createRegionUsingGfsh(true, false, false);
createMapping(REGION_NAME, DATA_SOURCE_NAME);
createRegionUsingGfsh();
createMapping(REGION_NAME, DATA_SOURCE_NAME, true);

server.invoke(() -> {
PdxInstance pdxEmployee1 =
Expand All @@ -262,9 +252,9 @@ public void verifyDateToDate() throws Exception {
"Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
+ TestDate.DATE_FIELD_NAME + " date)");
});
createRegionUsingGfsh(true, false, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName());
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
final String key = "emp1";
final java.sql.Date sqlDate = java.sql.Date.valueOf("1982-09-11");
final Date jdkDate = new Date(sqlDate.getTime());
Expand Down Expand Up @@ -297,9 +287,9 @@ public void verifyDateToTime() throws Exception {
"Create Table " + TABLE_NAME + " (id varchar(10) primary key not null, "
+ TestDate.DATE_FIELD_NAME + " time)");
});
createRegionUsingGfsh(true, false, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName());
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
final String key = "emp1";
final java.sql.Time sqlTime = java.sql.Time.valueOf("23:59:59");
final Date jdkDate = new Date(sqlTime.getTime());
Expand Down Expand Up @@ -327,9 +317,9 @@ public void verifyDateToTimestamp() throws Exception {
server = startupRule.startServerVM(1, x -> x.withConnectionToLocator(locator.getPort()));
createTableWithTimeStamp(server, connectionUrl, TABLE_NAME, TestDate.DATE_FIELD_NAME);

createRegionUsingGfsh(true, false, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName());
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, TestDate.class.getName(), true);
final String key = "emp1";
final java.sql.Timestamp sqlTimestamp = java.sql.Timestamp.valueOf("1982-09-11 23:59:59.123");
final Date jdkDate = new Date(sqlTimestamp.getTime());
Expand Down Expand Up @@ -365,9 +355,9 @@ protected void createTableWithTimeStamp(MemberVM vm, String connectionUrl, Strin
@Test
public void putWritesToDB() throws Exception {
createTable();
createRegionUsingGfsh(true, false, false);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME);
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
server.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
Expand All @@ -382,9 +372,26 @@ public void putWritesToDB() throws Exception {
@Test
public void putAsyncWritesToDB() throws Exception {
createTable();
createRegionUsingGfsh(true, false, false);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME);
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
server.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("id", "key1").writeString("name", "Emp1").writeInt("age", 55).create();

String key = "emp1";
ClusterStartupRule.getCache().getRegion(REGION_NAME).put(key, pdxEmployee1);
assertTableHasEmployeeData(1, pdxEmployee1, key);
});
}

@Test
public void putAsyncWithPartitionWritesToDB() throws Exception {
createTable();
createPartitionRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
server.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
Expand All @@ -399,9 +406,9 @@ public void putAsyncWritesToDB() throws Exception {
@Test
public void getReadsFromEmptyDB() throws Exception {
createTable();
createRegionUsingGfsh(false, false, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME);
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
server.invoke(() -> {
String key = "emp1";
Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
Expand All @@ -414,9 +421,9 @@ public void getReadsFromEmptyDB() throws Exception {
@Test
public void getReadsFromDB() throws Exception {
createTable();
createRegionUsingGfsh(true, false, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME);
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, true);
server.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
Expand All @@ -442,17 +449,18 @@ public void getReadsFromDB() throws Exception {
@Test
public void getReadsFromDBWithAsyncWriter() throws Exception {
createTable();
createRegionUsingGfsh(false, true, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME);
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, false);
server.invoke(() -> {
PdxInstance pdxEmployee1 =
ClusterStartupRule.getCache().createPdxInstanceFactory(Employee.class.getName())
.writeString("id", "id1").writeString("name", "Emp1").writeInt("age", 55).create();
String key = "id1";
Region<Object, Object> region = ClusterStartupRule.getCache().getRegion(REGION_NAME);
JdbcAsyncWriter asyncWriter = (JdbcAsyncWriter) ClusterStartupRule.getCache()
.getAsyncEventQueue("JAW").getAsyncEventListener();
.getAsyncEventQueue(CreateMappingCommand.createAsyncEventQueueName(REGION_NAME))
.getAsyncEventListener();

region.put(key, pdxEmployee1);
await().untilAsserted(() -> {
Expand All @@ -473,9 +481,9 @@ public void getReadsFromDBWithAsyncWriter() throws Exception {
@Test
public void getReadsFromDBWithPdxClassName() throws Exception {
createTable();
createRegionUsingGfsh(true, false, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName());
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, Employee.class.getName(), true);
server.invoke(() -> {
String key = "id1";
Employee value = new Employee(key, "Emp1", 55);
Expand All @@ -495,9 +503,9 @@ public void clientGetReadsFromDBWithPdxClassName() throws Exception {
ClientVM client = getClientVM();
createClientRegion(client);

createRegionUsingGfsh(true, false, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
client.invoke(() -> {
String key = "id1";
ClassWithSupportedPdxFields value =
Expand All @@ -519,9 +527,9 @@ public void clientPutsAndGetsWithNullFieldsWithPdxClassName() throws Exception {
ClientVM client = getClientVM();
createClientRegion(client);

createRegionUsingGfsh(true, false, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
client.invoke(() -> {
String key = "id1";
ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key);
Expand All @@ -540,9 +548,9 @@ public void clientRegistersPdxAndReadsFromDBWithPdxClassName() throws Exception
createTableForAllSupportedFields();
ClientVM client = getClientVM();
createClientRegion(client);
createRegionUsingGfsh(true, false, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
String key = "id1";
ClassWithSupportedPdxFields value =
new ClassWithSupportedPdxFields(key, true, (byte) 1, (short) 2,
Expand All @@ -569,9 +577,9 @@ public void clientRegistersPdxAndReadsFromDBContainingNullColumnsWithPdxClassNam
createTableForAllSupportedFields();
ClientVM client = getClientVM();
createClientRegion(client);
createRegionUsingGfsh(true, false, true);
createJdbcConnection();
createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName());
createRegionUsingGfsh();
createJdbcDataSource();
createMapping(REGION_NAME, DATA_SOURCE_NAME, ClassWithSupportedPdxFields.class.getName(), true);
String key = "id1";
ClassWithSupportedPdxFields value = new ClassWithSupportedPdxFields(key);

Expand Down Expand Up @@ -607,9 +615,9 @@ private void createClientRegion(ClientVM client) {
});
}

private void createJdbcConnection() {
private void createJdbcDataSource() {
final String commandStr =
"create jndi-binding --type=POOLED --name=" + DATA_SOURCE_NAME + " --url=" + connectionUrl;
"create data-source --pooled --name=" + DATA_SOURCE_NAME + " --url=" + connectionUrl;
gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
}

Expand All @@ -620,32 +628,36 @@ private void createAsyncListener(String id) {
gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
}

private void createRegionUsingGfsh(boolean withCacheWriter, boolean withAsyncWriter,
boolean withLoader) {
private void createRegionUsingGfsh() {
StringBuffer createRegionCmd = new StringBuffer();
createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE ");
if (withCacheWriter) {
createRegionCmd.append(" --cache-writer=" + JdbcWriter.class.getName());
}
if (withLoader) {
createRegionCmd.append(" --cache-loader=" + JdbcLoader.class.getName());
}
if (withAsyncWriter) {
createAsyncListener("JAW");
createRegionCmd.append(" --async-event-queue-id=JAW");
}
createRegionCmd.append("create region --name=" + REGION_NAME + " --type=REPLICATE");
gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
}

private void createPartitionRegionUsingGfsh() {
StringBuffer createRegionCmd = new StringBuffer();
createRegionCmd.append("create region --name=" + REGION_NAME + " --type=PARTITION");
gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
}

private void createMapping(String regionName, String connectionName) {
createMapping(regionName, connectionName, Employee.class.getName());
private void createMapping(String regionName, String connectionName, boolean synchronous) {
createMapping(regionName, connectionName, Employee.class.getName(), synchronous);
}

private void createMapping(String regionName, String connectionName, String pdxClassName) {
final String commandStr = "create jdbc-mapping --region=" + regionName + " --data-source="
+ connectionName + (pdxClassName != null ? " --pdx-name=" + pdxClassName : "");
private void createMapping(String regionName, String connectionName, String pdxClassName,
boolean synchronous) {
final String commandStr = "create jdbc-mapping --region=" + regionName
+ " --data-source=" + connectionName
+ " --synchronous=" + synchronous
+ " --pdx-name=" + pdxClassName;
gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
if (!synchronous) {
final String alterAsyncQueue =
"alter async-event-queue --id="
+ CreateMappingCommand.createAsyncEventQueueName(regionName)
+ " --batch-size=1 --batch-time-interval=0";
gfsh.executeAndAssertThat(alterAsyncQueue).statusIsSuccess();
}
}

private void assertTableHasEmployeeData(int size, PdxInstance employee, String key)
Expand Down
Loading

0 comments on commit 0dc46ea

Please sign in to comment.