Skip to content

Commit

Permalink
Pass session to all connector methods
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Jul 25, 2015
1 parent a7ebcba commit 1104864
Show file tree
Hide file tree
Showing 108 changed files with 768 additions and 612 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, SchemaTableName
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorTableHandle table)
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
JdbcTableHandle handle = checkType(table, JdbcTableHandle.class, "tableHandle");

Expand All @@ -86,13 +86,13 @@ public List<SchemaTableName> listTables(ConnectorSession session, String schemaN
}

@Override
public ColumnHandle getSampleWeightColumnHandle(ConnectorTableHandle tableHandle)
public ColumnHandle getSampleWeightColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return null;
}

@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorTableHandle tableHandle)
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
JdbcTableHandle jdbcTableHandle = checkType(tableHandle, JdbcTableHandle.class, "tableHandle");

Expand All @@ -113,7 +113,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
if (tableHandle == null) {
continue;
}
columns.put(tableName, getTableMetadata(tableHandle).getColumns());
columns.put(tableName, getTableMetadata(session, tableHandle).getColumns());
}
catch (TableNotFoundException e) {
// table disappeared during listing operation
Expand All @@ -123,7 +123,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
checkType(tableHandle, JdbcTableHandle.class, "tableHandle");
return checkType(columnHandle, JdbcColumnHandle.class, "columnHandle").getColumnMetadata();
Expand All @@ -142,7 +142,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
}

@Override
public void dropTable(ConnectorTableHandle tableHandle)
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
if (!allowDropTable) {
throw new PrestoException(PERMISSION_DENIED, "DROP TABLE is disabled in this catalog");
Expand All @@ -158,14 +158,14 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
}

@Override
public void commitCreateTable(ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
public void commitCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
{
JdbcOutputTableHandle handle = checkType(tableHandle, JdbcOutputTableHandle.class, "tableHandle");
jdbcClient.commitCreateTable(handle, fragments);
}

@Override
public void renameTable(ConnectorTableHandle tableHandle, SchemaTableName newTableName)
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName)
{
throw new PrestoException(NOT_SUPPORTED, "This connector does not support renaming tables");
}
Expand All @@ -177,7 +177,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
}

@Override
public void commitInsert(ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments)
public void commitInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorRecordSetProvider;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.RecordSet;
import com.google.common.collect.ImmutableList;
Expand All @@ -38,7 +39,7 @@ public JdbcRecordSetProvider(JdbcClient jdbcClient)
}

@Override
public RecordSet getRecordSet(ConnectorSplit split, List<? extends ColumnHandle> columns)
public RecordSet getRecordSet(ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns)
{
JdbcSplit jdbcSplit = checkType(split, JdbcSplit.class, "split");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorRecordSinkProvider;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.RecordSink;

import javax.inject.Inject;
Expand All @@ -35,13 +36,13 @@ public JdbcRecordSinkProvider(JdbcClient jdbcClient)
}

@Override
public RecordSink getRecordSink(ConnectorOutputTableHandle tableHandle)
public RecordSink getRecordSink(ConnectorSession session, ConnectorOutputTableHandle tableHandle)
{
return new JdbcRecordSink(checkType(tableHandle, JdbcOutputTableHandle.class, "tableHandle"), jdbcClient);
}

@Override
public RecordSink getRecordSink(ConnectorInsertTableHandle tableHandle)
public RecordSink getRecordSink(ConnectorSession session, ConnectorInsertTableHandle tableHandle)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPartition;
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitManager;
import com.facebook.presto.spi.ConnectorSplitSource;
Expand Down Expand Up @@ -46,14 +47,14 @@ public JdbcSplitManager(JdbcConnectorId connectorId, JdbcClient jdbcClient)
}

@Override
public ConnectorPartitionResult getPartitions(ConnectorTableHandle tableHandle, TupleDomain<ColumnHandle> tupleDomain)
public ConnectorPartitionResult getPartitions(ConnectorSession session, ConnectorTableHandle tableHandle, TupleDomain<ColumnHandle> tupleDomain)
{
JdbcTableHandle handle = checkType(tableHandle, JdbcTableHandle.class, "tableHandle");
return jdbcClient.getPartitions(handle, tupleDomain);
}

@Override
public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle tableHandle, List<ConnectorPartition> partitions)
public ConnectorSplitSource getPartitionSplits(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorPartition> partitions)
{
if (partitions.isEmpty()) {
return new FixedSplitSource(connectorId, ImmutableList.<ConnectorSplit>of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testGetTableHandle()
public void testGetColumnHandles()
{
// known table
assertEquals(metadata.getColumnHandles(tableHandle), ImmutableMap.of(
assertEquals(metadata.getColumnHandles(SESSION, tableHandle), ImmutableMap.of(
"text", new JdbcColumnHandle(CONNECTOR_ID, "TEXT", VARCHAR),
"value", new JdbcColumnHandle(CONNECTOR_ID, "VALUE", BIGINT)));

Expand All @@ -95,7 +95,7 @@ public void testGetColumnHandles()
private void unknownTableColumnHandle(JdbcTableHandle tableHandle)
{
try {
metadata.getColumnHandles(tableHandle);
metadata.getColumnHandles(SESSION, tableHandle);
fail("Expected getColumnHandle of unknown table to throw a TableNotFoundException");
}
catch (TableNotFoundException ignored) {
Expand All @@ -106,7 +106,7 @@ private void unknownTableColumnHandle(JdbcTableHandle tableHandle)
public void getTableMetadata()
{
// known table
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(tableHandle);
ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(SESSION, tableHandle);
assertEquals(tableMetadata.getTable(), new SchemaTableName("example", "numbers"));
assertEquals(tableMetadata.getColumns(), ImmutableList.of(
new ColumnMetadata("text", VARCHAR, false),
Expand All @@ -121,7 +121,7 @@ public void getTableMetadata()
private void unknownTableMetadata(JdbcTableHandle tableHandle)
{
try {
metadata.getTableMetadata(tableHandle);
metadata.getTableMetadata(SESSION, tableHandle);
fail("Expected getTableMetadata of unknown table to throw a TableNotFoundException");
}
catch (TableNotFoundException ignored) {
Expand Down Expand Up @@ -152,7 +152,7 @@ public void testListTables()
public void getColumnMetadata()
{
assertEquals(
metadata.getColumnMetadata(tableHandle, new JdbcColumnHandle(CONNECTOR_ID, "text", VARCHAR)),
metadata.getColumnMetadata(SESSION, tableHandle, new JdbcColumnHandle(CONNECTOR_ID, "text", VARCHAR)),
new ColumnMetadata("text", VARCHAR, false));
}

Expand All @@ -168,7 +168,7 @@ public void testCreateTable()
public void testDropTableTable()
{
try {
metadata.dropTable(tableHandle);
metadata.dropTable(SESSION, tableHandle);
fail("expected exception");
}
catch (PrestoException e) {
Expand All @@ -177,10 +177,10 @@ public void testDropTableTable()

JdbcMetadataConfig config = new JdbcMetadataConfig().setAllowDropTable(true);
metadata = new JdbcMetadata(new JdbcConnectorId(CONNECTOR_ID), database.getJdbcClient(), config);
metadata.dropTable(tableHandle);
metadata.dropTable(SESSION, tableHandle);

try {
metadata.getTableMetadata(tableHandle);
metadata.getTableMetadata(SESSION, tableHandle);
fail("expected exception");
}
catch (PrestoException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.Domain;
import com.facebook.presto.spi.Range;
Expand All @@ -33,14 +34,18 @@
import java.util.List;
import java.util.Map;

import static com.facebook.presto.spi.type.TimeZoneKey.UTC_KEY;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static java.util.Locale.ENGLISH;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

@Test
public class TestJdbcRecordSetProvider
{
private static final ConnectorSession SESSION = new ConnectorSession("user", UTC_KEY, ENGLISH, System.currentTimeMillis(), null);

private TestingDatabase database;
private JdbcClient jdbcClient;
private JdbcSplit split;
Expand Down Expand Up @@ -76,7 +81,7 @@ public void testGetRecordSet()
throws Exception
{
JdbcRecordSetProvider recordSetProvider = new JdbcRecordSetProvider(jdbcClient);
RecordSet recordSet = recordSetProvider.getRecordSet(split, ImmutableList.of(textColumn, valueColumn));
RecordSet recordSet = recordSetProvider.getRecordSet(SESSION, split, ImmutableList.of(textColumn, valueColumn));
assertNotNull(recordSet, "recordSet is null");

RecordCursor cursor = recordSet.cursor();
Expand Down Expand Up @@ -184,7 +189,7 @@ private RecordCursor getCursor(JdbcTableHandle jdbcTableHandle, List<JdbcColumnH
JdbcSplit split = (JdbcSplit) getOnlyElement(getFutureValue(splits.getNextBatch(1000)));

JdbcRecordSetProvider recordSetProvider = new JdbcRecordSetProvider(jdbcClient);
RecordSet recordSet = recordSetProvider.getRecordSet(split, columns);
RecordSet recordSet = recordSetProvider.getRecordSet(SESSION, split, columns);

return recordSet.cursor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorTableHandle tableHandle)
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
{
BlackHoleTableHandle blackHoleTableHandle = checkType(tableHandle, BlackHoleTableHandle.class, "tableHandle");
return blackHoleTableHandle.toTableMetadata(typeManager);
Expand All @@ -93,7 +93,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, String schemaN
}

@Override
public ColumnHandle getSampleWeightColumnHandle(ConnectorTableHandle tableHandle)
public ColumnHandle getSampleWeightColumnHandle(ConnectorSession session, ConnectorTableHandle tableHandle)
{
// returns null as the table does not contain sampled data
// (see {@link com.facebook.presto.spi.ConnectorMetadata.getSampleWeightColumnHandle()}
Expand All @@ -107,15 +107,15 @@ public boolean canCreateSampledTables(ConnectorSession session)
}

@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorTableHandle tableHandle)
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
BlackHoleTableHandle blackHoleTableHandle = checkType(tableHandle, BlackHoleTableHandle.class, "tableHandle");
return blackHoleTableHandle.getColumnHandles().stream()
.collect(toMap(BlackHoleColumnHandle::getName, column -> column));
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
BlackHoleColumnHandle blackHoleColumnHandle = checkType(columnHandle, BlackHoleColumnHandle.class, "columnHandle");
return blackHoleColumnHandle.toColumnMetadata(typeManager);
Expand All @@ -130,14 +130,14 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
}

@Override
public void dropTable(ConnectorTableHandle tableHandle)
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
BlackHoleTableHandle blackHoleTableHandle = checkType(tableHandle, BlackHoleTableHandle.class, "tableHandle");
tables.remove(blackHoleTableHandle.getTableName());
}

@Override
public void renameTable(ConnectorTableHandle tableHandle, SchemaTableName newTableName)
public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName)
{
BlackHoleTableHandle oldTableHandle = checkType(tableHandle, BlackHoleTableHandle.class, "tableHandle");
BlackHoleTableHandle newTableHandle = new BlackHoleTableHandle(
Expand All @@ -155,7 +155,7 @@ public void renameTable(ConnectorTableHandle tableHandle, SchemaTableName newTab
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
ConnectorOutputTableHandle outputTableHandle = beginCreateTable(session, tableMetadata);
commitCreateTable(outputTableHandle, ImmutableList.of());
commitCreateTable(session, outputTableHandle, ImmutableList.of());
}

@Override
Expand All @@ -165,7 +165,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
}

@Override
public void commitCreateTable(ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
public void commitCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments)
{
BlackHoleOutputTableHandle blackHoleOutputTableHandle = checkType(tableHandle, BlackHoleOutputTableHandle.class, "tableHandle");
BlackHoleTableHandle table = blackHoleOutputTableHandle.getTable();
Expand All @@ -179,7 +179,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
}

@Override
public void commitInsert(ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments)
public void commitInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments)
{
}

Expand Down Expand Up @@ -213,13 +213,16 @@ public Map<SchemaTableName, String> getViews(ConnectorSession session, SchemaTab
}

@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session,
ConnectorTableHandle table,
Constraint<ColumnHandle> constraint,
Optional<Set<ColumnHandle>> desiredColumns)
{
return ImmutableList.of(new ConnectorTableLayoutResult(getTableLayout(BLACK_HOLE_TABLE_LAYOUT_HANDLE), TupleDomain.none()));
return ImmutableList.of(new ConnectorTableLayoutResult(getTableLayout(session, BLACK_HOLE_TABLE_LAYOUT_HANDLE), TupleDomain.none()));
}

@Override
public ConnectorTableLayout getTableLayout(ConnectorTableLayoutHandle handle)
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
{
return new ConnectorTableLayout(handle, Optional.empty(), TupleDomain.none(), Optional.empty(), Optional.empty(), ImmutableList.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorPageSinkProvider;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.google.common.collect.ImmutableList;
Expand All @@ -29,13 +30,13 @@ public class BlackHolePageSinkProvider
implements ConnectorPageSinkProvider
{
@Override
public ConnectorPageSink createPageSink(ConnectorOutputTableHandle outputTableHandle)
public ConnectorPageSink createPageSink(ConnectorSession session, ConnectorOutputTableHandle outputTableHandle)
{
return new NoOpConnectorPageSink();
}

@Override
public ConnectorPageSink createPageSink(ConnectorInsertTableHandle insertTableHandle)
public ConnectorPageSink createPageSink(ConnectorSession session, ConnectorInsertTableHandle insertTableHandle)
{
return new NoOpConnectorPageSink();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorPageSourceProvider;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.InMemoryRecordSet;
import com.facebook.presto.spi.RecordPageSource;
Expand All @@ -28,7 +29,7 @@ public final class BlackHolePageSourceProvider
implements ConnectorPageSourceProvider
{
@Override
public ConnectorPageSource createPageSource(ConnectorSplit split, List<ColumnHandle> columns)
public ConnectorPageSource createPageSource(ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns)
{
return new RecordPageSource(new InMemoryRecordSet(ImmutableList.of(), ImmutableList.of()));
}
Expand Down
Loading

0 comments on commit 1104864

Please sign in to comment.