Skip to content

Commit

Permalink
Add support for index joins
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang authored and dain committed Apr 19, 2014
1 parent e27fcac commit 7756868
Show file tree
Hide file tree
Showing 84 changed files with 3,168 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorOutputHandleResolver;
import com.facebook.presto.spi.ConnectorRecordSetProvider;
Expand Down Expand Up @@ -77,4 +78,10 @@ public ConnectorOutputHandleResolver getOutputHandleResolver()
{
throw new UnsupportedOperationException();
}

@Override
public ConnectorIndexResolver getIndexResolver()
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.IndexHandle;
import com.facebook.presto.spi.Split;
import com.facebook.presto.spi.TableHandle;
import com.google.common.base.Objects;
Expand Down Expand Up @@ -52,6 +53,12 @@ public boolean canHandle(Split split)
return split instanceof CassandraSplit && ((CassandraSplit) split).getConnectorId().equals(connectorId);
}

@Override
public boolean canHandle(IndexHandle indexHandle)
{
return false;
}

@Override
public Class<? extends TableHandle> getTableHandleClass()
{
Expand All @@ -70,6 +77,12 @@ public Class<? extends Split> getSplitClass()
return CassandraSplit.class;
}

@Override
public Class<? extends IndexHandle> getIndexHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorOutputHandleResolver;
import com.facebook.presto.spi.ConnectorRecordSetProvider;
Expand Down Expand Up @@ -81,4 +82,10 @@ public ConnectorOutputHandleResolver getOutputHandleResolver()
{
throw new UnsupportedOperationException();
}

@Override
public ConnectorIndexResolver getIndexResolver()
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.IndexHandle;
import com.facebook.presto.spi.Split;
import com.facebook.presto.spi.TableHandle;

Expand Down Expand Up @@ -51,6 +52,12 @@ public boolean canHandle(Split split)
return split instanceof ExampleSplit && ((ExampleSplit) split).getConnectorId().equals(connectorId);
}

@Override
public boolean canHandle(IndexHandle indexHandle)
{
return false;
}

@Override
public Class<? extends TableHandle> getTableHandleClass()
{
Expand All @@ -68,4 +75,10 @@ public Class<? extends Split> getSplitClass()
{
return ExampleSplit.class;
}

@Override
public Class<? extends IndexHandle> getIndexHandleClass()
{
throw new UnsupportedOperationException();
}
}
13 changes: 13 additions & 0 deletions presto-hive/src/main/java/com/facebook/presto/hive/HiveClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Domain;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.IndexHandle;
import com.facebook.presto.spi.OutputTableHandle;
import com.facebook.presto.spi.Partition;
import com.facebook.presto.spi.PartitionResult;
Expand Down Expand Up @@ -844,6 +845,12 @@ public boolean canHandle(OutputTableHandle handle)
return (handle instanceof HiveOutputTableHandle) && ((HiveOutputTableHandle) handle).getClientId().equals(connectorId);
}

@Override
public boolean canHandle(IndexHandle indexHandle)
{
return false;
}

@Override
public Class<? extends TableHandle> getTableHandleClass()
{
Expand All @@ -868,6 +875,12 @@ public Class<? extends OutputTableHandle> getOutputTableHandleClass()
return HiveOutputTableHandle.class;
}

@Override
public Class<? extends IndexHandle> getIndexHandleClass()
{
throw new UnsupportedOperationException();
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorOutputHandleResolver;
import com.facebook.presto.spi.ConnectorRecordSetProvider;
Expand Down Expand Up @@ -84,4 +85,10 @@ public ConnectorOutputHandleResolver getOutputHandleResolver()
{
return outputHandleResolver;
}

@Override
public ConnectorIndexResolver getIndexResolver()
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ public DictionaryEncodedBlockCursor(RandomAccessBlock dictionary, BlockCursor id
checkArgument(idCursor.getType().equals(BIGINT), "Expected bigint cursor but got %s cursor", idCursor.getType());
}

public DictionaryEncodedBlockCursor(DictionaryEncodedBlockCursor cursor)
{
this.dictionary = cursor.dictionary;
this.idCursor = cursor.idCursor.duplicate();
}

@Override
public BlockCursor duplicate()
{
return new DictionaryEncodedBlockCursor(this);
}

@Override
public Type getType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ public RunLengthEncodedBlockCursor(RandomAccessBlock value, int positionCount)
position = -1;
}

public RunLengthEncodedBlockCursor(RunLengthEncodedBlockCursor cursor)
{
this.value = cursor.value;
this.positionCount = cursor.positionCount;
this.position = cursor.position;
}

@Override
public BlockCursor duplicate()
{
return new RunLengthEncodedBlockCursor(this);
}

@Override
public Type getType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
*/
package com.facebook.presto.connector;

import com.facebook.presto.index.IndexManager;
import com.facebook.presto.metadata.HandleResolver;
import com.facebook.presto.metadata.MetadataManager;
import com.facebook.presto.metadata.OutputTableHandleResolver;
import com.facebook.presto.operator.RecordSinkManager;
import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorFactory;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorOutputHandleResolver;
import com.facebook.presto.spi.ConnectorRecordSetProvider;
Expand Down Expand Up @@ -47,6 +49,8 @@ public class ConnectorManager
private final MetadataManager metadataManager;
private final SplitManager splitManager;
private final DataStreamManager dataStreamManager;
private final IndexManager indexManager;

private final RecordSinkManager recordSinkManager;
private final HandleResolver handleResolver;
private final OutputTableHandleResolver outputTableHandleResolver;
Expand All @@ -59,6 +63,7 @@ public class ConnectorManager
public ConnectorManager(MetadataManager metadataManager,
SplitManager splitManager,
DataStreamManager dataStreamManager,
IndexManager indexManager,
RecordSinkManager recordSinkManager,
HandleResolver handleResolver,
OutputTableHandleResolver outputTableHandleResolver,
Expand All @@ -68,6 +73,7 @@ public ConnectorManager(MetadataManager metadataManager,
this.metadataManager = metadataManager;
this.splitManager = splitManager;
this.dataStreamManager = dataStreamManager;
this.indexManager = indexManager;
this.recordSinkManager = recordSinkManager;
this.handleResolver = handleResolver;
this.outputTableHandleResolver = outputTableHandleResolver;
Expand Down Expand Up @@ -182,5 +188,17 @@ private void addConnector(@Nullable String catalogName, String connectorId, Conn
if (connectorOutputHandleResolver != null) {
outputTableHandleResolver.addHandleResolver(connectorId, connectorOutputHandleResolver);
}

ConnectorIndexResolver indexResolver = null;
try {
indexResolver = connector.getIndexResolver();
checkNotNull(indexResolver, "Connector %s returned a null index resolver", connectorId);
}
catch (UnsupportedOperationException ignored) {
}

if (indexResolver != null) {
indexManager.addIndexResolver(indexResolver);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorFactory;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorOutputHandleResolver;
import com.facebook.presto.spi.ConnectorRecordSetProvider;
Expand Down Expand Up @@ -106,6 +107,12 @@ public ConnectorRecordSetProvider getRecordSetProvider()
{
throw new UnsupportedOperationException();
}

@Override
public ConnectorIndexResolver getIndexResolver()
{
throw new UnsupportedOperationException();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.connector.InternalConnector;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorOutputHandleResolver;
import com.facebook.presto.spi.ConnectorRecordSetProvider;
Expand Down Expand Up @@ -79,4 +80,10 @@ public ConnectorOutputHandleResolver getOutputHandleResolver()
{
throw new UnsupportedOperationException();
}

@Override
public ConnectorIndexResolver getIndexResolver()
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.IndexHandle;
import com.facebook.presto.spi.Split;
import com.facebook.presto.spi.TableHandle;

Expand All @@ -39,6 +40,12 @@ public boolean canHandle(Split split)
return split instanceof DualSplit;
}

@Override
public boolean canHandle(IndexHandle indexHandle)
{
return false;
}

@Override
public Class<? extends TableHandle> getTableHandleClass()
{
Expand All @@ -56,4 +63,10 @@ public Class<? extends Split> getSplitClass()
{
return DualSplit.class;
}

@Override
public Class<? extends IndexHandle> getIndexHandleClass()
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.IndexHandle;
import com.facebook.presto.spi.Split;
import com.facebook.presto.spi.TableHandle;

Expand All @@ -39,6 +40,12 @@ public boolean canHandle(Split split)
return split instanceof InformationSchemaSplit;
}

@Override
public boolean canHandle(IndexHandle indexHandle)
{
return false;
}

@Override
public Class<? extends TableHandle> getTableHandleClass()
{
Expand All @@ -56,4 +63,10 @@ public Class<? extends Split> getSplitClass()
{
return InformationSchemaSplit.class;
}

@Override
public Class<? extends IndexHandle> getIndexHandleClass()
{
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.Connector;
import com.facebook.presto.spi.ConnectorFactory;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorIndexResolver;
import com.facebook.presto.spi.ConnectorMetadata;
import com.facebook.presto.spi.ConnectorOutputHandleResolver;
import com.facebook.presto.spi.ConnectorRecordSetProvider;
Expand Down Expand Up @@ -94,6 +95,12 @@ public ConnectorRecordSinkProvider getRecordSinkProvider()
throw new UnsupportedOperationException();
}

@Override
public ConnectorIndexResolver getIndexResolver()
{
throw new UnsupportedOperationException();
}

@Override
public ConnectorOutputHandleResolver getOutputHandleResolver()
{
Expand Down
Loading

0 comments on commit 7756868

Please sign in to comment.