Commit
HIVE-14388: Add number of rows inserted message after insert command in Beeline (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar, Peter Vary)
Bharathkrishna Guruvayoor Murali authored and Sahil Takiar committed Jun 1, 2018
1 parent 1c970d9 commit da13a13
Showing 23 changed files with 303 additions and 45 deletions.
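
After this change, an insert statement run from Beeline reports how many rows it wrote. An illustrative session (table, values, and timing are hypothetical):

0: jdbc:hive2://localhost:10000> insert into test_table values (1, 'a'), (2, 'b');
2 rows affected (12.345 seconds)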
beeline/src/main/resources/BeeLine.properties (2 changes: 1 addition & 1 deletion)
@@ -138,7 +138,7 @@ abort-drop-all: Aborting drop all tables.

drivers-found-count: 0#No driver classes found|1#{0} driver class found|1<{0} driver classes found
rows-selected: 0#No rows selected|1#{0} row selected|1<{0} rows selected
-rows-affected: 0#No rows affected|1#{0} row affected|1<{0} rows affected|0>Unknown rows affected
+rows-affected: -1#Unknown rows affected|0#No rows affected|1#{0} row affected|1<{0} rows affected
active-connections: 0#No active connections|1#{0} active connection:|1<{0} active connections:

time-ms: ({0,number,#.###} seconds)
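
These entries are java.text choice-format patterns, which is why the unknown case moves to a -1 limit at the front: choice limits must be ascending, and the old trailing 0> form did not fire for -1. A runnable sketch of how the new pattern resolves; an illustration only, not BeeLine's actual resource-loading code:

import java.text.MessageFormat;

public class RowsAffectedFormatDemo {
  public static void main(String[] args) {
    // The choice body below is copied from the new rows-affected entry above.
    String pattern = "{0,choice,-1#Unknown rows affected|0#No rows affected"
        + "|1#{0} row affected|1<{0} rows affected}";
    for (long n : new long[] {-1, 0, 1, 42}) {
      // -1 -> "Unknown rows affected", 0 -> "No rows affected",
      // 1  -> "1 row affected",        42 -> "42 rows affected"
      System.out.println(MessageFormat.format(pattern, n));
    }
  }
}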
@@ -977,7 +977,7 @@ private void doTestSelectAll(String tableName, int maxRows, int fetchSize) throw
assertNotNull("ResultSet is null", res);
assertTrue("getResultSet() not returning expected ResultSet", res == stmt
.getResultSet());
assertEquals("get update count not as expected", -1, stmt.getUpdateCount());
assertEquals("get update count not as expected", 0, stmt.getUpdateCount());
int i = 0;

ResultSetMetaData meta = res.getMetaData();
@@ -2962,23 +2962,25 @@ public void testInsertOverwriteExecAsync3() throws Exception {

private void testInsertOverwrite(HiveStatement stmt) throws SQLException {
String tblName = "testInsertOverwriteExecAsync";
-    int rowCount = 0;
-    stmt.execute("create table " + tblName + " (col1 int , col2 string)");
-    boolean isResulSet =
-        stmt.executeAsync("insert overwrite table " + tblName + " select * from " + tableName);
-    assertFalse(isResulSet);
-    // HiveStatement#getUpdateCount blocks until the async query is complete
-    stmt.getUpdateCount();
-    // Read from the new table
-    ResultSet rs = stmt.executeQuery("select * from " + tblName);
-    assertNotNull(rs);
-    while (rs.next()) {
-      String value = rs.getString(2);
-      rowCount++;
-      assertNotNull(value);
-    }
+    try {
+      int rowCount = 0;
+      stmt.execute("create table " + tblName + " (col1 int , col2 string)");
+      boolean isResulSet =
+          stmt.executeAsync("insert overwrite table " + tblName + " select * from " + tableName);
+      assertFalse(isResulSet);
+      // HiveStatement#getUpdateCount blocks until the async query is complete
+      rowCount = stmt.getUpdateCount();
+      // Read from the new table
+      ResultSet rs = stmt.executeQuery("select * from " + tblName);
+      assertNotNull(rs);
+      while (rs.next()) {
+        String value = rs.getString(2);
+        assertNotNull(value);
+      }
+      assertEquals(dataFileRowCount, rowCount);
+    } finally {
+      stmt.execute("drop table " + tblName);
+    }
-    assertEquals(rowCount, dataFileRowCount);
-    stmt.execute("drop table " + tblName);
}

// Test that opening a JDBC connection to a non-existent database throws a HiveSQLException
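Note on the refactored testInsertOverwrite above: moving the body into try/finally guarantees the drop table cleanup runs even when an assertion fails partway through, so reruns do not hit an already-existing table, and the test now asserts that getUpdateCount() reports the number of rows the insert wrote.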
jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java (16 changes: 12 additions & 4 deletions)
@@ -369,7 +369,7 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException {
TGetOperationStatusResp statusResp = null;

// Poll on the operation status, till the operation is complete
-    while (!isOperationComplete) {
+    do {
try {
/**
* For an async SQLOperation, GetOperationStatus will use the long polling approach It will
@@ -414,7 +414,7 @@ TGetOperationStatusResp waitForOperationToComplete() throws SQLException {
isLogBeingGenerated = false;
throw new SQLException(e.toString(), "08S01", e);
}
-    }
+    } while (!isOperationComplete);

/*
we set progress bar to be completed when hive query execution has completed
@@ -708,8 +708,16 @@ public int getUpdateCount() throws SQLException {
* client might end up using executeAsync and then call this to check if the query run is
* finished.
*/
-    waitForOperationToComplete();
-    return -1;
+    long numModifiedRows = -1;
+    TGetOperationStatusResp resp = waitForOperationToComplete();
+    if (resp != null) {
+      numModifiedRows = resp.getNumModifiedRows();
+    }
+    if (numModifiedRows == -1 || numModifiedRows > Integer.MAX_VALUE) {
+      LOG.warn("Number of rows is greater than Integer.MAX_VALUE");
+      return -1;
+    }
+    return (int) numModifiedRows;
}

/*
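Two coordinated changes here: the polling loop becomes do-while so that a second call to waitForOperationToComplete (which getUpdateCount makes even after execute has already waited for completion) still issues at least one GetOperationStatus request and gets back a non-null response, and getUpdateCount then reads numModifiedRows from that response, returning -1 when the count is unknown or would overflow an int. A minimal client-side sketch of the visible effect (hypothetical JDBC URL and table, not part of this patch):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class RowCountDemo {
  public static void main(String[] args) throws Exception {
    // Explicit driver load; newer hive-jdbc jars also register themselves automatically.
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    // Assumes a local HiveServer2; adjust the URL for a real deployment.
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement()) {
      stmt.execute("insert into demo_table values (1, 'a'), (2, 'b')");
      // Before this commit Hive always returned -1 here; now it reports the
      // number of rows written, or -1 if the count is unknown or too large.
      System.out.println("rows affected: " + stmt.getUpdateCount());
    }
  }
}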
ql/src/java/org/apache/hadoop/hive/ql/Driver.java (15 changes: 15 additions & 0 deletions)
@@ -2326,11 +2326,18 @@ private void execute() throws CommandProcessorResponse {
Map<String, MapRedStats> stats = SessionState.get().getMapRedStats();
if (stats != null && !stats.isEmpty()) {
long totalCpu = 0;
+      long numModifiedRows = 0;
console.printInfo("MapReduce Jobs Launched: ");
for (Map.Entry<String, MapRedStats> entry : stats.entrySet()) {
console.printInfo("Stage-" + entry.getKey() + ": " + entry.getValue());
totalCpu += entry.getValue().getCpuMSec();

+        if (numModifiedRows > -1) {
+          //if overflow, then numModifiedRows is set as -1. Else update numModifiedRows with the sum.
+          numModifiedRows = addWithOverflowCheck(numModifiedRows, entry.getValue().getNumModifiedRows());
+        }
}
+      queryState.setNumModifiedRows(numModifiedRows);
console.printInfo("Total MapReduce CPU Time Spent: " + Utilities.formatMsecToStr(totalCpu));
}
lDrvState.stateLock.lock();
@@ -2351,6 +2358,14 @@ private void execute() throws CommandProcessorResponse {
}
}

+  private long addWithOverflowCheck(long val1, long val2) {
+    try {
+      return Math.addExact(val1, val2);
+    } catch (ArithmeticException e) {
+      return -1;
+    }
+  }

private void releasePlan(QueryPlan plan) {
// Plan maybe null if Driver.close is called in another thread for the same Driver object
lDrvState.stateLock.lock();
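A self-contained sketch of the guard's behavior (the demo class and main method are mine; the helper mirrors addWithOverflowCheck above):

public class OverflowCheckDemo {
  private static long addWithOverflowCheck(long val1, long val2) {
    try {
      return Math.addExact(val1, val2); // throws ArithmeticException on long overflow
    } catch (ArithmeticException e) {
      return -1; // sentinel: row count unknown
    }
  }

  public static void main(String[] args) {
    System.out.println(addWithOverflowCheck(40, 2));             // 42
    System.out.println(addWithOverflowCheck(Long.MAX_VALUE, 1)); // -1
  }
}

Note also that once numModifiedRows becomes -1, the numModifiedRows > -1 check in the stage loop stops accumulating, so the unknown sentinel is sticky across stages.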
ql/src/java/org/apache/hadoop/hive/ql/MapRedStats.java (10 changes: 10 additions & 0 deletions)
@@ -38,6 +38,8 @@ public class MapRedStats {

String jobId;

+  private long numModifiedRows;

public MapRedStats(int numMap, int numReduce, long cpuMSec, boolean ifSuccess, String jobId) {
this.numMap = numMap;
this.numReduce = numReduce;
@@ -94,6 +96,14 @@ public void setJobId(String jobId) {
this.jobId = jobId;
}

+  public long getNumModifiedRows() {
+    return numModifiedRows;
+  }
+
+  public void setNumModifiedRows(long numModifiedRows) {
+    this.numModifiedRows = numModifiedRows;
+  }

public String getTaskNumbers() {
StringBuilder sb = new StringBuilder();
if (numMap > 0) {
ql/src/java/org/apache/hadoop/hive/ql/QueryState.java (12 changes: 12 additions & 0 deletions)
@@ -49,6 +49,11 @@ public class QueryState {
*/
private HiveTxnManager txnManager;

+  /**
+   * Holds the number of rows affected for insert queries.
+   */
+  private long numModifiedRows = 0;

/**
* Private constructor, use QueryState.Builder instead.
* @param conf The query specific configuration object
@@ -100,6 +105,13 @@ public void setTxnManager(HiveTxnManager txnManager) {
this.txnManager = txnManager;
}

+  public long getNumModifiedRows() {
+    return numModifiedRows;
+  }
+
+  public void setNumModifiedRows(long numModifiedRows) {
+    this.numModifiedRows = numModifiedRows;
+  }
/**
* Builder to instantiate the QueryState object.
*/
ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (23 changes: 8 additions & 15 deletions)
@@ -22,7 +22,6 @@

import java.io.IOException;
import java.io.Serializable;
-import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -83,27 +82,13 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.Murmur3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.function.BiFunction;

-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;

/**
* File Sink operator implementation.
**/
@@ -147,6 +132,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
private transient boolean isInsertOverwrite;
private transient String counterGroup;
private transient BiFunction<Object[], ObjectInspector[], Integer> hashFunc;
+  public static final String TOTAL_TABLE_ROWS_WRITTEN = "TOTAL_TABLE_ROWS_WRITTEN";

/**
* Counters.
*/
@@ -583,6 +570,12 @@ protected void initializeOp(Configuration hconf) throws HiveException {
hashFunc = conf.getTableInfo().getBucketingVersion() == 2 ?
ObjectInspectorUtils::getBucketHashCode :
ObjectInspectorUtils::getBucketHashCodeOld;

+      //Counter for number of rows that are associated with a destination table in FileSinkOperator.
+      //This count is used to get total number of rows in an insert query.
+      if (conf.getTableInfo() != null && conf.getTableInfo().getTableName() != null) {
+        statsMap.put(TOTAL_TABLE_ROWS_WRITTEN, row_count);
+      }
} catch (HiveException e) {
throw e;
} catch (Exception e) {
@@ -34,6 +34,7 @@
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskHandle;
@@ -426,6 +427,16 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockEx

SessionState ss = SessionState.get();
if (ss != null) {
+      //Set the number of table rows affected in mapRedStats to display number of rows inserted.
+      if (ctrs != null) {
+        Counter counter = ctrs.findCounter(
+            ss.getConf().getVar(HiveConf.ConfVars.HIVECOUNTERGROUP),
+            FileSinkOperator.TOTAL_TABLE_ROWS_WRITTEN);
+        if (counter != null) {
+          mapRedStats.setNumModifiedRows(counter.getValue());
+        }
+      }

this.callBackObj.logPlanProgress(ss);
}
// LOG.info(queryPlan);
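Read together, the pieces connect end to end: FileSinkOperator counts rows per destination table and publishes them under TOTAL_TABLE_ROWS_WRITTEN, the progress loop above copies that counter into MapRedStats after each MapReduce stage, Driver sums the per-stage values (with the overflow guard) into QueryState, and HiveStatement#getUpdateCount surfaces the total to JDBC clients such as Beeline, which prints the rows-affected message. The step that copies QueryState's count into the Thrift TGetOperationStatusResp happens server-side, in changed files not shown above.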
ql/src/test/results/clientpositive/llap/dp_counter_mm.q.out (8 changes: 8 additions & 0 deletions)
@@ -35,6 +35,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_2: 84
RECORDS_OUT_OPERATOR_SEL_5: 57
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 84
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -65,6 +66,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_2: 189
RECORDS_OUT_OPERATOR_SEL_5: 121
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 189
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -103,6 +105,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_2: 189
RECORDS_OUT_OPERATOR_SEL_5: 121
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 189
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -133,6 +136,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_2: 292
RECORDS_OUT_OPERATOR_SEL_5: 184
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 292
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -185,6 +189,7 @@ Stage-2 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_5: 121
RECORDS_OUT_OPERATOR_SEL_6: 105
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 189
Stage-2 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -223,6 +228,7 @@ Stage-2 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_5: 184
RECORDS_OUT_OPERATOR_SEL_6: 208
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 292
Stage-2 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -270,6 +276,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_7: 189
RECORDS_OUT_OPERATOR_TS_0: 500
RECORDS_OUT_OPERATOR_TS_3: 500
+TOTAL_TABLE_ROWS_WRITTEN: 189
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
GROUPED_INPUT_SPLITS_Map_4: 1
@@ -313,6 +320,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_7: 292
RECORDS_OUT_OPERATOR_TS_0: 500
RECORDS_OUT_OPERATOR_TS_3: 500
+TOTAL_TABLE_ROWS_WRITTEN: 292
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
GROUPED_INPUT_SPLITS_Map_4: 1
@@ -35,6 +35,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_2: 84
RECORDS_OUT_OPERATOR_SEL_5: 57
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 84
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -65,6 +66,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_2: 189
RECORDS_OUT_OPERATOR_SEL_5: 121
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 189
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -103,6 +105,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_2: 189
RECORDS_OUT_OPERATOR_SEL_5: 121
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 189
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -133,6 +136,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_2: 292
RECORDS_OUT_OPERATOR_SEL_5: 184
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 292
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -185,6 +189,7 @@ Stage-2 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_5: 121
RECORDS_OUT_OPERATOR_SEL_6: 105
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 189
Stage-2 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -223,6 +228,7 @@ Stage-2 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_5: 184
RECORDS_OUT_OPERATOR_SEL_6: 208
RECORDS_OUT_OPERATOR_TS_0: 500
+TOTAL_TABLE_ROWS_WRITTEN: 292
Stage-2 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
INPUT_DIRECTORIES_Map_1: 1
@@ -270,6 +276,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_7: 189
RECORDS_OUT_OPERATOR_TS_0: 500
RECORDS_OUT_OPERATOR_TS_3: 500
+TOTAL_TABLE_ROWS_WRITTEN: 189
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
GROUPED_INPUT_SPLITS_Map_4: 1
@@ -313,6 +320,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_SEL_7: 292
RECORDS_OUT_OPERATOR_TS_0: 500
RECORDS_OUT_OPERATOR_TS_3: 500
+TOTAL_TABLE_ROWS_WRITTEN: 292
Stage-1 INPUT COUNTERS:
GROUPED_INPUT_SPLITS_Map_1: 1
GROUPED_INPUT_SPLITS_Map_4: 1
@@ -1878,6 +1878,7 @@ Stage-1 HIVE COUNTERS:
RECORDS_OUT_OPERATOR_MAP_0: 0
RECORDS_OUT_OPERATOR_SEL_2: 2
RECORDS_OUT_OPERATOR_TS_0: 98
+TOTAL_TABLE_ROWS_WRITTEN: 2
Stage-1 LLAP IO COUNTERS:
ALLOCATED_BYTES: 2359296
ALLOCATED_USED_BYTES: 44166