Improve encapsulation of components used in FetchBatchAccumulator
- Renames `FetchProjectorContext` → `ReaderBuckets`. It is now used only
  to partition the `_fetchId`s by `_readerId` into `_docId`s, and to
  later carry the fetched values for those doc-ids (a sketch of the
  `_fetchId` encoding follows below).

- `FetchRowInputSymbolVisitor` leaked a lot of its internals into
  `FetchBatchAccumulator`. This is now encapsulated via `FetchRows`.
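For context, here is a minimal sketch of how a `_fetchId` packs both ids into a single long. The decode half mirrors the `FetchId.decodeReaderId`/`FetchId.decodeDocId` calls visible in the old code below; the exact bit layout is an assumption, not taken from this commit.

// Sketch only: assumes readerId lives in the upper 32 bits and docId in
// the lower 32 bits of a fetchId.
final class FetchIdSketch {

    static long encode(int readerId, int docId) {
        return ((long) readerId << 32) | (docId & 0xFFFFFFFFL);
    }

    static int decodeReaderId(long fetchId) {
        return (int) (fetchId >>> 32);
    }

    static int decodeDocId(long fetchId) {
        return (int) fetchId;
    }
}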
mfussenegger committed Mar 18, 2020
1 parent 65f0c34 commit 3c3c3d4
Showing 12 changed files with 497 additions and 617 deletions.
1 change: 1 addition & 0 deletions dex/src/main/java/io/crate/data/Projector.java
@@ -29,6 +29,7 @@
* Projects a given upstream {@link BatchIterator} object and returns a batch iterator which follows the semantics of
* the underlying projection.
*/
@FunctionalInterface
public interface Projector extends UnaryOperator<BatchIterator<Row>> {

/**
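With `@FunctionalInterface` in place, a `Projector` can be written as a lambda over its single abstract method inherited from `UnaryOperator<BatchIterator<Row>>` — which is exactly what the reworked `FetchProjector.create` further down returns. A trivial, hypothetical illustration:

// A no-op projector expressed as a lambda; valid only because Projector
// now has a single abstract method.
Projector identity = batchIterator -> batchIterator;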
FetchProjection.java
@@ -21,24 +21,32 @@

package io.crate.execution.dsl.projection;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.IntObjectMap;
import com.carrotsearch.hppc.IntSet;
import com.carrotsearch.hppc.cursors.IntCursor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.monitor.jvm.JvmInfo;

import io.crate.Streamer;
import io.crate.common.collections.Lists2;
import io.crate.data.Paging;
import io.crate.expression.symbol.ScopedSymbol;
import io.crate.expression.symbol.SelectSymbol;
import io.crate.expression.symbol.Symbol;
import io.crate.expression.symbol.SymbolVisitors;
import io.crate.expression.symbol.Symbols;
import io.crate.metadata.RelationName;
import io.crate.planner.node.fetch.FetchSource;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.monitor.jvm.JvmInfo;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FetchProjection extends Projection {

@@ -170,4 +178,25 @@ public Map<String, Object> mapRepresentation() {
"fetchSize", fetchSize
);
}

@SuppressWarnings({"rawtypes"})
public Map<String, ? extends IntObjectMap<Streamer[]>> generateStreamersGroupedByReaderAndNode() {
HashMap<String, IntObjectHashMap<Streamer[]>> streamersByReaderByNode = new HashMap<>();
for (Map.Entry<String, IntSet> entry : nodeReaders.entrySet()) {
IntObjectHashMap<Streamer[]> streamersByReaderId = new IntObjectHashMap<>();
String nodeId = entry.getKey();
streamersByReaderByNode.put(nodeId, streamersByReaderId);
for (IntCursor readerIdCursor : entry.getValue()) {
int readerId = readerIdCursor.value;
String index = readerIndices.floorEntry(readerId).getValue();
RelationName relationName = indicesToIdents.get(index);
FetchSource fetchSource = fetchSources.get(relationName);
if (fetchSource == null) {
continue;
}
streamersByReaderId.put(readerId, Symbols.streamerArray(fetchSource.references()));
}
}
return streamersByReaderByNode;
}
}
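The returned map is keyed by node id and then by reader id. A hypothetical consumer might resolve the streamers for one reader like this (`nodeId` and `readerId` are illustrative locals, not names from this commit):

// Look up the streamers used to (de)serialize the fetched cells of one
// reader on one node; null if that reader's table had no fetch source.
IntObjectMap<Streamer[]> streamersByReader =
    projection.generateStreamersGroupedByReaderAndNode().get(nodeId);
Streamer[] streamers = streamersByReader == null
    ? null
    : streamersByReader.get(readerId);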
FetchBatchAccumulator.java
@@ -22,71 +22,55 @@

package io.crate.execution.engine.fetch;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;

import com.carrotsearch.hppc.IntContainer;
import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.IntObjectMap;
import com.carrotsearch.hppc.IntSet;
import com.carrotsearch.hppc.cursors.IntCursor;
import com.carrotsearch.hppc.cursors.IntObjectCursor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import io.crate.concurrent.CompletableFutures;
import io.crate.data.BatchAccumulator;
import io.crate.data.Bucket;
import io.crate.data.Input;
import io.crate.data.Row;
import io.crate.data.UnsafeArrayRow;
import io.crate.expression.InputRow;
import io.crate.expression.symbol.Symbol;
import io.crate.metadata.TransactionContext;
import io.crate.metadata.Functions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;

public class FetchBatchAccumulator implements BatchAccumulator<Row, Iterator<? extends Row>> {

private static final Logger LOGGER = LogManager.getLogger(FetchBatchAccumulator.class);

private final FetchOperation fetchOperation;
private final FetchProjectorContext context;
private final ReaderBuckets readerBuckets;
private final int fetchSize;
private final FetchRowInputSymbolVisitor.Context collectRowContext;
private final InputRow outputRow;
private final ArrayList<Object[]> inputValues = new ArrayList<>();
private final FetchRows fetchRows;
private final Map<String, IntSet> readerIdsByNode;

public FetchBatchAccumulator(TransactionContext txnCtx,
public FetchBatchAccumulator(FetchRows fetchRows,
FetchOperation fetchOperation,
Functions functions,
List<Symbol> outputSymbols,
FetchProjectorContext fetchProjectorContext,
Map<String, IntSet> readerIdsByNode,
int fetchSize) {
this.fetchOperation = fetchOperation;
this.context = fetchProjectorContext;
this.readerIdsByNode = readerIdsByNode;
this.readerBuckets = new ReaderBuckets();
this.fetchSize = fetchSize;

FetchRowInputSymbolVisitor rowInputSymbolVisitor = new FetchRowInputSymbolVisitor(txnCtx, functions);
this.collectRowContext = new FetchRowInputSymbolVisitor.Context(fetchProjectorContext.tableToFetchSource);

List<Input<?>> inputs = new ArrayList<>(outputSymbols.size());
for (Symbol symbol : outputSymbols) {
inputs.add(symbol.accept(rowInputSymbolVisitor, collectRowContext));
}
outputRow = new InputRow(inputs);
this.fetchRows = fetchRows;
}

@Override
public void onItem(Row row) {
Object[] cells = row.materialize();
collectRowContext.inputRow().cells(cells);
for (int i : collectRowContext.fetchIdPositions()) {
for (int i : fetchRows.fetchIdPositions()) {
Object fetchId = cells[i];
if (fetchId != null) {
context.require((long) fetchId);
readerBuckets.require((long) fetchId);
}
}
inputValues.add(cells);
@@ -95,10 +79,10 @@ public void onItem(Row row) {
@Override
public CompletableFuture<Iterator<? extends Row>> processBatch(boolean isLastBatch) {
List<CompletableFuture<IntObjectMap<? extends Bucket>>> futures = new ArrayList<>();
Iterator<Map.Entry<String, IntSet>> it = context.nodeToReaderIds.entrySet().iterator();
Iterator<Map.Entry<String, IntSet>> it = readerIdsByNode.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, IntSet> entry = it.next();
IntObjectHashMap<IntContainer> toFetch = generateToFetch(entry.getValue());
IntObjectHashMap<IntContainer> toFetch = readerBuckets.generateToFetch(entry.getValue());
if (toFetch.isEmpty() && !isLastBatch) {
continue;
}
@@ -117,7 +101,7 @@ public CompletableFuture<Iterator<? extends Row>> processBatch(boolean isLastBatch) {

@Override
public void close() {
for (String nodeId : context.nodeToReaderIds.keySet()) {
for (String nodeId : readerIdsByNode.keySet()) {
fetchOperation.fetch(nodeId, new IntObjectHashMap<>(0), true)
.exceptionally(e -> {
LOGGER.error("An error happened while sending close fetchRequest to node=" + nodeId, e);
@@ -128,19 +112,14 @@ public void onItem(Row row) {

@Override
public void reset() {
context.clearBuckets();
readerBuckets.clearBuckets();
inputValues.clear();
}

private Iterator<? extends Row> getRows(List<IntObjectMap<? extends Bucket>> results) {
applyResultToReaderBuckets(results);
readerBuckets.applyResults(results);
return new Iterator<Row>() {

final int[] fetchIdPositions = collectRowContext.fetchIdPositions();
final UnsafeArrayRow inputRow = collectRowContext.inputRow();
final UnsafeArrayRow[] fetchRows = collectRowContext.fetchRows();
final Object[][] nullCells = collectRowContext.nullCells();

int idx = 0;

@Override
@@ -154,25 +133,10 @@ public Row next() {
throw new NoSuchElementException("Iterator is exhausted");
}
Object[] cells = inputValues.get(idx);
inputRow.cells(cells);
for (int i = 0; i < fetchIdPositions.length; i++) {
Object fetchIdObj = cells[fetchIdPositions[i]];
if (fetchIdObj == null) {
fetchRows[i].cells(nullCells[i]);
continue;
}
long fetchId = (long) fetchIdObj;
int readerId = FetchId.decodeReaderId(fetchId);
int docId = FetchId.decodeDocId(fetchId);
ReaderBucket readerBucket = context.getReaderBucket(readerId);
assert readerBucket != null : "readerBucket must not be null";
fetchRows[i].cells(readerBucket.get(docId));
}
idx++;
Row outputRow = fetchRows.updatedOutputRow(cells, readerBuckets);
if (!hasNext()) {
// free up memory - in case we're streaming data to the client
// this would otherwise grow to hold the whole result in-memory
reset();
readerBuckets.clearBuckets();
}
return outputRow;
}
@@ -183,27 +147,4 @@ public Row next() {
public int batchSize() {
return fetchSize;
}

private void applyResultToReaderBuckets(List<IntObjectMap<? extends Bucket>> results) {
for (IntObjectMap<? extends Bucket> result : results) {
if (result == null) {
continue;
}
for (IntObjectCursor<? extends Bucket> cursor : result) {
ReaderBucket readerBucket = context.getReaderBucket(cursor.key);
readerBucket.fetched(cursor.value);
}
}
}

private IntObjectHashMap<IntContainer> generateToFetch(IntSet readerIds) {
IntObjectHashMap<IntContainer> toFetch = new IntObjectHashMap<>(readerIds.size());
for (IntCursor readerIdCursor : readerIds) {
ReaderBucket readerBucket = context.readerBucket(readerIdCursor.value);
if (readerBucket != null && readerBucket.docs.size() > 0) {
toFetch.put(readerIdCursor.value, readerBucket.docs.keys());
}
}
return toFetch;
}
}
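The two private helpers removed above moved into the new `ReaderBuckets` class. Inferred from its call sites in this file (`require`, `generateToFetch`, `applyResults`, `clearBuckets`), its shape might look roughly like the sketch below; the method bodies are assumptions reconstructed from the removed code, not the actual implementation. It uses the same hppc and io.crate imports as FetchBatchAccumulator above.

// Sketch of ReaderBuckets, reconstructed from this diff's call sites.
class ReaderBucketsSketch {

    private final IntObjectHashMap<ReaderBucket> buckets = new IntObjectHashMap<>();

    // Record that the doc behind fetchId must be fetched from its reader.
    void require(long fetchId) {
        int readerId = FetchId.decodeReaderId(fetchId);
        ReaderBucket bucket = buckets.get(readerId);
        if (bucket == null) {
            bucket = new ReaderBucket();
            buckets.put(readerId, bucket);
        }
        bucket.require(FetchId.decodeDocId(fetchId)); // assumed ReaderBucket API
    }

    // For the given readers, collect the docIds that still need fetching.
    IntObjectHashMap<IntContainer> generateToFetch(IntSet readerIds) {
        IntObjectHashMap<IntContainer> toFetch = new IntObjectHashMap<>(readerIds.size());
        for (IntCursor readerIdCursor : readerIds) {
            ReaderBucket bucket = buckets.get(readerIdCursor.value);
            if (bucket != null && bucket.docs.size() > 0) {
                toFetch.put(readerIdCursor.value, bucket.docs.keys());
            }
        }
        return toFetch;
    }

    // Attach the fetched result buckets to their reader buckets.
    void applyResults(List<IntObjectMap<? extends Bucket>> results) {
        for (IntObjectMap<? extends Bucket> result : results) {
            if (result == null) {
                continue;
            }
            for (IntObjectCursor<? extends Bucket> cursor : result) {
                buckets.get(cursor.key).fetched(cursor.value);
            }
        }
    }

    void clearBuckets() {
        buckets.clear();
    }
}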
FetchProjector.java
@@ -26,52 +26,31 @@
import io.crate.data.BatchIterator;
import io.crate.data.Projector;
import io.crate.data.Row;
import io.crate.expression.symbol.Symbol;
import io.crate.metadata.TransactionContext;
import io.crate.execution.dsl.projection.FetchProjection;
import io.crate.metadata.Functions;
import io.crate.metadata.TransactionContext;

import java.util.List;

public class FetchProjector implements Projector {

private final TransactionContext txnCtx;
private final FetchOperation fetchOperation;
private final Functions functions;
private final List<Symbol> outputSymbols;
private final FetchProjectorContext fetchProjectorContext;
private final int fetchSize;

public FetchProjector(TransactionContext txnCtx,
FetchOperation fetchOperation,
Functions functions,
List<Symbol> outputSymbols,
FetchProjectorContext fetchProjectorContext,
int fetchSize) {
this.txnCtx = txnCtx;
this.fetchOperation = fetchOperation;
this.functions = functions;
this.outputSymbols = outputSymbols;
this.fetchProjectorContext = fetchProjectorContext;
this.fetchSize = fetchSize;
}

@Override
public BatchIterator<Row> apply(BatchIterator<Row> batchIterator) {
return new AsyncOperationBatchIterator<>(
batchIterator,
new FetchBatchAccumulator(
txnCtx,
fetchOperation,
functions,
outputSymbols,
fetchProjectorContext,
fetchSize
)
public final class FetchProjector {

public static Projector create(FetchProjection projection,
TransactionContext txnCtx,
Functions functions,
FetchOperation fetchOperation) {
final FetchRows fetchRows = FetchRows.create(
txnCtx,
functions,
projection.fetchSources(),
projection.outputSymbols()
);
}

@Override
public boolean providesIndependentScroll() {
return false;
return (BatchIterator<Row> source) ->
new AsyncOperationBatchIterator<>(
source,
new FetchBatchAccumulator(
fetchRows,
fetchOperation,
projection.nodeReaders(),
projection.getFetchSize()
)
);
}
}
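With the factory in place, wiring up the fetch phase shrinks to a single call. A hypothetical caller (the surrounding variables are assumptions, not from this commit):

// The factory bundles FetchRows and FetchBatchAccumulator behind a
// Projector lambda, so callers only deal with the Projector interface.
Projector fetchProjector = FetchProjector.create(projection, txnCtx, functions, fetchOperation);
BatchIterator<Row> fetched = fetchProjector.apply(sourceIterator);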
(The remaining 8 of the 12 changed files are not shown.)