Skip to content

Commit

Permalink
Apply Local Indexes batch updates only once.
Browse files Browse the repository at this point in the history
  • Loading branch information
lhofhansl committed Apr 20, 2017
1 parent ee886ba commit 5bd7f79
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,36 @@ public void testLocalIndexAutomaticRepair() throws Exception {
}
}

@Test
public void testLocalGlobalIndexMix() throws Exception {
if (isNamespaceMapped) { return; }
String tableName = generateUniqueName();
Connection conn1 = DriverManager.getConnection(getUrl());
String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" +
"k1 INTEGER NOT NULL,\n" +
"k2 INTEGER NOT NULL,\n" +
"k3 INTEGER,\n" +
"v1 VARCHAR,\n" +
"v2 VARCHAR,\n" +
"CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n";
conn1.createStatement().execute(ddl);
conn1.createStatement().execute("CREATE LOCAL INDEX LV1 ON " + tableName + "(v1)");
conn1.createStatement().execute("CREATE INDEX GV2 ON " + tableName + "(v2)");

conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z','3')");
conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'a','0')");
conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a','2')");
conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c','1')");
conn1.commit();
ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE v1 = 'c'");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE v2 = '2'");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
conn1.close();
}

private void copyLocalIndexHFiles(Configuration conf, HRegionInfo fromRegion, HRegionInfo toRegion, boolean move)
throws IOException {
Path root = FSUtils.getRootDir(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WA
super.postPut(e, put, edit, durability);
return;
}
doPost(edit, put, durability, true, false);
doPost(edit, put, durability);
}

@Override
Expand All @@ -382,28 +382,9 @@ public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete d
super.postDelete(e, delete, edit, durability);
return;
}
doPost(edit, delete, durability, true, false);
doPost(edit, delete, durability);
}

@Override
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
if (this.disabled) {
super.postBatchMutate(c, miniBatchOp);
return;
}
WALEdit edit = miniBatchOp.getWalEdit(0);
if (edit != null) {
IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);
if (ikv != null) {
// This will prevent the postPut and postDelete hooks from doing anything
// We need to do this now, as the postBatchMutateIndispensably (where the
// actual index writing gets done) is called after the postPut and postDelete.
ikv.markBatchFinished();
}
}
}

@Override
public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
Expand All @@ -417,17 +398,13 @@ public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnviro
//each batch operation, only the first one will have anything useful, so we can just grab that
Mutation mutation = miniBatchOp.getOperation(0);
WALEdit edit = miniBatchOp.getWalEdit(0);
// We're forcing the index writes here because we've marked the index batch as "finished"
// to prevent postPut and postDelete from doing anything, but hold off on writing them
// until now so we're outside of the MVCC lock (see PHOENIX-3789). Without this hacky
// forceWrite flag, we'd ignore them again here too.
doPost(edit, mutation, mutation.getDurability(), false, true);
doPost(edit, mutation, mutation.getDurability());
}
}

private void doPost(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite) throws IOException {
private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
try {
doPostWithExceptions(edit, m, durability, allowLocalUpdates, forceWrite);
doPostWithExceptions(edit, m, durability);
return;
} catch (Throwable e) {
rethrowIndexingException(e);
Expand All @@ -436,7 +413,7 @@ private void doPost(WALEdit edit, Mutation m, final Durability durability, boole
"Somehow didn't complete the index update, but didn't return succesfully either!");
}

private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability, boolean allowLocalUpdates, boolean forceWrite)
private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability)
throws Exception {
//short circuit, if we don't need to do any work
if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m) || edit == null) {
Expand Down Expand Up @@ -470,30 +447,32 @@ private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability dur
* once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
* lead to writing all the index updates for each Put/Delete).
*/
if ((!ikv.getBatchFinished() || forceWrite) || allowLocalUpdates) {
if (!ikv.getBatchFinished()) {
Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);

// the WAL edit is kept in memory and we already specified the factory when we created the
// references originally - therefore, we just pass in a null factory here and use the ones
// already specified on each reference
try {
if (!ikv.getBatchFinished() || forceWrite) {
current.addTimelineAnnotation("Actually doing index update for first time");
writer.writeAndKillYourselfOnFailure(indexUpdates, allowLocalUpdates);
} else if (allowLocalUpdates) {
Collection<Pair<Mutation, byte[]>> localUpdates =
new ArrayList<Pair<Mutation, byte[]>>();
current.addTimelineAnnotation("Actually doing local index update for first time");
for (Pair<Mutation, byte[]> mutation : indexUpdates) {
if (Bytes.toString(mutation.getSecond()).equals(
environment.getRegion().getTableDesc().getNameAsString())) {
localUpdates.add(mutation);
}
}
if(!localUpdates.isEmpty()) {
writer.writeAndKillYourselfOnFailure(localUpdates, allowLocalUpdates);
}
}
current.addTimelineAnnotation("Actually doing index update for first time");
Collection<Pair<Mutation, byte[]>> localUpdates =
new ArrayList<Pair<Mutation, byte[]>>();
Collection<Pair<Mutation, byte[]>> remoteUpdates =
new ArrayList<Pair<Mutation, byte[]>>();
for (Pair<Mutation, byte[]> mutation : indexUpdates) {
if (Bytes.toString(mutation.getSecond()).equals(
environment.getRegion().getTableDesc().getNameAsString())) {
localUpdates.add(mutation);
} else {
remoteUpdates.add(mutation);
}
}
if(!remoteUpdates.isEmpty()) {
writer.writeAndKillYourselfOnFailure(remoteUpdates, false);
}
if(!localUpdates.isEmpty()) {
writer.writeAndKillYourselfOnFailure(localUpdates, true);
}
} finally { // With a custom kill policy, we may throw instead of kill the server.
// Without doing this in a finally block (at least with the mini cluster),
// the region server never goes down.
Expand Down

0 comments on commit 5bd7f79

Please sign in to comment.