From b1f0bf75e6c4883d85e20ed700a109f1ec9b91d3 Mon Sep 17 00:00:00 2001 From: Mujtaba Date: Tue, 6 Jun 2017 11:54:33 -0700 Subject: [PATCH] PHOENIX-3868 Pherf - Create sync/async index as part of a scenario --- .../apache/phoenix/pherf/DataIngestIT.java | 19 +++++ .../phoenix/pherf/ResultBaseTestIT.java | 4 +- .../pherf/configuration/DataModel.java | 1 + .../phoenix/pherf/configuration/Ddl.java | 50 +++++++++++++ .../phoenix/pherf/configuration/Scenario.java | 49 ++++++++---- .../phoenix/pherf/util/PhoenixUtil.java | 75 +++++++++++++++++-- .../phoenix/pherf/workload/WriteWorkload.java | 14 ++-- .../scenario/prod_test_unsalted_scenario.xml | 11 ++- .../pherf/ConfigurationParserTest.java | 5 ++ .../test/resources/scenario/test_scenario.xml | 26 ++++++- 10 files changed, 221 insertions(+), 33 deletions(-) create mode 100644 phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Ddl.java diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java index 7b7ac29764d..973ce2c4067 100644 --- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java +++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/DataIngestIT.java @@ -101,6 +101,9 @@ public void testColumnRulesApplied() { } } + // Verify number of rows written + assertExpectedNumberOfRecordsWritten(scenario); + // Run some queries executor = new WorkloadExecutor(); Workload query = new QueryExecutor(parser, util, executor); @@ -113,6 +116,22 @@ public void testColumnRulesApplied() { } } + @Test + public void testPreAndPostDataLoadDdls() throws Exception { + Scenario scenario = parser.getScenarioByName("testPreAndPostDdls"); + WorkloadExecutor executor = new WorkloadExecutor(); + executor.add(new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO)); + + try { + executor.get(); + executor.shutdown(); + } catch (Exception e) { + fail("Failed to load data. An exception was thrown: " + e.getMessage()); + } + + assertExpectedNumberOfRecordsWritten(scenario); + } + @Test public void testRWWorkload() throws Exception { diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java index 1e77f480f96..2f4f4e59103 100644 --- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java +++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/ResultBaseTestIT.java @@ -48,11 +48,11 @@ public class ResultBaseTestIT extends BaseHBaseManagedTimeIT { String dir = properties.getProperty("pherf.default.results.dir"); resultUtil.ensureBaseDirExists(dir); - util.setZookeeper("localhost"); + PhoenixUtil.setZookeeper("localhost"); reader = new SchemaReader(util, matcherSchema); parser = new XMLConfigParser(matcherScenario); } - + @AfterClass public static void tearDown() throws Exception { resultUtil.deleteDir(properties.getProperty("pherf.default.results.dir")); } diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java index 8eb42ff7f13..4c99ddd7e95 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataModel.java @@ -71,4 +71,5 @@ public String toString() { } return stringBuilder.toString(); } + } \ No newline at end of file diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Ddl.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Ddl.java new file mode 100644 index 00000000000..e431040a196 --- /dev/null +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Ddl.java @@ -0,0 +1,50 @@ +package org.apache.phoenix.pherf.configuration; + +import javax.xml.bind.annotation.XmlAttribute; + +public class Ddl { + private String statement; + private String tableName; + + public Ddl() { + } + + public Ddl(String statement, String tableName) { + this.statement = statement; + this.tableName = tableName; + } + + /** + * DDL + * @return + */ + @XmlAttribute + public String getStatement() { + return statement; + } + public void setStatement(String statement) { + this.statement = statement; + } + + /** + * Table name used in the DDL + * @return + */ + @XmlAttribute + public String getTableName() { + return tableName; + } + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String toString(){ + if (statement.contains("?")) { + return statement.replace("?", tableName); + } else { + return statement; + } + + } + +} diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java index 200fdc531eb..02e5cc737ab 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java @@ -24,6 +24,7 @@ import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; import javax.xml.bind.annotation.XmlRootElement; import org.apache.commons.lang.builder.HashCodeBuilder; @@ -39,8 +40,10 @@ public class Scenario { private WriteParams writeParams; private String name; private String tenantId; - private String ddl; - + private List preScenarioDdls; + private List postScenarioDdls; + + public Scenario() { writeParams = new WriteParams(); } @@ -92,7 +95,7 @@ public int getRowCount() { public void setRowCount(int rowCount) { this.rowCount = rowCount; } - + /** * Phoenix properties * @@ -179,18 +182,6 @@ public void setTenantId(String tenantId) { this.tenantId = tenantId; } - /** - * Scenario level DDL that is executed before running the scenario. - */ - @XmlAttribute - public String getDdl() { - return ddl; - } - - public void setDdl(String ddl) { - this.ddl = ddl; - } - public WriteParams getWriteParams() { return writeParams; } @@ -211,4 +202,30 @@ public String toString() { } return stringBuilder.toString(); } -} + + public List getPreScenarioDdls() { + return preScenarioDdls; + } + + /** + * Scenario level DDLs (for views/index/async) that are executed before data load + */ + @XmlElementWrapper(name = "preScenarioDdls") + @XmlElement(name = "ddl") + public void setPreScenarioDdls(List preScenarioDdls) { + this.preScenarioDdls = preScenarioDdls; + } + + public List getPostScenarioDdls() { + return postScenarioDdls; + } + + /** + * Scenario level DDLs (for views/index/async) that are executed after data load + */ + @XmlElementWrapper(name = "postScenarioDdls") + @XmlElement(name = "ddl") + public void setPostScenarioDdls(List postScenarioDdls) { + this.postScenarioDdls = postScenarioDdls; + } +} \ No newline at end of file diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java index df185442c36..38dcd640f0a 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/util/PhoenixUtil.java @@ -18,18 +18,24 @@ package org.apache.phoenix.pherf.util; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobSubmitter; import org.apache.phoenix.pherf.PherfConstants; import org.apache.phoenix.pherf.configuration.*; import org.apache.phoenix.pherf.jmx.MonitorManager; +import org.apache.phoenix.pherf.result.DataLoadThreadTime; +import org.apache.phoenix.pherf.result.DataLoadTimeSummary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.sql.*; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Properties; +import java.util.Set; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; @@ -42,6 +48,8 @@ public class PhoenixUtil { private static PhoenixUtil instance; private static boolean useThinDriver; private static String queryServerUrl; + private static final String ASYNC_KEYWORD = "ASYNC"; + private static final int ONE_MIN_IN_MS = 60000; private PhoenixUtil() { this(false); @@ -281,13 +289,19 @@ public void executeQuerySetDdls(QuerySet querySet) throws Exception { * * @throws Exception */ - public void executeScenarioDdl(Scenario scenario) throws Exception { - if (null != scenario.getDdl()) { + public void executeScenarioDdl(List ddls, String tenantId, DataLoadTimeSummary dataLoadTimeSummary) throws Exception { + if (null != ddls) { Connection conn = null; try { - logger.info("\nExecuting DDL:" + scenario.getDdl() + " on tenantId:" - + scenario.getTenantId()); - executeStatement(scenario.getDdl(), conn = getConnection(scenario.getTenantId())); + for (Ddl ddl : ddls) { + logger.info("\nExecuting DDL:" + ddl + " on tenantId:" +tenantId); + long startTime = System.currentTimeMillis(); + executeStatement(ddl.toString(), conn = getConnection(tenantId)); + if (ddl.getStatement().toUpperCase().contains(ASYNC_KEYWORD)) { + waitForAsyncIndexToFinish(ddl.getTableName()); + } + dataLoadTimeSummary.add(ddl.getTableName(), 0, (int)(System.currentTimeMillis() - startTime)); + } } finally { if (null != conn) { conn.close(); @@ -296,7 +310,56 @@ public void executeScenarioDdl(Scenario scenario) throws Exception { } } - public static String getZookeeper() { + /** + * Waits for ASYNC index to build + * @param tableName + * @throws InterruptedException + */ + private void waitForAsyncIndexToFinish(String tableName) throws InterruptedException { + //Wait for up to 15 mins for ASYNC index build to start + boolean jobStarted = false; + for (int i=0; i<15; i++) { + if (isYarnJobInProgress(tableName)) { + jobStarted = true; + break; + } + Thread.sleep(ONE_MIN_IN_MS); + } + if (jobStarted == false) { + throw new IllegalStateException("ASYNC index build did not start within 15 mins"); + } + + // Wait till ASYNC index job finishes to get approximate job E2E time + for (;;) { + if (!isYarnJobInProgress(tableName)) + break; + Thread.sleep(ONE_MIN_IN_MS); + } + } + + /** + * Checks if a YARN job with the specific table name is in progress + * @param tableName + * @return + */ + boolean isYarnJobInProgress(String tableName) { + try { + logger.info("Fetching YARN apps..."); + Set response = new PhoenixMRJobSubmitter().getSubmittedYarnApps(); + for (String str : response) { + logger.info("Runnng YARN app: " + str); + if (str.toUpperCase().contains(tableName.toUpperCase())) { + return true; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + + return false; + } + + public static String getZookeeper() { return zookeeper; } diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java index 69d35cc4966..3574761709a 100644 --- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java +++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java @@ -171,14 +171,13 @@ public Runnable execute() throws Exception { private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary, DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception { logger.info("\nLoading " + scenario.getRowCount() + " rows for " + scenario.getTableName()); - long start = System.currentTimeMillis(); - // Execute any Scenario DDL before running workload - pUtil.executeScenarioDdl(scenario); - - List> writeBatches = getBatches(dataLoadThreadTime, scenario); + // Execute any pre dataload scenario DDLs + pUtil.executeScenarioDdl(scenario.getPreScenarioDdls(), scenario.getTenantId(), dataLoadTimeSummary); - waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches); + // Write data + List> writeBatches = getBatches(dataLoadThreadTime, scenario); + waitForBatches(dataLoadTimeSummary, scenario, System.currentTimeMillis(), writeBatches); // Update Phoenix Statistics if (this.generateStatistics == GeneratePhoenixStats.YES) { @@ -188,6 +187,9 @@ private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary, } else { logger.info("Phoenix table stats update not requested."); } + + // Execute any post data load scenario DDLs before starting query workload + pUtil.executeScenarioDdl(scenario.getPostScenarioDdls(), scenario.getTenantId(), dataLoadTimeSummary); } private List> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario) diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml index 8f93685795e..e538ac2b92d 100644 --- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml +++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml @@ -289,7 +289,6 @@ 2014-10-17 00:00:00.000 - 2014-10-17 00:00:00.000 2014-10-18 00:00:00.000 @@ -315,6 +314,16 @@ TENANT_ID + + + CREATE INDEX IDX_DIVISION ON PHERF.PHERF_PROD_TEST_UNSALTED (DIVISION) + + + + CREATE INDEX IDX_OLDVAL_STRING ON PHERF.PHERF_PROD_TEST_UNSALTED (OLDVAL_STRING) + CREATE INDEX IDX_CONNECTION_ID ON PHERF.PHERF_PROD_TEST_UNSALTED (CONNECTION_ID) + + @@ -255,6 +256,23 @@ + + + + + + + + + + + + + + + + + + + + + +