Skip to content

Commit

Permalink
SAMZA-1610: Implementation of remote table provider
Browse files Browse the repository at this point in the history
Please see commit messages for detailed descriptions.

Author: Peng Du <[email protected]>

Reviewers: Jagadish <[email protected]>, Wei Song <[email protected]>

Closes apache#432 from pdu-mn1/remote-table-0222
pdu-mn1 authored and jagadish-v0 committed Mar 9, 2018

Verified

This commit was signed with the committer’s verified signature.
Duhemm Martin Duhem
1 parent 1971d59 commit 2be7061
Showing 26 changed files with 1,581 additions and 415 deletions.
Original file line number Diff line number Diff line change
@@ -22,7 +22,9 @@
import java.util.Map;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.operators.KV;
import org.apache.samza.task.TaskContext;


/**
@@ -34,6 +36,14 @@
*/
@InterfaceStability.Unstable
public interface ReadableTable<K, V> extends Table<KV<K, V>> {
/**
* Initializes the table during container initialization.
* Guaranteed to be invoked as the first operation on the table.
* @param containerContext Samza container context
* @param taskContext nullable for global table
*/
default void init(SamzaContainerContext containerContext, TaskContext taskContext) {
}

/**
* Gets the value associated with the specified {@code key}.
@@ -57,5 +67,4 @@ public interface ReadableTable<K, V> extends Table<KV<K, V>> {
* Close the table and release any resources acquired
*/
void close();

}
18 changes: 11 additions & 7 deletions samza-api/src/main/java/org/apache/samza/table/TableProvider.java
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@
import java.util.Map;

import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.task.TaskContext;


/**
@@ -29,6 +31,13 @@
*/
@InterfaceStability.Unstable
public interface TableProvider {
/**
* Initialize TableProvider with container and task context
* @param containerContext Samza container context
* @param taskContext nullable for global table
*/
void init(SamzaContainerContext containerContext, TaskContext taskContext);

/**
* Get an instance of the table for read/write operations
* @return the underlying table
@@ -46,12 +55,7 @@ public interface TableProvider {
Map<String, String> generateConfig(Map<String, String> config);

/**
* Start the underlying table
*/
void start();

/**
* Stop the underlying table
* Shutdown the underlying table
*/
void stop();
void close();
}
20 changes: 4 additions & 16 deletions samza-api/src/main/java/org/apache/samza/util/RateLimiter.java
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@

import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.samza.annotation.InterfaceStability;
@@ -43,7 +44,6 @@
* <ul>
* <li>Block indefinitely until requested credits become available</li>
* <li>Block for a provided amount of time, then return available credits</li>
* <li>Non-blocking, returns immediately available credits</li>
* </ul>
*
*/
@@ -79,15 +79,6 @@ public interface RateLimiter extends Serializable {
*/
int acquire(int numberOfCredit, long timeout, TimeUnit unit);

/**
* Attempt to acquire the provided number of credits, returns immediately number of
* credits acquired.
*
* @param numberOfCredit requested number of credits
* @return number of credits acquired
*/
int tryAcquire(int numberOfCredit);

/**
* Attempt to acquire the provided number of credits for a number of tags, blocks indefinitely
* until all requested credits become available
@@ -110,11 +101,8 @@ public interface RateLimiter extends Serializable {
Map<String, Integer> acquire(Map<String, Integer> tagToCreditMap, long timeout, TimeUnit unit);

/**
* Attempt to acquire the provided number of credits for a number of tags, returns immediately number of
* credits acquired.
*
* @param tagToCreditMap a map of requested number of credits keyed by tag
* @return a map of number of credits acquired keyed by tag
* Get the entire set of tags for which we have configured credits for rate limiting.
* @return set of supported tags
*/
Map<String, Integer> tryAcquire(Map<String, Integer> tagToCreditMap);
Set<String> getSupportedTags();
}
50 changes: 19 additions & 31 deletions samza-core/src/main/java/org/apache/samza/table/TableManager.java
Original file line number Diff line number Diff line change
@@ -24,13 +24,17 @@
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaTableConfig;
import org.apache.samza.container.SamzaContainerContext;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.StorageEngine;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;


/**
* A {@link TableManager} manages tables within a Samza task. For each table, it maintains
* the {@link TableSpec} and the {@link TableProvider}. It is used at execution for
@@ -61,15 +65,14 @@ static public class TableCtx {
// tableId -> TableCtx
private final Map<String, TableCtx> tables = new HashMap<>();

private boolean localTablesInitialized;
private boolean initialized;

/**
* Construct a table manager instance
* @param config the job configuration
* @param config job configuration
* @param serdes Serde instances for tables
*/
public TableManager(Config config, Map<String, Serde<Object>> serdes) {

new JavaTableConfig(config).getTableIds().forEach(tableId -> {

// Construct the table provider
@@ -91,23 +94,14 @@ public TableManager(Config config, Map<String, Serde<Object>> serdes) {
}

/**
* Initialize all local table
* @param stores stores created locally
* Initialize table providers with container and task contexts
* @param containerContext context for the Samza container
* @param taskContext context for the current task, nullable for global tables
*/
public void initLocalTables(Map<String, StorageEngine> stores) {
tables.values().forEach(ctx -> {
if (ctx.tableProvider instanceof LocalStoreBackedTableProvider) {
StorageEngine store = stores.get(ctx.tableSpec.getId());
if (store == null) {
throw new SamzaException(String.format(
"Backing store for table %s was not injected by SamzaContainer",
ctx.tableSpec.getId()));
}
((LocalStoreBackedTableProvider) ctx.tableProvider).init(store);
}
});

localTablesInitialized = true;
public void init(SamzaContainerContext containerContext, TaskContext taskContext) {
Preconditions.checkNotNull(containerContext, "null container context.");
tables.values().forEach(ctx -> ctx.tableProvider.init(containerContext, taskContext));
initialized = true;
}

/**
@@ -125,18 +119,11 @@ private void addTable(TableSpec tableSpec) {
tables.put(tableSpec.getId(), ctx);
}

/**
* Start the table manager, internally it starts all tables
*/
public void start() {
tables.values().forEach(ctx -> ctx.tableProvider.start());
}

/**
* Shutdown the table manager, internally it shuts down all tables
*/
public void shutdown() {
tables.values().forEach(ctx -> ctx.tableProvider.stop());
public void close() {
tables.values().forEach(ctx -> ctx.tableProvider.close());
}

/**
@@ -145,9 +132,10 @@ public void shutdown() {
* @return table instance
*/
public Table getTable(String tableId) {
if (!localTablesInitialized) {
throw new IllegalStateException("Local tables in TableManager not initialized.");
if (!initialized) {
throw new IllegalStateException("TableManager has not been initialized.");
}
Preconditions.checkArgument(tables.containsKey(tableId), "Unknown tableId=" + tableId);
return tables.get(tableId).tableProvider.getTable();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.table.remote;

import java.io.Serializable;
import java.util.function.Function;

import org.apache.samza.operators.KV;


/**
* Function interface for providing rate limiting credits for each table record.
* This interface allows callers to pass in lambda expressions which are otherwise
* non-serializable as-is.
* @param <K> the type of the key
* @param <V> the type of the value
*/
public interface CreditFunction<K, V> extends Function<KV<K, V>, Integer>, Serializable {
}
Loading

0 comments on commit 2be7061

Please sign in to comment.