Skip to content

Commit

Permalink
Making event hub configs Samza compliant
Browse files Browse the repository at this point in the history
Fixes for Bugs

- SAMZA-1571 Make Eventhubs system configs compatible with Samza standalone.
- SAMZA-1624 EventHub system should prefix the configs with senstive for SasKey and SasToken
- SAMZA-1625 EventHub systemAdmin is swallowing exceptions
- SAMZA-1626 EventHub system admin is not returning the metadata for all the ssps requested for

Description

1. Right now event hub doesn't follow the samza's config convention of naming the secrets as "sensitive" so that they are masked before they are logged.
2. Event hub configs uses the old system.<systemName>.streams.<streamName> which is blacklisted in Samza standalone. So moving these configs to newer <streams>.<streamid>
3. Wrapping the underlying exception properly in the SamzaException in EventHubSystemAdmin
4. Porting Bharat's fix to return the metadata for all the ssps requested for in EventHubSystemAdmin

Author: Srinivasulu Punuru <[email protected]>

Reviewers: Xinyu Liu <[email protected]>

Closes apache#453 from srinipunuru/eh.1
  • Loading branch information
srinipunuru authored and xinyuiscool committed Mar 23, 2018
1 parent 956cf41 commit 272aa32
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ public class EventHubConfig extends MapConfig {

public static final String CONFIG_STREAM_LIST = "systems.%s.stream.list";

public static final String CONFIG_STREAM_NAMESPACE = "systems.%s.streams.%s.eventhubs.namespace";
public static final String CONFIG_STREAM_NAMESPACE = "streams.%s.eventhubs.namespace";

public static final String CONFIG_STREAM_ENTITYPATH = "systems.%s.streams.%s.eventhubs.entitypath";
public static final String CONFIG_STREAM_ENTITYPATH = "streams.%s.eventhubs.entitypath";

public static final String CONFIG_STREAM_SAS_KEY_NAME = "systems.%s.streams.%s.eventhubs.sas.keyname";
public static final String CONFIG_STREAM_SAS_KEY_NAME = Config.SENSITIVE_PREFIX + "streams.%s.eventhubs.sas.keyname";

public static final String CONFIG_STREAM_SAS_TOKEN = "systems.%s.streams.%s.eventhubs.sas.token";
public static final String CONFIG_STREAM_SAS_TOKEN = Config.SENSITIVE_PREFIX + "streams.%s.eventhubs.sas.token";

public static final String CONFIG_STREAM_CONSUMER_GROUP = "systems.%s.streams.%s.eventhubs.consumer.group";
public static final String CONFIG_STREAM_CONSUMER_GROUP = "streams.%s.eventhubs.consumer.group";
public static final String DEFAULT_CONFIG_STREAM_CONSUMER_GROUP = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;

public static final String CONFIG_PRODUCER_PARTITION_METHOD = "systems.%s.eventhubs.partition.method";
Expand All @@ -71,18 +71,18 @@ public EventHubConfig(Config config) {
.forEach((streamId) -> physcialToId.put(streamConfig.getPhysicalName(streamId), streamId));
}

private String getFromStreamIdOrName(String configName, String systemName, String streamName, String defaultString) {
String result = getFromStreamIdOrName(configName, systemName, streamName);
private String getFromStreamIdOrName(String configName, String streamName, String defaultString) {
String result = getFromStreamIdOrName(configName, streamName);
if (result == null) {
return defaultString;
}
return result;
}

private String getFromStreamIdOrName(String configName, String systemName, String streamName) {
private String getFromStreamIdOrName(String configName, String streamName) {
String streamId = getStreamId(streamName);
return get(String.format(configName, systemName, streamId),
streamId.equals(streamName) ? null : get(String.format(configName, systemName, streamName)));
return get(String.format(configName, streamId),
streamId.equals(streamName) ? null : get(String.format(configName, streamName)));
}

private String validateRequiredConfig(String value, String fieldName, String systemName, String streamName) {
Expand Down Expand Up @@ -122,7 +122,7 @@ public List<String> getStreams(String systemName) {
* @return EventHubs namespace
*/
public String getStreamNamespace(String systemName, String streamName) {
return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, systemName, streamName),
return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, streamName),
"Namespace", systemName, streamName);
}

Expand All @@ -134,7 +134,7 @@ public String getStreamNamespace(String systemName, String streamName) {
* @return EventHubs entity path
*/
public String getStreamEntityPath(String systemName, String streamName) {
return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, systemName, streamName),
return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, streamName),
"EntityPath", systemName, streamName);
}

Expand All @@ -146,7 +146,7 @@ public String getStreamEntityPath(String systemName, String streamName) {
* @return EventHubs SAS key name
*/
public String getStreamSasKeyName(String systemName, String streamName) {
return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName),
return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, streamName),
"SASKeyName", systemName, streamName);
}

Expand All @@ -158,7 +158,7 @@ public String getStreamSasKeyName(String systemName, String streamName) {
* @return EventHubs SAS token
*/
public String getStreamSasToken(String systemName, String streamName) {
return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, systemName, streamName),
return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, streamName),
"SASToken", systemName, streamName);
}

Expand All @@ -170,7 +170,7 @@ public String getStreamSasToken(String systemName, String streamName) {
* @return EventHubs consumer group
*/
public String getStreamConsumerGroup(String systemName, String streamName) {
return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, systemName, streamName, DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, streamName, DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> str
streamPartitions.put(streamName, ehInfo.getPartitionIds());
} catch (InterruptedException | ExecutionException | TimeoutException e) {

String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s, Stream:%s",
systemName, streamName);
throw new SamzaException(msg);
String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s, Stream:%s", systemName,
streamName);
LOG.error(msg, e);
throw new SamzaException(msg, e);
}
}

Expand Down Expand Up @@ -172,7 +173,8 @@ private Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadata(Strin
String msg = String.format(
"Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s",
systemName, streamName, partitionId);
throw new SamzaException(msg);
LOG.error(msg, e);
throw new SamzaException(msg, e);
}
});
return sspMetadataMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ public static Config getEventHubConfig(EventHubSystemProducer.PartitioningMethod
mapConfig.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, SYSTEM_NAME), partitioningMethod.toString());
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, SYSTEM_NAME), STREAM_NAME1 + "," + STREAM_NAME2);

mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_NAMESPACE);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_ENTITY1);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY_NAME);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY);

mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_NAMESPACE);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_ENTITY2);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY_NAME);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, STREAM_NAME1), EVENTHUB_NAMESPACE);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, STREAM_NAME1), EVENTHUB_ENTITY1);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, STREAM_NAME1), EVENTHUB_KEY_NAME);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, STREAM_NAME1), EVENTHUB_KEY);

mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, STREAM_NAME2), EVENTHUB_NAMESPACE);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, STREAM_NAME2), EVENTHUB_ENTITY2);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, STREAM_NAME2), EVENTHUB_KEY_NAME);
mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, STREAM_NAME2), EVENTHUB_KEY);

return new MapConfig(mapConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public void testMultipleRegistersToSameSSP() throws Exception {
// Set configs
Map<String, String> configMap = new HashMap<>();
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1);
MapConfig config = new MapConfig(configMap);

MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
Expand Down Expand Up @@ -123,10 +123,10 @@ public void testSinglePartitionConsumptionHappyPath() throws Exception {
// Set configs
Map<String, String> configMap = new HashMap<>();
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1);
MapConfig config = new MapConfig(configMap);

MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
Expand Down Expand Up @@ -173,10 +173,10 @@ public void testSinglePartitionConsumptionInterceptor() throws Exception {
// Set configs
Map<String, String> configMap = new HashMap<>();
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1);
MapConfig config = new MapConfig(configMap);

MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
Expand Down Expand Up @@ -225,10 +225,10 @@ public void testMultiPartitionConsumptionHappyPath() throws Exception {
// Set configs
Map<String, String> configMap = new HashMap<>();
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
MapConfig config = new MapConfig(configMap);

MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
Expand Down Expand Up @@ -282,14 +282,14 @@ public void testMultiStreamsConsumptionHappyPath() throws Exception {
Map<String, String> configMap = new HashMap<>();
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName),
String.format("%s,%s", streamName1, streamName2));
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName1), MOCK_ENTITY_1);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName1), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName1), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName1), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName2), MOCK_ENTITY_2);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName2), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName2), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName2), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName1), MOCK_ENTITY_1);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName1), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName1), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName1), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName2), MOCK_ENTITY_2);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName2), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName2), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName2), EVENTHUB_KEY);
MapConfig config = new MapConfig(configMap);

MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ public void testSendingToSpecificPartitions() throws Exception {
// Set configs
Map<String, String> configMap = new HashMap<>();
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);
configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
MapConfig config = new MapConfig(configMap);
Expand Down Expand Up @@ -125,10 +125,10 @@ public void testSendingToSpecificPartitionsWithInterceptor() throws Exception {
// Set configs
Map<String, String> configMap = new HashMap<>();
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);
configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
MapConfig config = new MapConfig(configMap);
Expand Down Expand Up @@ -182,10 +182,10 @@ public void testSendingToEventHubHashing() throws Exception {
// Set configs
Map<String, String> configMap = new HashMap<>();
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);

// mod 2 on the partitionid to simulate consistent hashing
configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
Expand Down

0 comments on commit 272aa32

Please sign in to comment.