Skip to content

Commit

Permalink
This closes apache#4013: [BEAM-1542] A transform for reading the Span…
Browse files Browse the repository at this point in the history
…ner schema
  • Loading branch information
jkff committed Oct 27, 2017
2 parents a6f69bd + 1fd027b commit e686286
Show file tree
Hide file tree
Showing 4 changed files with 433 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner;

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
import org.apache.beam.sdk.transforms.DoFn;

/**
 * This {@link DoFn} reads Cloud Spanner 'information_schema.*' tables to build the
 * {@link SpannerSchema}.
 *
 * <p>For each input element, the column metadata and the primary key metadata are read inside
 * a single read-only transaction and one {@link SpannerSchema} is emitted.
 */
class ReadSpannerSchema extends DoFn<Void, SpannerSchema> {

  private final SpannerConfig config;

  // Not serialized with the DoFn; created in setup() and released in teardown().
  private transient SpannerAccessor spannerAccessor;

  /**
   * Creates the {@link DoFn}.
   *
   * @param config configuration used to connect to Cloud Spanner in {@link #setup()}
   */
  public ReadSpannerSchema(SpannerConfig config) {
    this.config = config;
  }

  /** Opens the connection to Cloud Spanner described by the config. */
  @Setup
  public void setup() throws Exception {
    spannerAccessor = config.connectToSpanner();
  }

  /** Closes the Cloud Spanner connection, if one was opened. */
  @Teardown
  public void teardown() throws Exception {
    // The accessor is still null when setup() did not complete; nothing to release then.
    if (spannerAccessor != null) {
      spannerAccessor.close();
    }
  }

  /**
   * Reads column and primary key metadata and outputs the resulting {@link SpannerSchema}.
   *
   * @param c the process context the schema is emitted to
   */
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    SpannerSchema.Builder builder = SpannerSchema.builder();
    DatabaseClient databaseClient = spannerAccessor.getDatabaseClient();
    // Both queries share one read-only transaction.
    try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
      try (ResultSet resultSet = readTableInfo(tx)) {
        while (resultSet.next()) {
          String tableName = resultSet.getString(0);
          String columnName = resultSet.getString(1);
          String type = resultSet.getString(2);

          builder.addColumn(tableName, columnName, type);
        }
      }

      try (ResultSet resultSet = readPrimaryKeyInfo(tx)) {
        while (resultSet.next()) {
          String tableName = resultSet.getString(0);
          String columnName = resultSet.getString(1);
          String ordering = resultSet.getString(2);

          // Case-insensitive and locale-independent check of the ordering keyword.
          builder.addKeyPart(tableName, columnName, "DESC".equalsIgnoreCase(ordering));
        }
      }
    }
    c.output(builder.build());
  }

  /** Queries (table_name, column_name, spanner_type) rows ordered by table and position. */
  private ResultSet readTableInfo(ReadOnlyTransaction tx) {
    return tx.executeQuery(Statement.of(
        "SELECT c.table_name, c.column_name, c.spanner_type"
            + " FROM information_schema.columns as c"
            + " WHERE c.table_catalog = '' AND c.table_schema = ''"
            + " ORDER BY c.table_name, c.ordinal_position"));
  }

  /** Queries (table_name, column_name, column_ordering) rows of the 'PRIMARY_KEY' indexes. */
  private ResultSet readPrimaryKeyInfo(ReadOnlyTransaction tx) {
    return tx.executeQuery(Statement
        .of("SELECT t.table_name, t.column_name, t.column_ordering"
            + " FROM information_schema.index_columns AS t "
            + " WHERE t.index_name = 'PRIMARY_KEY' AND t.table_catalog = ''"
            + " AND t.table_schema = ''"
            + " ORDER BY t.table_name, t.ordinal_position"));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner;

import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.Type;
import com.google.common.collect.ArrayListMultimap;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * Encapsulates Cloud Spanner Schema.
 *
 * <p>Table and column names are stored lower-cased; lookups by table name are therefore
 * case-insensitive.
 */
class SpannerSchema implements Serializable {
  private final List<String> tables;
  private final ArrayListMultimap<String, Column> columns;
  private final ArrayListMultimap<String, KeyPart> keyParts;

  /** Returns a new, empty {@link Builder}. */
  public static Builder builder() {
    return new Builder();
  }

  /**
   * Builder for {@link SpannerSchema}.
   */
  static class Builder {
    private final ArrayListMultimap<String, Column> columns = ArrayListMultimap.create();
    private final ArrayListMultimap<String, KeyPart> keyParts = ArrayListMultimap.create();

    /**
     * Registers a column of a table.
     *
     * @param table table name, stored lower-cased
     * @param name column name, stored lower-cased
     * @param type Cloud Spanner type string, e.g. {@code INT64} or {@code ARRAY<STRING(MAX)>}
     * @throws IllegalArgumentException if {@code type} is not a recognized type
     */
    public Builder addColumn(String table, String name, String type) {
      addColumn(table, Column.create(name.toLowerCase(), type));
      return this;
    }

    private Builder addColumn(String table, Column column) {
      columns.put(table.toLowerCase(), column);
      return this;
    }

    /**
     * Registers a primary key part of a table.
     *
     * @param table table name, stored lower-cased
     * @param column key column name, stored lower-cased
     * @param desc whether the key part is ordered descending
     */
    public Builder addKeyPart(String table, String column, boolean desc) {
      // Lower-case the table name so key parts use the same keys as the columns.
      keyParts.put(table.toLowerCase(), KeyPart.create(column.toLowerCase(), desc));
      return this;
    }

    /** Builds the {@link SpannerSchema} from the columns and key parts added so far. */
    public SpannerSchema build() {
      return new SpannerSchema(columns, keyParts);
    }
  }

  private SpannerSchema(ArrayListMultimap<String, Column> columns,
      ArrayListMultimap<String, KeyPart> keyParts) {
    this.columns = columns;
    this.keyParts = keyParts;
    tables = new ArrayList<>(columns.keySet());
  }

  /** Returns the (lower-cased) names of all tables that have at least one column. */
  public List<String> getTables() {
    return tables;
  }

  /** Returns the columns of the given table, in insertion order; empty if unknown. */
  public List<Column> getColumns(String table) {
    return columns.get(table.toLowerCase());
  }

  /** Returns the primary key parts of the given table, in insertion order; empty if unknown. */
  public List<KeyPart> getKeyParts(String table) {
    return keyParts.get(table.toLowerCase());
  }

  /** A single column of a primary key together with its ordering. */
  @AutoValue
  abstract static class KeyPart implements Serializable {
    static KeyPart create(String field, boolean desc) {
      return new AutoValue_SpannerSchema_KeyPart(field, desc);
    }

    abstract String getField();

    abstract boolean isDesc();
  }

  /** A named column and its Cloud Spanner {@link Type}. */
  @AutoValue
  abstract static class Column implements Serializable {

    static Column create(String name, Type type) {
      return new AutoValue_SpannerSchema_Column(name, type);
    }

    static Column create(String name, String spannerType) {
      return create(name, parseSpannerType(spannerType));
    }

    public abstract String getName();

    public abstract Type getType();

    /**
     * Converts a Cloud Spanner type string into a {@link Type}.
     *
     * @throws IllegalArgumentException if the string is not a recognized type
     */
    private static Type parseSpannerType(String spannerType) {
      spannerType = spannerType.toUpperCase();
      if (spannerType.equals("BOOL")) {
        return Type.bool();
      }
      if (spannerType.equals("INT64")) {
        return Type.int64();
      }
      if (spannerType.equals("FLOAT64")) {
        return Type.float64();
      }
      // STRING and BYTES carry a length suffix such as "(MAX)", hence the prefix match.
      if (spannerType.startsWith("STRING")) {
        return Type.string();
      }
      if (spannerType.startsWith("BYTES")) {
        return Type.bytes();
      }
      if (spannerType.equals("TIMESTAMP")) {
        return Type.timestamp();
      }
      if (spannerType.equals("DATE")) {
        return Type.date();
      }

      // Require the full "ARRAY<xxx>" shape so a malformed value reaches the
      // IllegalArgumentException below instead of failing inside substring().
      if (spannerType.startsWith("ARRAY<") && spannerType.endsWith(">")) {
        // Strip the leading "ARRAY<" and the trailing ">".
        String spannerArrayType = spannerType.substring(6, spannerType.length() - 1);
        Type itemType = parseSpannerType(spannerArrayType);
        return Type.array(itemType);
      }
      throw new IllegalArgumentException("Unknown spanner type " + spannerType);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner;

import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSets;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentMatcher;

/**
 * A test of {@link ReadSpannerSchema}.
 */
public class ReadSpannerSchemaTest {

  @Rule
  public final transient ExpectedException thrown = ExpectedException.none();

  private FakeServiceFactory serviceFactory;
  private ReadOnlyTransaction mockTx;

  /** Matches any {@link Statement} whose SQL text contains the given fragment. */
  private static class SqlContains extends ArgumentMatcher<Statement> {
    private final String fragment;

    SqlContains(String fragment) {
      this.fragment = fragment;
    }

    @Override
    public boolean matches(Object argument) {
      if (!(argument instanceof Statement)) {
        return false;
      }
      Statement st = (Statement) argument;
      return st.getSql().contains(fragment);
    }
  }

  /** Builds one row shaped like the column metadata query result. */
  private static Struct columnMetadata(String tableName, String columnName, String type) {
    return Struct.newBuilder().add("table_name", Value.string(tableName))
        .add("column_name", Value.string(columnName)).add("spanner_type", Value.string(type))
        .build();
  }

  /** Builds one row shaped like the primary key metadata query result. */
  private static Struct pkMetadata(String tableName, String columnName, String ordering) {
    return Struct.newBuilder().add("table_name", Value.string(tableName))
        .add("column_name", Value.string(columnName))
        .add("column_ordering", Value.string(ordering))
        .build();
  }

  /** Stubs the query against 'information_schema.columns' to return the given rows. */
  private void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
    Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
        Type.StructField.of("column_name", Type.string()),
        Type.StructField.of("spanner_type", Type.string()));
    when(tx.executeQuery(argThat(new SqlContains("information_schema.columns"))))
        .thenReturn(ResultSets.forRows(type, rows));
  }

  /** Stubs the query against 'information_schema.index_columns' to return the given rows. */
  private void preparePkMetadata(ReadOnlyTransaction tx, List<Struct> rows) {
    Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
        Type.StructField.of("column_name", Type.string()),
        Type.StructField.of("column_ordering", Type.string()));
    when(tx.executeQuery(argThat(new SqlContains("information_schema.index_columns"))))
        .thenReturn(ResultSets.forRows(type, rows));
  }

  @Before
  public void setUp() throws Exception {
    serviceFactory = new FakeServiceFactory();
    mockTx = mock(ReadOnlyTransaction.class);
  }

  @Test
  public void simple() throws Exception {
    // Simplest schema: a table with int64 key
    when(serviceFactory.mockDatabaseClient().readOnlyTransaction()).thenReturn(mockTx);

    preparePkMetadata(mockTx, Arrays.asList(pkMetadata("test", "key", "ASC")));
    prepareColumnMetadata(mockTx, Arrays.asList(columnMetadata("test", "key", "INT64")));

    SpannerConfig config = SpannerConfig.create().withProjectId("test-project")
        .withInstanceId("test-instance").withDatabaseId("test-database")
        .withServiceFactory(serviceFactory);

    DoFnTester<Void, SpannerSchema> tester = DoFnTester.of(new ReadSpannerSchema(config));
    List<SpannerSchema> schemas = tester.processBundle(Arrays.asList((Void) null));

    assertEquals(1, schemas.size());

    SpannerSchema schema = schemas.get(0);

    assertEquals(1, schema.getTables().size());

    SpannerSchema.Column column = SpannerSchema.Column.create("key", Type.int64());
    SpannerSchema.KeyPart keyPart = SpannerSchema.KeyPart.create("key", false);

    assertThat(schema.getColumns("test"), contains(column));
    assertThat(schema.getKeyParts("test"), contains(keyPart));
  }

}
Loading

0 comments on commit e686286

Please sign in to comment.