Skip to content

Commit

Permalink
PHOENIX-3868 Pherf - Create sync/async index as part of a scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
Mujtaba committed Jun 6, 2017
1 parent adf1c2d commit b1f0bf7
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public void testColumnRulesApplied() {
}
}

// Verify number of rows written
assertExpectedNumberOfRecordsWritten(scenario);

// Run some queries
executor = new WorkloadExecutor();
Workload query = new QueryExecutor(parser, util, executor);
Expand All @@ -113,6 +116,22 @@ public void testColumnRulesApplied() {
}
}

@Test
public void testPreAndPostDataLoadDdls() throws Exception {
Scenario scenario = parser.getScenarioByName("testPreAndPostDdls");
WorkloadExecutor executor = new WorkloadExecutor();
executor.add(new WriteWorkload(util, parser, scenario, GeneratePhoenixStats.NO));

try {
executor.get();
executor.shutdown();
} catch (Exception e) {
fail("Failed to load data. An exception was thrown: " + e.getMessage());
}

assertExpectedNumberOfRecordsWritten(scenario);
}

@Test
public void testRWWorkload() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public class ResultBaseTestIT extends BaseHBaseManagedTimeIT {
String dir = properties.getProperty("pherf.default.results.dir");
resultUtil.ensureBaseDirExists(dir);

util.setZookeeper("localhost");
PhoenixUtil.setZookeeper("localhost");
reader = new SchemaReader(util, matcherSchema);
parser = new XMLConfigParser(matcherScenario);
}

@AfterClass public static void tearDown() throws Exception {
resultUtil.deleteDir(properties.getProperty("pherf.default.results.dir"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,5 @@ public String toString() {
}
return stringBuilder.toString();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.apache.phoenix.pherf.configuration;

import javax.xml.bind.annotation.XmlAttribute;

public class Ddl {
private String statement;
private String tableName;

public Ddl() {
}

public Ddl(String statement, String tableName) {
this.statement = statement;
this.tableName = tableName;
}

/**
* DDL
* @return
*/
@XmlAttribute
public String getStatement() {
return statement;
}
public void setStatement(String statement) {
this.statement = statement;
}

/**
* Table name used in the DDL
* @return
*/
@XmlAttribute
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}

public String toString(){
if (statement.contains("?")) {
return statement.replace("?", tableName);
} else {
return statement;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;

import org.apache.commons.lang.builder.HashCodeBuilder;
Expand All @@ -39,8 +40,10 @@ public class Scenario {
private WriteParams writeParams;
private String name;
private String tenantId;
private String ddl;

private List<Ddl> preScenarioDdls;
private List<Ddl> postScenarioDdls;


public Scenario() {
writeParams = new WriteParams();
}
Expand Down Expand Up @@ -92,7 +95,7 @@ public int getRowCount() {
public void setRowCount(int rowCount) {
this.rowCount = rowCount;
}

/**
* Phoenix properties
*
Expand Down Expand Up @@ -179,18 +182,6 @@ public void setTenantId(String tenantId) {
this.tenantId = tenantId;
}

/**
* Scenario level DDL that is executed before running the scenario.
*/
@XmlAttribute
public String getDdl() {
return ddl;
}

public void setDdl(String ddl) {
this.ddl = ddl;
}

public WriteParams getWriteParams() {
return writeParams;
}
Expand All @@ -211,4 +202,30 @@ public String toString() {
}
return stringBuilder.toString();
}
}

public List<Ddl> getPreScenarioDdls() {
return preScenarioDdls;
}

/**
* Scenario level DDLs (for views/index/async) that are executed before data load
*/
@XmlElementWrapper(name = "preScenarioDdls")
@XmlElement(name = "ddl")
public void setPreScenarioDdls(List<Ddl> preScenarioDdls) {
this.preScenarioDdls = preScenarioDdls;
}

public List<Ddl> getPostScenarioDdls() {
return postScenarioDdls;
}

/**
* Scenario level DDLs (for views/index/async) that are executed after data load
*/
@XmlElementWrapper(name = "postScenarioDdls")
@XmlElement(name = "ddl")
public void setPostScenarioDdls(List<Ddl> postScenarioDdls) {
this.postScenarioDdls = postScenarioDdls;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,24 @@

package org.apache.phoenix.pherf.util;

import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobSubmitter;
import org.apache.phoenix.pherf.PherfConstants;
import org.apache.phoenix.pherf.configuration.*;
import org.apache.phoenix.pherf.jmx.MonitorManager;
import org.apache.phoenix.pherf.result.DataLoadThreadTime;
import org.apache.phoenix.pherf.result.DataLoadTimeSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;

import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
Expand All @@ -42,6 +48,8 @@ public class PhoenixUtil {
private static PhoenixUtil instance;
private static boolean useThinDriver;
private static String queryServerUrl;
private static final String ASYNC_KEYWORD = "ASYNC";
private static final int ONE_MIN_IN_MS = 60000;

private PhoenixUtil() {
this(false);
Expand Down Expand Up @@ -281,13 +289,19 @@ public void executeQuerySetDdls(QuerySet querySet) throws Exception {
*
* @throws Exception
*/
public void executeScenarioDdl(Scenario scenario) throws Exception {
if (null != scenario.getDdl()) {
public void executeScenarioDdl(List<Ddl> ddls, String tenantId, DataLoadTimeSummary dataLoadTimeSummary) throws Exception {
if (null != ddls) {
Connection conn = null;
try {
logger.info("\nExecuting DDL:" + scenario.getDdl() + " on tenantId:"
+ scenario.getTenantId());
executeStatement(scenario.getDdl(), conn = getConnection(scenario.getTenantId()));
for (Ddl ddl : ddls) {
logger.info("\nExecuting DDL:" + ddl + " on tenantId:" +tenantId);
long startTime = System.currentTimeMillis();
executeStatement(ddl.toString(), conn = getConnection(tenantId));
if (ddl.getStatement().toUpperCase().contains(ASYNC_KEYWORD)) {
waitForAsyncIndexToFinish(ddl.getTableName());
}
dataLoadTimeSummary.add(ddl.getTableName(), 0, (int)(System.currentTimeMillis() - startTime));
}
} finally {
if (null != conn) {
conn.close();
Expand All @@ -296,7 +310,56 @@ public void executeScenarioDdl(Scenario scenario) throws Exception {
}
}

public static String getZookeeper() {
/**
* Waits for ASYNC index to build
* @param tableName
* @throws InterruptedException
*/
private void waitForAsyncIndexToFinish(String tableName) throws InterruptedException {
//Wait for up to 15 mins for ASYNC index build to start
boolean jobStarted = false;
for (int i=0; i<15; i++) {
if (isYarnJobInProgress(tableName)) {
jobStarted = true;
break;
}
Thread.sleep(ONE_MIN_IN_MS);
}
if (jobStarted == false) {
throw new IllegalStateException("ASYNC index build did not start within 15 mins");
}

// Wait till ASYNC index job finishes to get approximate job E2E time
for (;;) {
if (!isYarnJobInProgress(tableName))
break;
Thread.sleep(ONE_MIN_IN_MS);
}
}

/**
* Checks if a YARN job with the specific table name is in progress
* @param tableName
* @return
*/
boolean isYarnJobInProgress(String tableName) {
try {
logger.info("Fetching YARN apps...");
Set<String> response = new PhoenixMRJobSubmitter().getSubmittedYarnApps();
for (String str : response) {
logger.info("Runnng YARN app: " + str);
if (str.toUpperCase().contains(tableName.toUpperCase())) {
return true;
}
}
} catch (Exception e) {
e.printStackTrace();
}

return false;
}

public static String getZookeeper() {
return zookeeper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,13 @@ public Runnable execute() throws Exception {
private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary,
DataLoadThreadTime dataLoadThreadTime, Scenario scenario) throws Exception {
logger.info("\nLoading " + scenario.getRowCount() + " rows for " + scenario.getTableName());
long start = System.currentTimeMillis();

// Execute any Scenario DDL before running workload
pUtil.executeScenarioDdl(scenario);

List<Future<Info>> writeBatches = getBatches(dataLoadThreadTime, scenario);
// Execute any pre dataload scenario DDLs
pUtil.executeScenarioDdl(scenario.getPreScenarioDdls(), scenario.getTenantId(), dataLoadTimeSummary);

waitForBatches(dataLoadTimeSummary, scenario, start, writeBatches);
// Write data
List<Future<Info>> writeBatches = getBatches(dataLoadThreadTime, scenario);
waitForBatches(dataLoadTimeSummary, scenario, System.currentTimeMillis(), writeBatches);

// Update Phoenix Statistics
if (this.generateStatistics == GeneratePhoenixStats.YES) {
Expand All @@ -188,6 +187,9 @@ private synchronized void exec(DataLoadTimeSummary dataLoadTimeSummary,
} else {
logger.info("Phoenix table stats update not requested.");
}

// Execute any post data load scenario DDLs before starting query workload
pUtil.executeScenarioDdl(scenario.getPostScenarioDdls(), scenario.getTenantId(), dataLoadTimeSummary);
}

private List<Future<Info>> getBatches(DataLoadThreadTime dataLoadThreadTime, Scenario scenario)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@
<maxValue>2014-10-17 00:00:00.000</maxValue>
</datavalue>
<datavalue distribution="2">
<minValue>2014-10-17 00:00:00.000</minValue>
<maxValue>2014-10-18 00:00:00.000</maxValue>
</datavalue>
<datavalue distribution="2">
Expand All @@ -315,6 +314,16 @@
<name>TENANT_ID</name>
</column>
</dataOverride>

<preScenarioDdls>
<ddl>CREATE INDEX IDX_DIVISION ON PHERF.PHERF_PROD_TEST_UNSALTED (DIVISION)</ddl>
</preScenarioDdls>

<postScenarioDdls>
<ddl>CREATE INDEX IDX_OLDVAL_STRING ON PHERF.PHERF_PROD_TEST_UNSALTED (OLDVAL_STRING)</ddl>
<ddl>CREATE INDEX IDX_CONNECTION_ID ON PHERF.PHERF_PROD_TEST_UNSALTED (CONNECTION_ID)</ddl>
</postScenarioDdls>

<writeParams executionDurationInMs="10000">
<!--
Number of writer it insert into the threadpool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ private String writeXML() {
data.setDataMappingColumns(columnList);

Scenario scenario = new Scenario();
scenario.setTenantId("00DXXXXXX");
List<Ddl> preScenarioDdls = new ArrayList<Ddl>();
preScenarioDdls.add(new Ddl("CREATE INDEX IF NOT EXISTS ? ON FHA (NEWVAL_NUMBER) ASYNC", "FHAIDX_NEWVAL_NUMBER"));
preScenarioDdls.add(new Ddl("CREATE LOCAL INDEX IF NOT EXISTS ? ON FHA (NEWVAL_NUMBER)", "FHAIDX_NEWVAL_NUMBER"));
scenario.setPreScenarioDdls(preScenarioDdls);
scenario.setPhoenixProperties(new HashMap<String, String>());
scenario.getPhoenixProperties().put("phoenix.query.threadPoolSize", "200");
scenario.setDataOverride(new DataOverride());
Expand Down
Loading

0 comments on commit b1f0bf7

Please sign in to comment.