# [LIVY-650][THRIFT] Remove schema from ResultSet
## What changes were proposed in this pull request?

The class `ResultSet` is serialized and sent over the wire. Currently it contains a JSON string representation of the Spark schema that is never used, so this PR removes the field to avoid serializing it needlessly.
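
For illustration, here is a minimal sketch of how a caller builds a `ResultSet` after this change. It is based on the constructor, `addRow`, and `getColumns` signatures visible in the diffs below; the specific `DataType` enum constants are assumptions about the module's types:

```java
import org.apache.livy.thriftserver.session.ColumnBuffer;
import org.apache.livy.thriftserver.session.DataType;
import org.apache.livy.thriftserver.session.ResultSet;

public class ResultSetSketch {
  public static void main(String[] args) {
    // Only the column types are needed now; no schema JSON string is
    // carried along, so nothing unused is serialized with the results.
    // (DataType.INTEGER and DataType.STRING are assumed enum constants.)
    ResultSet rs = new ResultSet(new DataType[] { DataType.INTEGER, DataType.STRING });
    rs.addRow(new Object[] { 1, "one" });
    rs.addRow(new Object[] { 2, "two" });

    // The column-oriented buffers are what actually cross the wire.
    ColumnBuffer[] columns = rs.getColumns();
    System.out.println(columns.length + " columns buffered");
  }
}
```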

## How was this patch tested?

Existing UTs.

Author: Marco Gaido <[email protected]>

Closes apache#213 from mgaido91/LIVY-650.
mgaido91 committed Aug 29, 2019
1 parent 4ec3b9b commit 9c34750
Showing 6 changed files with 3 additions and 15 deletions.
```diff
@@ -49,7 +49,7 @@ public ResultSet call(JobContext ctx) {
     StatementState st = session.findStatement(statementId);
     Iterator<Row> iter = st.iter;
 
-    ResultSet rs = new ResultSet(st.types, st.schema);
+    ResultSet rs = new ResultSet(st.types);
     int count = 0;
     while (iter.hasNext() && count < maxRows) {
       Row row = iter.next();
```
```diff
@@ -37,16 +37,13 @@
  */
 public class ResultSet {
 
-  private final String schema;
   private final ColumnBuffer[] columns;
 
   public ResultSet() {
-    this.schema = null;
     this.columns = null;
   }
 
-  public ResultSet(DataType[] types, String schema) {
-    this.schema = schema;
+  public ResultSet(DataType[] types) {
     this.columns = new ColumnBuffer[types.length];
     for (int i = 0; i < columns.length; i++) {
       columns[i] = new ColumnBuffer(types[i]);
@@ -69,10 +66,6 @@ public void addRow(Object[] fields) {
     }
   }
 
-  public String getSchema() {
-    return schema;
-  }
-
   public ColumnBuffer[] getColumns() {
     return columns;
   }
```
```diff
@@ -18,9 +18,7 @@
 package org.apache.livy.thriftserver.session;
 
 import java.util.Iterator;
-import java.util.List;
 
-import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -88,5 +86,4 @@ private void executeSql(JobContext ctx) throws Exception {
     // has been executed.
     session.registerStatement(statementId, schema, iter);
   }
-
 }
```
```diff
@@ -36,5 +36,4 @@ class StatementState {
     this.iter = iter;
     this.types = SparkUtils.translateSchema(schema);
   }
-
 }
```
```diff
@@ -157,5 +157,4 @@ private NoSuchElementException catalogJobNotFound(String jobId) {
     return new NoSuchElementException(
         String.format("Catalog job %s not found in session %s.", jobId, sessionId));
   }
-
 }
```
```diff
@@ -79,7 +79,7 @@ public void testColumnBuffer() throws Exception {
 
     ds.write().format("parquet").saveAsTable("types_test");
 
-    ResultSet rs = new ResultSet(SparkUtils.translateSchema(ds.schema()), ds.schema().json());
+    ResultSet rs = new ResultSet(SparkUtils.translateSchema(ds.schema()));
     for (Row r : spark.table("types_test").collectAsList()) {
       Object[] cols = new Object[r.length()];
       for (int i = 0; i < cols.length; i++) {
```
