Skip to content

Commit

Permalink
PHOENIX-4105 Fix tests broken due to PHOENIX-4089 (addendum 1)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesRTaylor committed Aug 20, 2017
1 parent c104a4d commit 1dc9008
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public void testIndexWriteFailure() throws Exception {
FailingRegionObserver.FAIL_WRITE = false;
if (rebuildIndexOnWriteFailure) {
// wait for index to be rebuilt automatically
waitForIndexRebuild(conn,indexName, PIndexState.ACTIVE);
waitForIndexRebuild(conn,fullIndexName, PIndexState.ACTIVE);
} else {
// simulate replaying failed mutation
replayMutations();
Expand All @@ -306,7 +306,7 @@ public void testIndexWriteFailure() throws Exception {
// Wait for index to be rebuilt automatically. This should fail because
// we haven't flipped the FAIL_WRITE flag to false and as a result this
// should cause index rebuild to fail too.
waitForIndexRebuild(conn, indexName, PIndexState.DISABLE);
waitForIndexRebuild(conn, fullIndexName, PIndexState.DISABLE);
// verify that the index was marked as disabled and the index disable
// timestamp set to 0
String q =
Expand All @@ -324,9 +324,8 @@ public void testIndexWriteFailure() throws Exception {
}
}

private void waitForIndexRebuild(Connection conn, String index, PIndexState expectedIndexState) throws InterruptedException, SQLException {
private void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
if (!transactional) {
String fullIndexName = SchemaUtil.getTableName(schema, index);
TestUtil.waitForIndexRebuild(conn, fullIndexName, expectedIndexState);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
Expand All @@ -28,12 +29,22 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PTable;
Expand All @@ -57,18 +68,101 @@
@RunWith(RunUntilFailure.class)
public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
private static final Random RAND = new Random(5);
private static final int WAIT_AFTER_DISABLED = 10000;
private static final int WAIT_AFTER_DISABLED = 5000;

@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "2000");
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "120000"); // give up rebuilding after 2 minutes
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED));
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
}

/**
 * Spawns {@code nThreads} background threads that hammer {@code fullTableName} with three
 * passes of random mutations (upserts, then deletes, then upserts again), committing every
 * {@code batchSize} statements. Each thread counts down {@code doneSignal} when finished,
 * whether it succeeded or failed, so callers can await completion.
 *
 * @param fullTableName table to mutate
 * @param nThreads      number of concurrent writer threads to start
 * @param nRows         primary keys are drawn from [0, nRows)
 * @param nIndexValues  indexed column values are drawn from a range of this size
 * @param batchSize     commit frequency, in statements
 * @param doneSignal    counted down once per thread on completion
 */
private static void mutateRandomly(final String fullTableName, final int nThreads, final int nRows, final int nIndexValues, final int batchSize, final CountDownLatch doneSignal) {
    Runnable[] runnables = new Runnable[nThreads];
    for (int i = 0; i < nThreads; i++) {
        runnables[i] = new Runnable() {

            @Override
            public void run() {
                // try-with-resources: the previous version never closed the Connection,
                // leaking it on both normal completion and SQLException.
                try (Connection conn = DriverManager.getConnection(getUrl())) {
                    for (int i = 0; i < 3000; i++) {
                        boolean isNull = RAND.nextBoolean();
                        int randInt = RAND.nextInt() % nIndexValues;
                        int pk = Math.abs(RAND.nextInt()) % nRows;
                        conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (" + pk + ", 0, " + (isNull ? null : randInt) + ")");
                        if ((i % batchSize) == 0) {
                            conn.commit();
                        }
                    }
                    conn.commit();
                    for (int i = 0; i < 3000; i++) {
                        int pk = Math.abs(RAND.nextInt()) % nRows;
                        conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k1= " + pk + " AND k2=0");
                        if (i % batchSize == 0) {
                            conn.commit();
                        }
                    }
                    conn.commit();
                    for (int i = 0; i < 3000; i++) {
                        int randInt = RAND.nextInt() % nIndexValues;
                        int pk = Math.abs(RAND.nextInt()) % nRows;
                        conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (" + pk + ", 0, " + randInt + ")");
                        if ((i % batchSize) == 0) {
                            conn.commit();
                        }
                    }
                    conn.commit();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                } finally {
                    // Always count down so an awaiting test doesn't hang on failure.
                    doneSignal.countDown();
                }
            }

        };
    }
    for (int i = 0; i < nThreads; i++) {
        Thread t = new Thread(runnables[i]);
        t.start();
    }
}
/**
 * Stress test: runs concurrent random mutations against an indexed table, then disables the
 * index and keeps mutating until the partial rebuilder brings it back to ACTIVE. Finally
 * scrutinizes the index against the data table to verify they agree.
 */
@Test
@Repeat(3)
public void testConcurrentUpsertsWithRebuild() throws Throwable {
    int nThreads = 5;
    final int batchSize = 200;
    final int nRows = 51;
    final int nIndexValues = 23;
    final String schemaName = "";
    final String tableName = generateUniqueName();
    final String indexName = generateUniqueName();
    final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
    final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
    // try-with-resources so the connection is released even if an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl())) {
        HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
        conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) STORE_NULLS=true, VERSIONS=1");
        // Optionally slow down region writes to widen race windows:
        //addDelayingCoprocessor(conn, tableName);
        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)");

        // Warm-up pass: populate the table while the index is still active.
        final CountDownLatch doneSignal1 = new CountDownLatch(nThreads);
        mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal1);
        assertTrue("Ran out of time", doneSignal1.await(120, TimeUnit.SECONDS));

        // Disable the index, then mutate concurrently until the background rebuilder
        // transitions it back to ACTIVE with a zero disable timestamp.
        IndexUtil.updateIndexState(fullIndexName, EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE);
        do {
            final CountDownLatch doneSignal2 = new CountDownLatch(nThreads);
            mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal2);
            assertTrue("Ran out of time", doneSignal2.await(500, TimeUnit.SECONDS));
        } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));

        long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
        assertEquals(nRows, actualRowCount);
    }
}

// Convenience overload delegating to the 5-arg variant with its last two arguments
// defaulted (false, null). NOTE(review): the delegate's signature is not fully visible
// here — presumably the extra parameters control failure injection; confirm at its
// declaration before relying on this description.
private static boolean mutateRandomly(Connection conn, String fullTableName, int nRows) throws Exception {
    return mutateRandomly(conn, fullTableName, nRows, false, null);
}
Expand Down Expand Up @@ -144,7 +238,7 @@ private static boolean mutateRandomly(Connection conn, String fullTableName, int
}

@Test
@Repeat(10)
@Repeat(3)
public void testDeleteAndUpsertAfterFailure() throws Throwable {
final int nRows = 10;
String schemaName = generateUniqueName();
Expand All @@ -160,13 +254,26 @@ public void testDeleteAndUpsertAfterFailure() throws Throwable {
HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
mutateRandomly(conn, fullTableName, nRows);
TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);

long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
assertEquals(nRows,actualRowCount);
}
}

/**
 * Installs {@link DelayingRegionObserver} on the given table so every batch mutation is
 * slightly delayed, widening race windows for the concurrency tests. The coprocessor is
 * registered at a priority above the Phoenix default so it participates in the chain.
 *
 * @param conn      Phoenix connection used to reach the cluster services
 * @param tableName physical HBase table to modify
 * @throws SQLException on Phoenix metadata access failure
 * @throws IOException  on HBase admin failure
 */
private static void addDelayingCoprocessor(Connection conn, String tableName) throws SQLException, IOException {
    int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
    ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
    HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
    descriptor.addCoprocessor(DelayingRegionObserver.class.getName(), null, priority, null);
    // try-with-resources replaces the manual try/finally close; HBaseAdmin is Closeable.
    try (HBaseAdmin admin = services.getAdmin()) {
        admin.modifyTable(Bytes.toBytes(tableName), descriptor);
    }
}

@Test
public void testWriteWhileRebuilding() throws Throwable {
final int nRows = 10;
Expand Down Expand Up @@ -201,12 +308,15 @@ public void run() {
t.start();
long disableTS = EnvironmentEdgeManager.currentTimeMillis();
IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
doneSignal.await(60, TimeUnit.SECONDS);
TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
assertTrue(hasInactiveIndex[0]);

TestUtil.dumpIndexStatus(conn, fullIndexName);

long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
assertEquals(nRows,actualRowCount);

}
}

Expand All @@ -231,7 +341,7 @@ public void testMultiVersionsAfterFailure() throws Throwable {
conn.commit();
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee')");
conn.commit();
TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);

IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
Expand All @@ -258,7 +368,7 @@ public void testUpsertNullAfterFailure() throws Throwable {
conn.commit();
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
conn.commit();
TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);

IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
Expand All @@ -285,7 +395,7 @@ public void testUpsertNullTwiceAfterFailure() throws Throwable {
conn.commit();
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)");
conn.commit();
TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);

IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
Expand All @@ -310,7 +420,7 @@ public void testDeleteAfterFailure() throws Throwable {
conn.commit();
conn.createStatement().execute("DELETE FROM " + fullTableName);
conn.commit();
TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);

IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
Expand All @@ -335,7 +445,7 @@ public void testDeleteBeforeFailure() throws Throwable {
IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')");
conn.commit();
TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);

IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
Expand Down Expand Up @@ -379,11 +489,9 @@ public void testMultiValuesAtSameTS() throws Throwable {
conn.commit();
clock.time += 1000;
advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
clock.time += 100;
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
} finally {
EnvironmentEdgeManager.injectEdge(null);
}
Expand Down Expand Up @@ -414,7 +522,7 @@ public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
conn.commit();
clock.time += 1000;
advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
clock.time += 100;
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
} finally {
Expand Down Expand Up @@ -447,10 +555,8 @@ public void testDeleteAndUpsertValuesAtSameTS2() throws Throwable {
conn.commit();
clock.time += 1000;
advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
clock.time += 100;
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
} finally {
EnvironmentEdgeManager.injectEdge(null);
Expand All @@ -477,4 +583,16 @@ public void run() {
t.setDaemon(true);
t.start();
}

/**
 * Region observer that sleeps for a small random interval (0-9 ms) before each batch
 * mutation, used to widen race windows in the index rebuild tests.
 */
public static class DelayingRegionObserver extends SimpleRegionObserver {
    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
        try {
            // RAND.nextInt(10) yields [0, 10) directly. The previous
            // Math.abs(RAND.nextInt()) % 10 could produce a negative value when
            // nextInt() returned Integer.MIN_VALUE (Math.abs overflows), which would
            // make Thread.sleep throw IllegalArgumentException.
            Thread.sleep(RAND.nextInt(10));
        } catch (InterruptedException e) {
            // Restore the interrupt flag rather than swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,11 @@ public void run() {
indexPTable.getSchemaName().getString(),
indexPTable.getTableName().getString());
if (scanEndTime == latestUpperBoundTimestamp) {
// We compare the absolute value of the index disable timestamp. We don't want to
// pass a negative value because that means an additional index write failed.
// Finished building. Pass in the expected value for the index disabled timestamp
// and only set to active if it hasn't changed (i.e. a write failed again, before
// we're done). We take the absolute value because if the option to leave the
// index active upon failure is used, we'll see a negative value when a write
// fails.
IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, Math.abs(indexPTable.getIndexDisableTimestamp()));
batchExecutedPerTableMap.remove(dataPTable.getName());
LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding");
Expand Down
Loading

0 comments on commit 1dc9008

Please sign in to comment.