Skip to content

Commit

Permalink
[CALCITE-1941] Refine interface Schema#snapshot()
Browse files Browse the repository at this point in the history
  • Loading branch information
maryannxue committed Aug 30, 2017
1 parent 1e7ae1c commit 3520913
Show file tree
Hide file tree
Showing 16 changed files with 156 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlDialect;
Expand Down Expand Up @@ -163,11 +164,7 @@ public boolean isMutable() {
return false;
}

public boolean contentsHaveChangedSince(long lastCheck, long now) {
return false;
}

public Schema snapshot(long now) {
public Schema snapshot(SchemaVersion version) {
return new JdbcSchema(dataSource, dialect, convention, catalog, schema,
tableMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TableMacro;
import org.apache.calcite.util.NameMap;
Expand Down Expand Up @@ -214,12 +215,12 @@ protected TableEntry getImplicitTableBasedOnNullaryFunction(String tableName,
return null;
}

protected CalciteSchema snapshot(CalciteSchema parent, long now) {
protected CalciteSchema snapshot(CalciteSchema parent, SchemaVersion version) {
CalciteSchema snapshot = new CachingCalciteSchema(parent,
schema.snapshot(now), name, null, tableMap, latticeMap,
schema.snapshot(version), name, null, tableMap, latticeMap,
functionMap, functionNames, nullaryFunctionMap, getPath());
for (CalciteSchema subSchema : subSchemaMap.map().values()) {
CalciteSchema subSchemaSnapshot = subSchema.snapshot(snapshot, now);
CalciteSchema subSchemaSnapshot = subSchema.snapshot(snapshot, version);
snapshot.subSchemaMap.put(subSchema.name, subSchemaSnapshot);
}
return snapshot;
Expand Down Expand Up @@ -247,25 +248,24 @@ private interface Cached<T> {
* @param <T> element type */
private abstract class AbstractCached<T> implements Cached<T> {
T t;
long checked = Long.MIN_VALUE;
boolean built = false;

public T get(long now) {
if (!CachingCalciteSchema.this.cache) {
return build();
}
if (checked == Long.MIN_VALUE
|| schema.contentsHaveChangedSince(checked, now)) {
if (!built) {
t = build();
}
checked = now;
built = true;
return t;
}

public void enable(long now, boolean enabled) {
if (!enabled) {
t = null;
}
checked = Long.MIN_VALUE;
built = false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.jdbc.CalcitePrepare.Context;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;

Expand Down Expand Up @@ -76,6 +77,9 @@ public interface CalciteConnection extends Connection, QueryProvider {
String getSchema() throws SQLException;

CalciteConnectionConfig config();

/** Creates a context for preparing a statement for execution. */
Context createPrepareContext();
}

// End CalciteConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.jdbc.CalcitePrepare.Context;
import org.apache.calcite.linq4j.BaseQueryable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
Expand All @@ -47,8 +48,10 @@
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.LongSchemaVersion;
import org.apache.calcite.server.CalciteServer;
import org.apache.calcite.server.CalciteServerStatement;
import org.apache.calcite.sql.advise.SqlAdvisor;
Expand Down Expand Up @@ -139,6 +142,10 @@ public CalciteConnectionConfig config() {
return new CalciteConnectionConfigImpl(info);
}

public Context createPrepareContext() {
return new ContextImpl(this);
}

/** Called after the constructor has completed and the model has been
* loaded. */
void init() {
Expand Down Expand Up @@ -193,7 +200,7 @@ private CalcitePreparedStatement prepareStatement_(
int resultSetHoldability) throws SQLException {
try {
final Meta.Signature signature =
parseQuery(query, new ContextImpl(this), -1);
parseQuery(query, createPrepareContext(), -1);
final CalcitePreparedStatement calcitePreparedStatement =
(CalcitePreparedStatement) factory.newPreparedStatement(this, null,
signature, resultSetType, resultSetConcurrency, resultSetHoldability);
Expand Down Expand Up @@ -463,7 +470,8 @@ static class ContextImpl implements CalcitePrepare.Context {
ContextImpl(CalciteConnectionImpl connection) {
this.connection = Preconditions.checkNotNull(connection);
long now = System.currentTimeMillis();
this.rootSchema = connection.rootSchema.createSnapshot(now);
SchemaVersion schemaVersion = new LongSchemaVersion(now);
this.rootSchema = connection.rootSchema.createSnapshot(schemaVersion);
}

public JavaTypeFactory getTypeFactory() {
Expand Down Expand Up @@ -532,8 +540,8 @@ static class CalciteServerStatementImpl
this.connection = Preconditions.checkNotNull(connection);
}

public ContextImpl createPrepareContext() {
return new ContextImpl(connection);
public Context createPrepareContext() {
return connection.createPrepareContext();
}

public CalciteConnection getConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ interface Context {
*
* <p>The object is being analyzed is typically a view. If it is already
* being analyzed further up the stack, the view definition can be deduced
* to be cylic. */
* to be cyclic. */
List<String> getObjectPath();
}

Expand Down
21 changes: 9 additions & 12 deletions core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TableMacro;
import org.apache.calcite.schema.impl.MaterializedViewTable;
Expand Down Expand Up @@ -144,7 +145,8 @@ protected abstract void addImplicitTablesBasedOnNullaryFunctionsToBuilder(
ImmutableSortedMap.Builder<String, Table> builder);

/** Returns a snapshot representation of this CalciteSchema. */
protected abstract CalciteSchema snapshot(CalciteSchema parent, long now);
protected abstract CalciteSchema snapshot(
CalciteSchema parent, SchemaVersion version);

protected abstract boolean isCacheEnabled();

Expand Down Expand Up @@ -380,21 +382,20 @@ public final TableEntry getTableBasedOnNullaryFunction(String tableName,
/** Creates a snapshot of this CalciteSchema as of the specified time. All
* explicit objects in this CalciteSchema will be copied into the snapshot
* CalciteSchema, while the contents of the snapshot of the underlying schema
* should not change as specified in {@link Schema#snapshot(long)}. Snapshots
* of explicit sub schemas will be created and copied recursively.
* should not change as specified in {@link Schema#snapshot(SchemaVersion)}.
* Snapshots of explicit sub schemas will be created and copied recursively.
*
* <p>Currently, to accommodate the requirement of creating tables on the fly
* for materializations, the snapshot will still use the same table map and
* lattice map as in the original CalciteSchema instead of making copies.</p>
*
* @param now The current time in millis, as returned by
* {@link System#currentTimeMillis()}
* @param version The current schema version
*
* @return the schema snapshot.
*/
public CalciteSchema createSnapshot(long now) {
public CalciteSchema createSnapshot(SchemaVersion version) {
Preconditions.checkArgument(this.isRoot(), "must be root schema");
return snapshot(null, now);
return snapshot(null, version);
}

/** Returns a subset of a map whose keys match the given string
Expand Down Expand Up @@ -548,11 +549,7 @@ public boolean isCacheEnabled() {
return CalciteSchema.this.isCacheEnabled();
}

public boolean contentsHaveChangedSince(long lastCheck, long now) {
return schema.contentsHaveChangedSince(lastCheck, now);
}

public Schema snapshot(long now) {
public Schema snapshot(SchemaVersion version) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ public abstract class CalciteStatement extends AvaticaStatement {
return (CalciteConnectionImpl) connection;
}

public CalciteConnectionImpl.ContextImpl createPrepareContext() {
return new CalciteConnectionImpl.ContextImpl(getConnection());
}

protected <T> CalcitePrepare.CalciteSignature<T> prepare(
Queryable<T> queryable) {
final CalciteConnectionImpl calciteConnection = getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TableMacro;
import org.apache.calcite.util.NameMap;
Expand Down Expand Up @@ -152,12 +153,12 @@ protected TableEntry getImplicitTableBasedOnNullaryFunction(String tableName,
return null;
}

protected CalciteSchema snapshot(CalciteSchema parent, long now) {
protected CalciteSchema snapshot(CalciteSchema parent, SchemaVersion version) {
CalciteSchema snapshot = new SimpleCalciteSchema(parent,
schema.snapshot(now), name, null, tableMap, latticeMap,
schema.snapshot(version), name, null, tableMap, latticeMap,
functionMap, functionNames, nullaryFunctionMap, getPath());
for (CalciteSchema subSchema : subSchemaMap.map().values()) {
CalciteSchema subSchemaSnapshot = subSchema.snapshot(snapshot, now);
CalciteSchema subSchemaSnapshot = subSchema.snapshot(snapshot, version);
snapshot.subSchemaMap.put(subSchema.name, subSchemaSnapshot);
}
return snapshot;
Expand Down
7 changes: 1 addition & 6 deletions core/src/main/java/org/apache/calcite/model/JsonSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,7 @@ public abstract class JsonSchema {
* <p>If {@code true}, Calcite will cache the metadata the first time it reads
* it. This can lead to better performance, especially if name-matching is
* case-insensitive
* (see {@link org.apache.calcite.config.Lex#caseSensitive}).
* However, it also leads to the problem of cache staleness.
* A particular schema implementation can override the
* {@link org.apache.calcite.schema.Schema#contentsHaveChangedSince(long, long)}
* method to tell Calcite when it should consider its cache to be out of
* date.</p>
* (see {@link org.apache.calcite.config.Lex#caseSensitive}).</p>
*
* <p>Tables, functions and sub-schemas explicitly created in a schema are
* not affected by this caching mechanism. They always appear in the schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
Expand Down Expand Up @@ -430,12 +431,7 @@ public static MySchemaPlus create(Path path) {
return schema.getExpression(parentSchema, name);
}

@Override public boolean contentsHaveChangedSince(long lastCheck,
long now) {
return schema.contentsHaveChangedSince(lastCheck, now);
}

@Override public Schema snapshot(long now) {
@Override public Schema snapshot(SchemaVersion version) {
throw new UnsupportedOperationException();
}
}
Expand Down
26 changes: 2 additions & 24 deletions core/src/main/java/org/apache/calcite/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,36 +121,14 @@ public interface Schema {
*/
boolean isMutable();

/** Returns whether the contents of this schema have changed since a given
* time. The time is a millisecond value, as returned by
* {@link System#currentTimeMillis()}. If this method returns true, and
* caching is enabled, Calcite will re-build caches.
*
* <p>The default implementation in
* {@link org.apache.calcite.schema.impl.AbstractSchema} always returns
* {@code false}.</p>
*
* <p>To control whether Calcite caches the contents of a schema, use the
* "cache" JSON attribute. The default value is "true".</p>
*
* @param lastCheck The last time that Calcite called this method, or
* {@link Long#MIN_VALUE} if this is the first call
* @param now The current time in millis, as returned by
* {@link System#currentTimeMillis()}
*
* @return Whether contents changed after {@code lastCheckMillis}.
*/
boolean contentsHaveChangedSince(long lastCheck, long now);

/** Returns the snapshot of this schema as of the specified time. The
* contents of the schema snapshot should not change over time.
*
* @param now The current time in millis, as returned by
* {@link System#currentTimeMillis()}
* @param version The current schema version
*
* @return the schema snapshot.
*/
Schema snapshot(long now);
Schema snapshot(SchemaVersion version);

/** Table type. */
enum TableType {
Expand Down
42 changes: 42 additions & 0 deletions core/src/main/java/org/apache/calcite/schema/SchemaVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.schema;

/**
* An interface to represent a version ID that can be used to create a
* read-consistent view of a Schema. This interface assumes a strict
* partial ordering contract that is:
* <ol>
* <li>irreflexive: !a.isBefore(a), which means a cannot happen before itself;
* <li>transitive: if a.isBefore(b) and b.isBefore(c) then a.isBefore(c);
* <li>antisymmetric: if a.isBefore(b) then !b.isBefore(a).
* </ol>
* Implementation classes of this interface must also override equals(Object),
* hashCode() and toString().
*
* @see Schema#snapshot(SchemaVersion)
*/
public interface SchemaVersion {

/**
* Returns if this Version happens before the other Version.
* @param other the other Version object
*/
boolean isBefore(SchemaVersion other);
}

// End SchemaVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Table;

Expand Down Expand Up @@ -61,11 +62,7 @@ public boolean isMutable() {
return true;
}

public boolean contentsHaveChangedSince(long lastCheck, long now) {
return false;
}

public Schema snapshot(long now) {
public Schema snapshot(SchemaVersion version) {
return this;
}

Expand Down
Loading

0 comments on commit 3520913

Please sign in to comment.