Skip to content

Commit

Permalink
[Functions] reorganize the context hierarchy for functions (apache#10631
Browse files Browse the repository at this point in the history
)

### Motivation

Currently the context relationship for function, source and sink is not well defined. This prevents some common features to be added once for all and creates some confusion, code duplication in the current repo. As demonstrated in the following graph, this PR changes the hierarchy from left to right. By introducing a common base context, it help solving some issues we are seeing. The base context provides common access to pulsar cluster, state, metrics, and meta-data to make sure all components can reuse it.

![context hierarchy](https://user-images.githubusercontent.com/16407807/118730483-8ebf5200-b7ec-11eb-9220-d41261f148bb.png)



### Modifications

- Remove `ConnectorContext` interface.
- Introduce a `BaseContext` interface. 
- Update existing `Context`, `SourceContext`, `SinkContext` interface to extend the new common interface.
  • Loading branch information
nlu90 authored Jun 23, 2021
1 parent 1a80429 commit 999329a
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public interface Producer extends Closeable {
void flush() throws PulsarClientException;

/**
* Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
* Flush all the messages buffered in the client asynchronously.
*
* @return a future that can be used to track when all the messages have been safely persisted.
* @since 2.1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,73 +16,67 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.core;
package org.apache.pulsar.functions.api;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.StateStore;
import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/**
* Interface for a connector providing information about environment where it is running.
* It also allows to propagate information, such as logs, metrics, states, back to the Pulsar environment.
* BaseContext provides base contextual information to the executing function/source/sink.
* It allows to propagate information, such as pulsar environment, logs, metrics, states etc.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface ConnectorContext {

public interface BaseContext {
/**
* The id of the instance that invokes this source.
* The tenant this component belongs to.
*
* @return the instance id
* @return the tenant this component belongs to
*/
int getInstanceId();
String getTenant();

/**
* Get the number of instances that invoke this source.
* The namespace this component belongs to.
*
* @return the number of instances that invoke this source.
*/
int getNumInstances();

/**
* Record a user defined metric
* @param metricName The name of the metric
* @param value The value of the metric
* @return the namespace this component belongs to
*/
void recordMetric(String metricName, double value);
String getNamespace();

/**
* The tenant this source belongs to.
* The id of the instance that invokes this component.
*
* @return the tenant this source belongs to
* @return the instance id
*/
String getTenant();
int getInstanceId();

/**
* The namespace this source belongs to.
* Get the number of instances that invoke this component.
*
* @return the namespace this source belongs to
* @return the number of instances that invoke this component.
*/
String getNamespace();
int getNumInstances();

/**
* The logger object that can be used to log in a sink
* The logger object that can be used to log in a component.
*
* @return the logger object
*/
Logger getLogger();

/**
* Get the secret associated with this key
* Get the secret associated with this key.
*
* @param secretName The name of the secret
* @return The secret if anything was found or null
*/
String getSecret(String secretName);

/**
* Get the state store with the provided store name.
* Get the state store with the provided store name in the tenant & namespace.
*
* @param name the state store name
* @param <S> the type of interface of the store to return
Expand All @@ -92,43 +86,24 @@ public interface ConnectorContext {
* or interface of the actual returned store.
*/
default <S extends StateStore> S getStateStore(String name) {
throw new UnsupportedOperationException("Not implemented");
throw new UnsupportedOperationException("Component cannot get state store");
}

/**
* Increment the builtin distributed counter referred by key.
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);


/**
* Increment the builtin distributed counter referred by key
* but dont wait for the completion of the increment operation
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
CompletableFuture<Void> incrCounterAsync(String key, long amount);

/**
* Retrieve the counter value for the key.
* Get the state store with the provided store name.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);

/**
* Retrieve the counter value for the key, but don't wait
* for the operation to be completed
* @param tenant the state tenant name
* @param ns the state namespace name
* @param name the state store name
* @param <S> the type of interface of the store to return
* @return the state store instance.
*
* @param key name of the key
* @return the amount of the counter value for this key
* @throws ClassCastException if the return type isn't a type
* or interface of the actual returned store.
*/
CompletableFuture<Long> getCounterAsync(String key);
default <S extends StateStore> S getStateStore(String tenant, String ns, String name) {
throw new UnsupportedOperationException("Component cannot get state store");
}

/**
* Update the state value for the key.
Expand Down Expand Up @@ -175,4 +150,45 @@ default <S extends StateStore> S getStateStore(String name) {
* @param key name of the key
*/
CompletableFuture<Void> deleteStateAsync(String key);

/**
* Increment the builtin distributed counter referred by key.
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);

/**
* Increment the builtin distributed counter referred by key
* but dont wait for the completion of the increment operation
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
CompletableFuture<Void> incrCounterAsync(String key, long amount);

/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);

/**
* Retrieve the counter value for the key, but don't wait
* for the operation to be completed
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
CompletableFuture<Long> getCounterAsync(String key);

/**
* Record a user defined metric
* @param metricName The name of the metric
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);
}
Loading

0 comments on commit 999329a

Please sign in to comment.