Skip to content

Commit

Permalink
Expose state to sources and sinks (apache#4364)
Browse files Browse the repository at this point in the history
* Expose state to sources and sinks

* Fix unittest

* Fix unittest
  • Loading branch information
srkukarni authored May 25, 2019
1 parent 7cadc93 commit f8349e2
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class IOConfigUtilsTest {
Expand Down Expand Up @@ -115,6 +117,44 @@ public Logger getLogger() {
public String getSecret(String secretName) {
return secretsMap.get(secretName);
}

@Override
public void incrCounter(String key, long amount) { }

@Override
public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
return null;
}

@Override
public long getCounter(String key) {
return 0;
}

@Override
public CompletableFuture<Long> getCounterAsync(String key) {
return null;
}

@Override
public void putState(String key, ByteBuffer value) {

}

@Override
public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
return null;
}

@Override
public ByteBuffer getState(String key) {
return null;
}

@Override
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}
}

@Test
Expand Down Expand Up @@ -189,6 +229,45 @@ public Logger getLogger() {
public String getSecret(String secretName) {
return secretsMap.get(secretName);
}

@Override
public void incrCounter(String key, long amount) {
}

@Override
public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
return null;
}

@Override
public long getCounter(String key) {
return 0;
}

@Override
public CompletableFuture<Long> getCounterAsync(String key) {
return null;
}

@Override
public void putState(String key, ByteBuffer value) {

}

@Override
public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
return null;
}

@Override
public ByteBuffer getState(String key) {
return null;
}

@Override
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

public interface SinkContext {

Expand Down Expand Up @@ -81,4 +83,71 @@ public interface SinkContext {
* @return The secret if anything was found or null
*/
String getSecret(String secretName);

/**
* Increment the builtin distributed counter referred by key.
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);


/**
* Increment the builtin distributed counter referred by key
* but dont wait for the completion of the increment operation
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
CompletableFuture<Void> incrCounterAsync(String key, long amount);

/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);

/**
* Retrieve the counter value for the key, but don't wait
* for the operation to be completed
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
CompletableFuture<Long> getCounterAsync(String key);

/**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);

/**
* Update the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @param value state value of the key
*/
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);

/**
* Retrieve the state value for the key.
*
* @param key name of the key
* @return the state value for the key.
*/
ByteBuffer getState(String key);

/**
* Retrieve the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @return the state value for the key.
*/
CompletableFuture<ByteBuffer> getStateAsync(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.slf4j.Logger;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

public interface SourceContext {

/**
Expand Down Expand Up @@ -79,4 +82,71 @@ public interface SourceContext {
* @return The secret if anything was found or null
*/
String getSecret(String secretName);

/**
* Increment the builtin distributed counter referred by key.
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);


/**
* Increment the builtin distributed counter referred by key
* but dont wait for the completion of the increment operation
*
* @param key The name of the key
* @param amount The amount to be incremented
*/
CompletableFuture<Void> incrCounterAsync(String key, long amount);

/**
* Retrieve the counter value for the key.
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
long getCounter(String key);

/**
* Retrieve the counter value for the key, but don't wait
* for the operation to be completed
*
* @param key name of the key
* @return the amount of the counter value for this key
*/
CompletableFuture<Long> getCounterAsync(String key);

/**
* Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);

/**
* Update the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @param value state value of the key
*/
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);

/**
* Retrieve the state value for the key.
*
* @param key name of the key
* @return the state value for the key.
*/
ByteBuffer getState(String key);

/**
* Retrieve the state value for the key, but don't wait for the operation to be completed
*
* @param key name of the key
* @return the state value for the key.
*/
CompletableFuture<ByteBuffer> getStateAsync(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

import static org.testng.Assert.*;

Expand Down Expand Up @@ -115,6 +117,46 @@ public Logger getLogger() {

@Override
public String getSecret(String key) { return null; }

@Override
public void incrCounter(String key, long amount) {

}

@Override
public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
return null;
}

@Override
public long getCounter(String key) {
return 0;
}

@Override
public CompletableFuture<Long> getCounterAsync(String key) {
return null;
}

@Override
public void putState(String key, ByteBuffer value) {

}

@Override
public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
return null;
}

@Override
public ByteBuffer getState(String key) {
return null;
}

@Override
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}
};
ThrowingRunnable openAndClose = ()->{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -116,6 +118,46 @@ public Logger getLogger() {

@Override
public String getSecret(String key) { return null; }

@Override
public void incrCounter(String key, long amount) {

}

@Override
public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
return null;
}

@Override
public long getCounter(String key) {
return 0;
}

@Override
public CompletableFuture<Long> getCounterAsync(String key) {
return null;
}

@Override
public void putState(String key, ByteBuffer value) {

}

@Override
public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
return null;
}

@Override
public ByteBuffer getState(String key) {
return null;
}

@Override
public CompletableFuture<ByteBuffer> getStateAsync(String key) {
return null;
}
};
Map<String, Object> config = new HashMap<>();
ThrowingRunnable openAndClose = ()->{
Expand Down

0 comments on commit f8349e2

Please sign in to comment.