Skip to content

Commit

Permalink
HIVE-14170: Beeline IncrementalRows should buffer rows and incrementa…
Browse files Browse the repository at this point in the history
…lly re-calculate width if TableOutputFormat is used (Sahil Takiar, reviewed by Tao Li)
  • Loading branch information
sahilTakiar authored and Sergio Pena committed Aug 30, 2016
1 parent 60ec753 commit ebad27d
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 12 deletions.
5 changes: 5 additions & 0 deletions beeline/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@
<version>9.1-901.jdbc4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
10 changes: 7 additions & 3 deletions beeline/src/java/org/apache/hive/beeline/BeeLine.java
Original file line number Diff line number Diff line change
Expand Up @@ -2023,10 +2023,14 @@ int print(ResultSet rs) throws SQLException {

Rows rows;

if (getOpts().getIncremental()) {
rows = new IncrementalRows(this, rs);
if (f instanceof TableOutputFormat) {
if (getOpts().getIncremental()) {
rows = new IncrementalRowsWithNormalization(this, rs);
} else {
rows = new BufferedRows(this, rs);
}
} else {
rows = new BufferedRows(this, rs);
rows = new IncrementalRows(this, rs);
}
return f.print(rows);
}
Expand Down
10 changes: 10 additions & 0 deletions beeline/src/java/org/apache/hive/beeline/BeeLineOpts.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class BeeLineOpts implements Completer {
public static final String DEFAULT_NULL_STRING = "NULL";
public static final char DEFAULT_DELIMITER_FOR_DSV = '|';
public static final int DEFAULT_MAX_COLUMN_WIDTH = 50;
public static final int DEFAULT_INCREMENTAL_BUFFER_ROWS = 1000;

public static String URL_ENV_PREFIX = "BEELINE_URL_";

Expand All @@ -74,6 +75,7 @@ class BeeLineOpts implements Completer {
private boolean verbose = false;
private boolean force = false;
private boolean incremental = false;
private int incrementalBufferRows = DEFAULT_INCREMENTAL_BUFFER_ROWS;
private boolean showWarnings = false;
private boolean showNestedErrs = false;
private boolean showElapsedTime = true;
Expand Down Expand Up @@ -511,6 +513,14 @@ public boolean getIncremental() {
return incremental;
}

public void setIncrementalBufferRows(int incrementalBufferRows) {
this.incrementalBufferRows = incrementalBufferRows;
}

public int getIncrementalBufferRows() {
return this.incrementalBufferRows;
}

public void setSilent(boolean silent) {
this.silent = silent;
}
Expand Down
23 changes: 20 additions & 3 deletions beeline/src/java/org/apache/hive/beeline/BufferedRows.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.util.Iterator;
import java.util.LinkedList;

import com.google.common.base.Optional;


/**
* Rows implementation which buffers all rows in a linked list.
*/
Expand All @@ -36,21 +39,36 @@ class BufferedRows extends Rows {
private int maxColumnWidth;

BufferedRows(BeeLine beeLine, ResultSet rs) throws SQLException {
this(beeLine, rs, Optional.<Integer> absent());
}

BufferedRows(BeeLine beeLine, ResultSet rs, Optional<Integer> limit) throws SQLException {
super(beeLine, rs);
list = new LinkedList<Row>();
int count = rsMeta.getColumnCount();
list.add(new Row(count));
while (rs.next()) {
list.add(new Row(count, rs));

int numRowsBuffered = 0;
if (limit.isPresent()) {
while (limit.get() > numRowsBuffered && rs.next()) {
this.list.add(new Row(count, rs));
numRowsBuffered++;
}
} else {
while (rs.next()) {
this.list.add(new Row(count, rs));
}
}
iterator = list.iterator();
maxColumnWidth = beeLine.getOpts().getMaxColumnWidth();
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public Object next() {
return iterator.next();
}
Expand All @@ -76,5 +94,4 @@ void normalizeWidths() {
row.sizes = max;
}
}

}
10 changes: 5 additions & 5 deletions beeline/src/java/org/apache/hive/beeline/IncrementalRows.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
* without any buffering.
*/
public class IncrementalRows extends Rows {
private final ResultSet rs;
protected final ResultSet rs;
private final Row labelRow;
private final Row maxRow;
private Row nextRow;
private boolean endOfResult;
private boolean normalizingWidths;
protected boolean normalizingWidths;


IncrementalRows(BeeLine beeLine, ResultSet rs) throws SQLException {
Expand All @@ -53,8 +53,8 @@ public class IncrementalRows extends Rows {
// normalized display width is based on maximum of display size
// and label size
maxRow.sizes[i] = Math.max(
maxRow.sizes[i],
rsMeta.getColumnDisplaySize(i + 1));
maxRow.sizes[i],
rsMeta.getColumnDisplaySize(i + 1));
maxRow.sizes[i] = Math.min(maxWidth, maxRow.sizes[i]);
}

Expand Down Expand Up @@ -104,4 +104,4 @@ void normalizeWidths() {
// for each row as it is produced
normalizingWidths = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* This source file is based on code taken from SQLLine 1.0.2
* See SQLLine notice in LICENSE
*/
package org.apache.hive.beeline;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.NoSuchElementException;

import com.google.common.base.Optional;


/**
* Extension of {@link IncrementalRows} which buffers "x" number of rows in memory at a time. It
* uses the {@link BufferedRows} class to do its buffering. The value of "x" is determined by the
* Beeline option <code>--incrementalBufferRows</code>, which defaults to
* {@link BeeLineOpts#DEFAULT_INCREMENTAL_BUFFER_ROWS}. Once the initial set of rows are buffered, it
* will allow the {@link #next()} method to drain the buffer. Once the buffer is empty the next
* buffer will be fetched until the {@link ResultSet} is empty. The width of the rows are normalized
* within each buffer using the {@link BufferedRows#normalizeWidths()} method.
*/
public class IncrementalRowsWithNormalization extends IncrementalRows {

private final int incrementalBufferRows;
private BufferedRows buffer;

IncrementalRowsWithNormalization(BeeLine beeLine, ResultSet rs) throws SQLException {
super(beeLine, rs);

this.incrementalBufferRows = beeLine.getOpts().getIncrementalBufferRows();
this.buffer = new BufferedRows(beeLine, rs, Optional.of(this.incrementalBufferRows));
this.buffer.normalizeWidths();
}

@Override
public boolean hasNext() {
try {
if (this.buffer.hasNext()) {
return true;
} else {
this.buffer = new BufferedRows(this.beeLine, this.rs,
Optional.of(this.incrementalBufferRows));
if (this.normalizingWidths) {
this.buffer.normalizeWidths();
}

// Drain the first Row, which just contains column names
if (!this.buffer.hasNext()) {
return false;
}
this.buffer.next();

return this.buffer.hasNext();
}
} catch (SQLException ex) {
throw new RuntimeException(ex.toString());
}
}

@Override
public Object next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return this.buffer.next();
}
}
2 changes: 1 addition & 1 deletion beeline/src/java/org/apache/hive/beeline/Rows.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* Holds column values as strings
*/
abstract class Rows implements Iterator {
private final BeeLine beeLine;
protected final BeeLine beeLine;
final ResultSetMetaData rsMeta;
final Boolean[] primaryKeys;
final NumberFormat numberFormat;
Expand Down
4 changes: 4 additions & 0 deletions beeline/src/main/resources/BeeLine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ cmd-usage: Usage: java org.apache.hive.cli.beeline.BeeLine \n \
\ memory usage at the price of extra display column padding.\n \
\ Setting --incremental=true is recommended if you encounter an OutOfMemory\n \
\ on the client side (due to the fetched result set size being large).\n \
\ Only applicable if --outputformat=table.\n \
\ --incrementalBufferRows=NUMROWS the number of rows to buffer when printing rows on stdout,\n \
\ defaults to 1000; only applicable if --incremental=true\n \
\ and --outputformat=table\n \
\ --truncateTable=[true/false] truncate table column when it exceeds length\n \
\ --delimiterForDSV=DELIMITER specify the delimiter for delimiter-separated values output format (default: |)\n \
\ --isolation=LEVEL set the transaction isolation level\n \
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hive.beeline;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

import org.junit.Test;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;


public class TestIncrementalRowsWithNormalization {

@Test
public void testIncrementalRows() throws SQLException {
Integer incrementalBufferRows = 5;

// Mock BeeLineOpts
BeeLineOpts mockBeeLineOpts = mock(BeeLineOpts.class);
when(mockBeeLineOpts.getIncrementalBufferRows()).thenReturn(incrementalBufferRows);
when(mockBeeLineOpts.getMaxColumnWidth()).thenReturn(BeeLineOpts.DEFAULT_MAX_COLUMN_WIDTH);
when(mockBeeLineOpts.getNumberFormat()).thenReturn("default");
when(mockBeeLineOpts.getNullString()).thenReturn("NULL");

// Mock BeeLine
BeeLine mockBeeline = mock(BeeLine.class);
when(mockBeeline.getOpts()).thenReturn(mockBeeLineOpts);

// MockResultSet
ResultSet mockResultSet = mock(ResultSet.class);

ResultSetMetaData mockResultSetMetaData = mock(ResultSetMetaData.class);
when(mockResultSetMetaData.getColumnCount()).thenReturn(1);
when(mockResultSetMetaData.getColumnLabel(1)).thenReturn("Mock Table");
when(mockResultSet.getMetaData()).thenReturn(mockResultSetMetaData);

// First 10 calls to resultSet.next() should return true
when(mockResultSet.next()).thenAnswer(new Answer<Boolean>() {
private int iterations = 10;

@Override
public Boolean answer(InvocationOnMock invocation) {
return this.iterations-- > 0;
}
});

when(mockResultSet.getString(1)).thenReturn("Hello World");

// IncrementalRows constructor should buffer the first "incrementalBufferRows" rows
IncrementalRowsWithNormalization incrementalRowsWithNormalization = new IncrementalRowsWithNormalization(
mockBeeline, mockResultSet);

// When the first buffer is loaded ResultSet.next() should be called "incrementalBufferRows" times
verify(mockResultSet, times(5)).next();

// Iterating through the buffer should not cause the next buffer to be fetched
for (int i = 0; i < incrementalBufferRows + 1; i++) {
incrementalRowsWithNormalization.next();
}
verify(mockResultSet, times(5)).next();

// When a new buffer is fetched ResultSet.next() should be called "incrementalBufferRows" more times
incrementalRowsWithNormalization.next();
verify(mockResultSet, times(10)).next();
}
}

0 comments on commit ebad27d

Please sign in to comment.