diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java index a7d0028f7a8..8d3316bcb35 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java @@ -617,6 +617,36 @@ public void testLocalIndexAutomaticRepair() throws Exception { } } + @Test + public void testLocalGlobalIndexMix() throws Exception { + if (isNamespaceMapped) { return; } + String tableName = generateUniqueName(); + Connection conn1 = DriverManager.getConnection(getUrl()); + String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER NOT NULL,\n" + + "k3 INTEGER,\n" + + "v1 VARCHAR,\n" + + "v2 VARCHAR,\n" + + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"; + conn1.createStatement().execute(ddl); + conn1.createStatement().execute("CREATE LOCAL INDEX LV1 ON " + tableName + "(v1)"); + conn1.createStatement().execute("CREATE INDEX GV2 ON " + tableName + "(v2)"); + + conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z','3')"); + conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'a','0')"); + conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a','2')"); + conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c','1')"); + conn1.commit(); + ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE v1 = 'c'"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE v2 = '2'"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + conn1.close(); + } + private void copyLocalIndexHFiles(Configuration conf, HRegionInfo fromRegion, HRegionInfo toRegion, boolean move) throws IOException { Path root = FSUtils.getRootDir(conf); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index de980512761..9fc76e99932 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -372,7 +372,7 @@ public void postPut(ObserverContext e, Put put, WA super.postPut(e, put, edit, durability); return; } - doPost(edit, put, durability, true, false); + doPost(edit, put, durability); } @Override @@ -382,28 +382,9 @@ public void postDelete(ObserverContext e, Delete d super.postDelete(e, delete, edit, durability); return; } - doPost(edit, delete, durability, true, false); + doPost(edit, delete, durability); } - @Override - public void postBatchMutate(ObserverContext c, - MiniBatchOperationInProgress miniBatchOp) throws IOException { - if (this.disabled) { - super.postBatchMutate(c, miniBatchOp); - return; - } - WALEdit edit = miniBatchOp.getWalEdit(0); - if (edit != null) { - IndexedKeyValue ikv = getFirstIndexedKeyValue(edit); - if (ikv != null) { - // This will prevent the postPut and postDelete hooks from doing anything - // We need to do this now, as the postBatchMutateIndispensably (where the - // actual index writing gets done) is called after the postPut and postDelete. - ikv.markBatchFinished(); - } - } - } - @Override public void postBatchMutateIndispensably(ObserverContext c, MiniBatchOperationInProgress miniBatchOp, final boolean success) throws IOException { @@ -417,17 +398,13 @@ public void postBatchMutateIndispensably(ObserverContext> indexUpdates = extractIndexUpdate(edit); // the WAL edit is kept in memory and we already specified the factory when we created the // references originally - therefore, we just pass in a null factory here and use the ones // already specified on each reference try { - if (!ikv.getBatchFinished() || forceWrite) { - current.addTimelineAnnotation("Actually doing index update for first time"); - writer.writeAndKillYourselfOnFailure(indexUpdates, allowLocalUpdates); - } else if (allowLocalUpdates) { - Collection> localUpdates = - new ArrayList>(); - current.addTimelineAnnotation("Actually doing local index update for first time"); - for (Pair mutation : indexUpdates) { - if (Bytes.toString(mutation.getSecond()).equals( - environment.getRegion().getTableDesc().getNameAsString())) { - localUpdates.add(mutation); - } - } - if(!localUpdates.isEmpty()) { - writer.writeAndKillYourselfOnFailure(localUpdates, allowLocalUpdates); - } - } + current.addTimelineAnnotation("Actually doing index update for first time"); + Collection> localUpdates = + new ArrayList>(); + Collection> remoteUpdates = + new ArrayList>(); + for (Pair mutation : indexUpdates) { + if (Bytes.toString(mutation.getSecond()).equals( + environment.getRegion().getTableDesc().getNameAsString())) { + localUpdates.add(mutation); + } else { + remoteUpdates.add(mutation); + } + } + if(!remoteUpdates.isEmpty()) { + writer.writeAndKillYourselfOnFailure(remoteUpdates, false); + } + if(!localUpdates.isEmpty()) { + writer.writeAndKillYourselfOnFailure(localUpdates, true); + } } finally { // With a custom kill policy, we may throw instead of kill the server. // Without doing this in a finally block (at least with the mini cluster), // the region server never goes down.