NIFI-4818: Fix transit URL parsing at Hive2JDBC and KafkaTopic for ReportLineageToAtlas

- Hive2JDBC: Handle connection parameters and multiple host entries correctly
- KafkaTopic: Handle multiple host entries correctly
- Avoid potential "IllegalStateException: Duplicate key" exception when NiFiAtlasHook analyzes existing NiFiFlowPath input/output entries
- This closes apache#2435
ijokarumawak authored and mcgilman committed Feb 7, 2018
1 parent 3ca7c3e commit f16cbd4
Showing 5 changed files with 179 additions and 19 deletions.
NiFiAtlasHook:
@@ -255,7 +255,11 @@ private Map<String, Referenceable> toReferenceables(Object _refs, Metrics metric
            }
            return new Tuple<>(refQualifiedName, typedQualifiedNameToRef.get(toTypedQualifiedName(typeName, refQualifiedName)));
        }).filter(Objects::nonNull).filter(tuple -> tuple.getValue() != null)
-               .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
+               // If duplication happens, use new value.
+               .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue, (oldValue, newValue) -> {
+                   logger.warn("Duplicated qualified name was found, use the new one. oldValue={}, newValue={}", new Object[]{oldValue, newValue});
+                   return newValue;
+               }));
    }

    @SuppressWarnings("unchecked")
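For reference, the two-argument Collectors.toMap throws "IllegalStateException: Duplicate key" as soon as two stream elements map to the same key, which is exactly the failure this hunk avoids; the three-argument overload accepts a merge function that decides which value survives. A minimal standalone sketch of that behavior (class name and sample data are illustrative, not from the NiFi codebase):

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToMapMergeDemo {
    public static void main(String[] args) {
        // Two entries share the key "a". The two-argument toMap would throw
        // "IllegalStateException: Duplicate key"; this merge function keeps
        // the newer value instead, mirroring the fix above.
        final Map<String, Integer> merged = Stream.of(
                new String[][]{{"a", "1"}, {"b", "2"}, {"a", "3"}})
                .collect(Collectors.toMap(
                        e -> e[0],
                        e -> Integer.valueOf(e[1]),
                        (oldValue, newValue) -> newValue));
        System.out.println(merged); // prints {a=3, b=2}
    }
}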
Hive2JDBC:
@@ -21,10 +21,14 @@
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

-import java.net.URI;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;

import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_INPUT_TABLES;
import static org.apache.nifi.atlas.provenance.analyzer.DatabaseAnalyzerUtil.ATTR_OUTPUT_TABLES;
@@ -49,17 +53,41 @@
 */
public class Hive2JDBC extends AbstractHiveAnalyzer {

+    private static final Logger logger = LoggerFactory.getLogger(Hive2JDBC.class);
+
+    // jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list
+    // Group 1 = <host1>:<port1>,<host2>:<port2>
+    // Group 2 = dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list
+    private static final String URI_PATTERN_STR = "jdbc:hive2://([^/]+)/?(.*)$";
+    private static final Pattern URI_PATTERN = Pattern.compile(URI_PATTERN_STR);
+
    @Override
    public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {

-        // Replace the colon so that the schema in the URI can be parsed correctly.
-        final String transitUri = event.getTransitUri().replaceFirst("^jdbc:hive2", "jdbc-hive2");
-        final URI uri = parseUri(transitUri);
-        final String clusterName = context.getClusterResolver().fromHostNames(uri.getHost());
-        // Remove the heading '/'
-        final String path = uri.getPath();
-        // If uri does not contain database name, then use 'default' as database name.
-        final String connectedDatabaseName = path == null || path.isEmpty() ? "default" : path.substring(1);
+        final String transitUri = event.getTransitUri();
+        if (transitUri == null) {
+            return null;
+        }
+
+        final Matcher uriMatcher = URI_PATTERN.matcher(transitUri);
+        if (!uriMatcher.matches()) {
+            logger.warn("Unexpected transit URI: {}", new Object[]{transitUri});
+            return null;
+        }
+
+        final String clusterName = context.getClusterResolver().fromHostNames(splitHostNames(uriMatcher.group(1)));
+        String connectedDatabaseName = null;
+        if (uriMatcher.groupCount() > 1) {
+            // Try to find connected database name from connection parameters.
+            final String[] connectionParams = uriMatcher.group(2).split(";");
+            connectedDatabaseName = connectionParams[0];
+        }
+
+        if (StringUtils.isEmpty(connectedDatabaseName)) {
+            // If not found, then use "default".
+            connectedDatabaseName = "default";
+        }

        final Set<Tuple<String, String>> inputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_INPUT_TABLES));
        final Set<Tuple<String, String>> outputTables = parseTableNames(connectedDatabaseName, event.getAttribute(ATTR_OUTPUT_TABLES));
@@ -97,6 +125,6 @@ private void addRefs(DataSetRefs refs, boolean isInput, String clusterName,

    @Override
    public String targetTransitUriPattern() {
-        return "^jdbc:hive2://.+$";
+        return URI_PATTERN_STR;
    }
}
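As a quick illustration of what the new URI_PATTERN captures for the URL shapes exercised by the tests below, here is a standalone sketch; only the pattern string is taken from the code above, while the driver class and sample URLs are illustrative:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Hive2UrlPatternDemo {
    // Same pattern string as URI_PATTERN_STR in Hive2JDBC above.
    private static final Pattern URI_PATTERN = Pattern.compile("jdbc:hive2://([^/]+)/?(.*)$");

    public static void main(String[] args) {
        final String[] urls = {
                "jdbc:hive2://0.example.com:10000/default",
                // No '/', so the ;-delimited parameters stay in group 1 and
                // group 2 is empty; the database name then falls back to "default".
                "jdbc:hive2://0.example.com:10000;transportMode=http;httpPath=cliservice",
                "jdbc:hive2://0.example.com:2181,1.example.com:2181/some_database;serviceDiscoveryMode=zooKeeper"
        };
        for (String url : urls) {
            final Matcher m = URI_PATTERN.matcher(url);
            if (m.matches()) {
                // Group 1: everything before the first '/'; group 2: dbName plus ;-delimited parameters.
                System.out.println("hosts=" + m.group(1) + " | dbAndParams=" + m.group(2));
            }
        }
    }
}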
KafkaTopic:
@@ -62,14 +62,8 @@ public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event)
            return null;
        }

-        String clusterName = null;
-        for (String broker : uriMatcher.group(1).split(",")) {
-            final String brokerHostname = broker.split(":")[0].trim();
-            clusterName = context.getClusterResolver().fromHostNames(brokerHostname);
-            if (clusterName != null && !clusterName.isEmpty()) {
-                break;
-            }
-        }
+        final String[] hostNames = splitHostNames(uriMatcher.group(1));
+        final String clusterName = context.getClusterResolver().fromHostNames(hostNames);

        final String topicName = uriMatcher.group(2);

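Both analyzers now delegate host extraction to splitHostNames, which is inherited from their shared base class and not shown in this diff. Below is a minimal sketch consistent with how the two call sites use it ("host1:port1,host2:port2" in, bare host names out); the class name and exact implementation are assumptions, not the NiFi source:

import java.util.Arrays;

public class HostNameSplitDemo {
    // Hypothetical stand-in for the shared splitHostNames helper.
    static String[] splitHostNames(final String hostNames) {
        return Arrays.stream(hostNames.split(","))
                .map(hostName -> hostName.split(":")[0].trim())
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // Broker list in the form captured by KafkaTopic's URI pattern group 1.
        System.out.println(Arrays.toString(
                splitHostNames("0.example.com:6667, 1.example.com:6667")));
        // prints [0.example.com, 1.example.com]
    }
}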
Hive2JDBC tests:
@@ -37,6 +37,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.matches;
import static org.mockito.Mockito.when;

@@ -160,4 +161,136 @@ public void testTableLineageWithDefaultTableName() {
assertEquals("tableB1", ref.get(ATTR_NAME));
assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
}

    /**
     * A Hive connection URL can have connection parameters delimited by semicolons.
     */
    @Test
    public void testTableLineageWithDefaultTableNameWithConnectionParams() {
        final String processorName = "PutHiveQL";
        final String transitUri = "jdbc:hive2://0.example.com:10000;transportMode=http;httpPath=cliservice";
        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
        when(record.getComponentType()).thenReturn(processorName);
        when(record.getTransitUri()).thenReturn(transitUri);
        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");

        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");

        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
        when(context.getClusterResolver()).thenReturn(clusterResolvers);

        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
        assertNotNull(analyzer);

        final DataSetRefs refs = analyzer.analyze(context, record);
        assertEquals(2, refs.getInputs().size());
        // QualifiedName : Name
        final Map<String, String> expectedInputRefs = new HashMap<>();
        expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
        expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
        for (Referenceable ref : refs.getInputs()) {
            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
            assertTrue(expectedInputRefs.containsKey(qName));
            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
        }

        assertEquals(1, refs.getOutputs().size());
        Referenceable ref = refs.getOutputs().iterator().next();
        assertEquals("hive_table", ref.getTypeName());
        assertEquals("tableB1", ref.get(ATTR_NAME));
        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
    }

    /**
     * A Hive connection URL can have multiple ZooKeeper host:port entries
     * and multiple parameters delimited with semicolons.
     * The database name can be omitted.
     */
    @Test
    public void testTableLineageWithZookeeperDiscovery() {
        final String processorName = "PutHiveQL";
        final String transitUri = "jdbc:hive2://0.example.com:2181,1.example.com:2181,2.example.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2";
        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
        when(record.getComponentType()).thenReturn(processorName);
        when(record.getTransitUri()).thenReturn(transitUri);
        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");

        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
        when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"), eq("2.example.com"))).thenReturn("cluster1");

        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
        when(context.getClusterResolver()).thenReturn(clusterResolvers);

        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
        assertNotNull(analyzer);

        final DataSetRefs refs = analyzer.analyze(context, record);
        assertEquals(2, refs.getInputs().size());
        // QualifiedName : Name
        final Map<String, String> expectedInputRefs = new HashMap<>();
        expectedInputRefs.put("default.tableA1@cluster1", "tableA1");
        expectedInputRefs.put("default.tableA2@cluster1", "tableA2");
        for (Referenceable ref : refs.getInputs()) {
            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
            assertTrue(expectedInputRefs.containsKey(qName));
            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
        }

        assertEquals(1, refs.getOutputs().size());
        Referenceable ref = refs.getOutputs().iterator().next();
        assertEquals("hive_table", ref.getTypeName());
        assertEquals("tableB1", ref.get(ATTR_NAME));
        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
    }

    /**
     * A Hive connection URL using ZooKeeper discovery and an explicit database name.
     */
    @Test
    public void testTableLineageWithZookeeperDiscoverySpecificDatabase() {
        final String processorName = "PutHiveQL";
        final String transitUri = "jdbc:hive2://0.example.com:2181,1.example.com:2181/some_database;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2";
        final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
        when(record.getComponentType()).thenReturn(processorName);
        when(record.getTransitUri()).thenReturn(transitUri);
        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);
        // E.g. insert into databaseB.tableB1 select something from tableA1 a1 inner join tableA2 a2 where a1.id = a2.id
        when(record.getAttribute(ATTR_INPUT_TABLES)).thenReturn("tableA1, tableA2");
        when(record.getAttribute(ATTR_OUTPUT_TABLES)).thenReturn("databaseB.tableB1");

        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
        when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1");

        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
        when(context.getClusterResolver()).thenReturn(clusterResolvers);

        final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
        assertNotNull(analyzer);

        final DataSetRefs refs = analyzer.analyze(context, record);
        assertEquals(2, refs.getInputs().size());
        // QualifiedName : Name
        final Map<String, String> expectedInputRefs = new HashMap<>();
        expectedInputRefs.put("some_database.tableA1@cluster1", "tableA1");
        expectedInputRefs.put("some_database.tableA2@cluster1", "tableA2");
        for (Referenceable ref : refs.getInputs()) {
            final String qName = (String) ref.get(ATTR_QUALIFIED_NAME);
            assertTrue(expectedInputRefs.containsKey(qName));
            assertEquals(expectedInputRefs.get(qName), ref.get(ATTR_NAME));
        }

        assertEquals(1, refs.getOutputs().size());
        Referenceable ref = refs.getOutputs().iterator().next();
        assertEquals("hive_table", ref.getTypeName());
        assertEquals("tableB1", ref.get(ATTR_NAME));
        assertEquals("databaseB.tableB1@cluster1", ref.get(ATTR_QUALIFIED_NAME));
    }

}
KafkaTopic tests:
@@ -31,6 +31,7 @@
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.matches;
import static org.mockito.Mockito.when;

@@ -73,7 +74,7 @@ public void testPublishKafkaMultipleBrokers() {
        when(record.getEventType()).thenReturn(ProvenanceEventType.SEND);

        final ClusterResolvers clusterResolvers = Mockito.mock(ClusterResolvers.class);
-        when(clusterResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("cluster1");
+        when(clusterResolvers.fromHostNames(eq("0.example.com"), eq("1.example.com"))).thenReturn("cluster1");

        final AnalysisContext context = Mockito.mock(AnalysisContext.class);
        when(context.getClusterResolver()).thenReturn(clusterResolvers);
