Skip to content

Commit

Permalink
NIFI-8390: Handle HBase namespaces in Atlas reporting task
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre Villard <[email protected]>

This closes apache#4977.
  • Loading branch information
turcsanyip authored and pvillard31 committed Apr 7, 2021
1 parent ef60e7e commit 4b852ba
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,48 +28,78 @@
import java.util.regex.Pattern;

import static org.apache.nifi.atlas.AtlasUtils.toQualifiedName;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI;

/**
* Analyze a transit URI as an HBase table.
* <li>qualifiedName=tableName@namespace (example: myTable@ns1)
* <li>name=tableName (example: myTable)
* <li>qualifiedName=hbaseNamespace:tableName@namespace (example: default:myTable@ns1)
* <li>name=[hbaseNamespace:]tableName (example: myTable)
*/
public class HBaseTable extends AbstractNiFiProvenanceEventAnalyzer {

private static final Logger logger = LoggerFactory.getLogger(HBaseTable.class);
private static final String TYPE = "hbase_table";

// hbase://masterAddress/hbaseTableName/hbaseRowId(optional)
private static final Pattern URI_PATTERN = Pattern.compile("^hbase://([^/]+)/([^/]+)/?.*$");
static final String TYPE_HBASE_TABLE = "hbase_table";
static final String TYPE_HBASE_NAMESPACE = "hbase_namespace";

static final String ATTR_NAMESPACE = "namespace";

static final String DEFAULT_NAMESPACE = "default";

// hbase://masterAddress/[hbaseNamespace:]hbaseTableName/hbaseRowId(optional)
private static final Pattern URI_PATTERN = Pattern.compile("^hbase://([^/]+)/(([^/]+):)?([^/]+)/?.*$");

@Override
public DataSetRefs analyze(AnalysisContext context, ProvenanceEventRecord event) {

final String transitUri = event.getTransitUri();
final Matcher uriMatcher = URI_PATTERN.matcher(transitUri);
if (!uriMatcher.matches()) {
logger.warn("Unexpected transit URI: {}", new Object[]{transitUri});
logger.warn("Unexpected transit URI: {}", transitUri);
return null;
}

final Referenceable ref = new Referenceable(TYPE);
final String[] hostNames = splitHostNames(uriMatcher.group(1));
final String namespace = context.getNamespaceResolver().fromHostNames(hostNames);

final String tableName = uriMatcher.group(2);
ref.set(ATTR_NAME, tableName);
ref.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, tableName));
// TODO: 'uri' is a mandatory attribute, but what should we set?
ref.set(ATTR_URI, transitUri);
final String hbaseNamespaceName = uriMatcher.group(3) != null ? uriMatcher.group(3) : DEFAULT_NAMESPACE;
final String hbaseTableName = uriMatcher.group(4);

final Referenceable hbaseNamespaceRef = createHBaseNamespaceRef(namespace, hbaseNamespaceName);
final Referenceable hbaseTableRef = getHBaseTableRef(namespace, hbaseTableName, hbaseNamespaceRef);

return singleDataSetRef(event.getComponentId(), event.getEventType(), ref);
return singleDataSetRef(event.getComponentId(), event.getEventType(), hbaseTableRef);
}

/**
 * Supplies the transit URI pattern targeted by this analyzer.
 *
 * @return a regular expression matching any URI that begins with {@code hbase://}
 *         followed by at least one character
 */
@Override
public String targetTransitUriPattern() {
    final String hbaseTransitUriRegex = "^hbase://.+$";
    return hbaseTransitUriRegex;
}

/**
 * Builds the {@code hbase_namespace} entity reference for the given HBase namespace.
 *
 * @param namespace          the namespace resolved by the caller; it is combined with the HBase
 *                           namespace name into the qualified name and is also stored as the cluster name
 * @param hbaseNamespaceName the HBase namespace name, stored as the entity's name
 * @return a new {@link Referenceable} of type {@code TYPE_HBASE_NAMESPACE} carrying the name,
 *         qualified name and cluster name attributes
 */
private Referenceable createHBaseNamespaceRef(String namespace, String hbaseNamespaceName) {
    final String namespaceQualifiedName = toQualifiedName(namespace, hbaseNamespaceName);

    final Referenceable namespaceEntity = new Referenceable(TYPE_HBASE_NAMESPACE);
    namespaceEntity.set(ATTR_NAME, hbaseNamespaceName);
    namespaceEntity.set(ATTR_QUALIFIED_NAME, namespaceQualifiedName);
    namespaceEntity.set(ATTR_CLUSTER_NAME, namespace);
    return namespaceEntity;
}

/**
 * Builds the {@code hbase_table} entity reference and links it to its HBase namespace entity.
 *
 * <p>The qualified name is always derived from the full {@code hbaseNamespace:tableName} form.
 * The name and uri attributes use the bare table name when the namespace entity's name equals
 * {@code DEFAULT_NAMESPACE}, and the full form otherwise.
 *
 * @param namespace         the namespace resolved by the caller, used to build the qualified name
 * @param hbaseTableName    the table name without any HBase namespace prefix
 * @param hbaseNamespaceRef the namespace entity; its name attribute is read and the entity itself
 *                          is stored under the table's namespace attribute
 * @return a new {@link Referenceable} of type {@code TYPE_HBASE_TABLE}
 */
private Referenceable getHBaseTableRef(String namespace, String hbaseTableName, Referenceable hbaseNamespaceRef) {
    final Object owningNamespaceName = hbaseNamespaceRef.get(ATTR_NAME);
    final String namespacedTableName = String.format("%s:%s", owningNamespaceName, hbaseTableName);

    // Tables in the default namespace are shown without the "default:" prefix.
    final String shortTableName;
    if (DEFAULT_NAMESPACE.equals(owningNamespaceName)) {
        shortTableName = hbaseTableName;
    } else {
        shortTableName = namespacedTableName;
    }

    final Referenceable tableEntity = new Referenceable(TYPE_HBASE_TABLE);
    tableEntity.set(ATTR_NAME, shortTableName);
    tableEntity.set(ATTR_QUALIFIED_NAME, toQualifiedName(namespace, namespacedTableName));
    tableEntity.set(ATTR_NAMESPACE, hbaseNamespaceRef);
    tableEntity.set(ATTR_URI, shortTableName);
    return tableEntity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,70 +27,107 @@
import org.junit.Test;
import org.mockito.Mockito;

import static org.apache.nifi.atlas.NiFiTypes.ATTR_CLUSTER_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_URI;
import static org.apache.nifi.atlas.provenance.analyzer.HBaseTable.ATTR_NAMESPACE;
import static org.apache.nifi.atlas.provenance.analyzer.HBaseTable.TYPE_HBASE_NAMESPACE;
import static org.apache.nifi.atlas.provenance.analyzer.HBaseTable.TYPE_HBASE_TABLE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.Mockito.when;

public class TestHBaseTable {

private static final String PROCESSOR_NAME = "FetchHBaseRow";
private static final String ATLAS_METADATA_NAMESPACE = "namespace1";

@Test
public void testHBaseTable() {
final String processorName = "FetchHBaseRow";
public void testHBaseTableImplicitDefaultNamespace() {
final String transitUri = "hbase://0.example.com/tableA/rowB";
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
when(record.getComponentType()).thenReturn(processorName);
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.FETCH);
final ProvenanceEventRecord record = mockProvenanceEventRecord(transitUri);

final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn(ATLAS_METADATA_NAMESPACE);

executeTest(record, namespaceResolvers, "tableA", "default:tableA@namespace1", "tableA", "default", "default@namespace1");
}

@Test
public void testHBaseTableExplicitDefaultNamespace() {
final String transitUri = "hbase://0.example.com/default:tableA/rowB";
final ProvenanceEventRecord record = mockProvenanceEventRecord(transitUri);

final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");

final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);
executeTest(record, namespaceResolvers, "tableA", "default:tableA@namespace1", "tableA", "default", "default@namespace1");
}

final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
@Test
public void testHBaseTableCustomNamespace() {
final String transitUri = "hbase://0.example.com/namespaceA:tableA/rowB";
final ProvenanceEventRecord record = mockProvenanceEventRecord(transitUri);

final DataSetRefs refs = analyzer.analyze(context, record);
assertEquals(1, refs.getInputs().size());
assertEquals(0, refs.getOutputs().size());
Referenceable ref = refs.getInputs().iterator().next();
assertEquals("hbase_table", ref.getTypeName());
assertEquals("tableA", ref.get(ATTR_NAME));
assertEquals("tableA@namespace1", ref.get(ATTR_QUALIFIED_NAME));
final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(matches(".+\\.example\\.com"))).thenReturn("namespace1");

executeTest(record, namespaceResolvers, "namespaceA:tableA", "namespaceA:tableA@namespace1", "namespaceA:tableA", "namespaceA", "namespaceA@namespace1");
}

@Test
public void testHBaseTableWithMultipleZkHosts() {
final String processorName = "FetchHBaseRow";
final String transitUri = "hbase://zk0.example.com,zk2.example.com,zk3.example.com/tableA/rowB";
final ProvenanceEventRecord record = Mockito.mock(ProvenanceEventRecord.class);
when(record.getComponentType()).thenReturn(processorName);
when(record.getTransitUri()).thenReturn(transitUri);
when(record.getEventType()).thenReturn(ProvenanceEventType.FETCH);
final ProvenanceEventRecord record = mockProvenanceEventRecord(transitUri);

final NamespaceResolvers namespaceResolvers = Mockito.mock(NamespaceResolvers.class);
when(namespaceResolvers.fromHostNames(
matches("zk0.example.com"),
matches("zk2.example.com"),
matches("zk3.example.com"))).thenReturn("namespace1");

executeTest(record, namespaceResolvers, "tableA", "default:tableA@namespace1", "tableA", "default", "default@namespace1");
}

/**
 * Creates a mocked provenance event for the tests in this class.
 *
 * @param transitUri the value the mock returns from {@code getTransitUri()}
 * @return a Mockito mock whose event type is {@code FETCH}, whose component type is
 *         {@code PROCESSOR_NAME} and whose transit URI is the given value
 */
private ProvenanceEventRecord mockProvenanceEventRecord(String transitUri) {
    final ProvenanceEventRecord fetchEvent = Mockito.mock(ProvenanceEventRecord.class);

    when(fetchEvent.getTransitUri()).thenReturn(transitUri);
    when(fetchEvent.getComponentType()).thenReturn(PROCESSOR_NAME);
    when(fetchEvent.getEventType()).thenReturn(ProvenanceEventType.FETCH);

    return fetchEvent;
}

private void executeTest(ProvenanceEventRecord record, NamespaceResolvers namespaceResolvers, String expectedTableName, String expectedTableQualifiedName, String expectedTableUri,
String expectedNamespaceName, String expectedNamespaceQualifiedName) {
final AnalysisContext context = Mockito.mock(AnalysisContext.class);
when(context.getNamespaceResolver()).thenReturn(namespaceResolvers);

final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(processorName, transitUri, record.getEventType());
assertNotNull(analyzer);
final NiFiProvenanceEventAnalyzer analyzer = NiFiProvenanceEventAnalyzerFactory.getAnalyzer(PROCESSOR_NAME, record.getTransitUri(), record.getEventType());

final DataSetRefs refs = analyzer.analyze(context, record);

assertAnalysisResult(refs, expectedTableName, expectedTableQualifiedName, expectedTableUri, expectedNamespaceName, expectedNamespaceQualifiedName);
}

private void assertAnalysisResult(DataSetRefs refs, String expectedTableName, String expectedTableQualifiedName, String expectedTableUri,
String expectedNamespaceName, String expectedNamespaceQualifiedName) {
assertEquals(1, refs.getInputs().size());
assertEquals(0, refs.getOutputs().size());
Referenceable ref = refs.getInputs().iterator().next();
assertEquals("hbase_table", ref.getTypeName());
assertEquals("tableA", ref.get(ATTR_NAME));
assertEquals("tableA@namespace1", ref.get(ATTR_QUALIFIED_NAME));

Referenceable tableRef = refs.getInputs().iterator().next();
assertEquals(TYPE_HBASE_TABLE, tableRef.getTypeName());
assertEquals(expectedTableName, tableRef.get(ATTR_NAME));
assertEquals(expectedTableQualifiedName, tableRef.get(ATTR_QUALIFIED_NAME));
assertEquals(expectedTableUri, tableRef.get(ATTR_URI));

Referenceable namespaceRef = (Referenceable) tableRef.get(ATTR_NAMESPACE);
assertEquals(TYPE_HBASE_NAMESPACE, namespaceRef.getTypeName());
assertEquals(expectedNamespaceName, namespaceRef.get(ATTR_NAME));
assertEquals(expectedNamespaceQualifiedName, namespaceRef.get(ATTR_QUALIFIED_NAME));
assertEquals(ATLAS_METADATA_NAMESPACE, namespaceRef.get(ATTR_CLUSTER_NAME));
}

}

0 comments on commit 4b852ba

Please sign in to comment.