Skip to content

Commit

Permalink
Support flattening and unflattening structured types (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan authored Dec 23, 2024
1 parent 08729e9 commit 64b7b73
Show file tree
Hide file tree
Showing 19 changed files with 372 additions and 76 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
install:
./gradlew compileJava installDist

test:
./gradlew test -x spotbugsMain -x spotbugsTest -x spotbugsTestFixtures

build:
./gradlew build
docker build . -t hoptimator
Expand Down Expand Up @@ -77,4 +80,4 @@ release:
test -n "$(VERSION)" # MISSING ARG: $$VERSION
./gradlew publish

.PHONY: build clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release
.PHONY: build test install clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release
46 changes: 46 additions & 0 deletions hoptimator-avro/build.gradle
Original file line number Diff line number Diff line change
@@ -1,9 +1,55 @@
// Build script for the hoptimator-avro module: Hoptimator's Apache Avro plugin.
plugins {
id 'java'
id 'maven-publish'
}

dependencies {
// hoptimator-api provides the plugin SPI this module implements.
implementation project(':hoptimator-api')
implementation libs.avro
implementation libs.calcite.core
}

// Publishes the artifact to both GitHub Packages and LinkedIn's JFrog
// repository; credentials come from CI environment variables.
publishing {
repositories {
maven {
name 'GitHubPackages'
url = 'https://maven.pkg.github.com/linkedin/Hoptimator'
credentials {
username = System.getenv('GITHUB_ACTOR')
password = System.getenv('GITHUB_TOKEN')
}
}
maven {
name 'LinkedInJFrog'
url 'https://linkedin.jfrog.io/artifactory/hoptimator'
credentials {
username = System.getenv('JFROG_USERNAME')
password = System.getenv('JFROG_API_KEY')
}
}
}
publications {
maven(MavenPublication) {
groupId = 'com.linkedin.hoptimator'
artifactId = 'hoptimator-avro'
// Release version is injected by the release workflow.
version = System.getenv('VERSION')
from components.java
pom {
name = 'hoptimator-avro'
description = 'Hoptimator plugin for Apache Avro'
url = 'https://github.com/linkedin/Hoptimator'
licenses {
license {
name = 'BSD 2-Clause'
url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE'
}
}
scm {
connection = 'scm:git:git://github.com:linkedin/Hoptimator.git'
developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git'
url = 'https://github.com/linkedin/Hoptimator'
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class AdsSchema extends AbstractSchema {
public AdsSchema() {
tableMap.put("PAGE_VIEWS", new PageViewTable());
tableMap.put("AD_CLICKS", new AdClickTable());
tableMap.put("CAMPAIGNS", new CampaignTable());
}

@Override
Expand Down
1 change: 0 additions & 1 deletion hoptimator-jdbc-driver/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ plugins {
}

dependencies {
implementation project(':hoptimator-avro')
implementation project(':hoptimator-demodb')
implementation project(':hoptimator-jdbc')
implementation project(':hoptimator-util')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.linkedin.hoptimator.util.RemoteTable;


/** A table populated with all available Catlaogs. */
/** A table populated with all available Catalogs. */
public class CatalogTable extends RemoteTable<Catalog, CatalogTable.Row> {

// This and other Row classes are used by generated code, so it is important
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.sql.SQLException;
import java.sql.Wrapper;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.calcite.schema.SchemaPlus;
Expand All @@ -15,7 +15,7 @@
/** Built-in utility tables. */
public class UtilityCatalog extends AbstractSchema implements Catalog {

private final Map<String, Table> tableMap = new HashMap<>();
private final Map<String, Table> tableMap = new LinkedHashMap<>();

public UtilityCatalog() {
tableMap.put("PRINT", new PrintTable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.io.IOException;
import java.io.StringReader;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -47,10 +47,9 @@ public Map<String, String> configure(Source source) throws SQLException {
} catch (IOException e) {
throw new SQLException(e);
}
Map<String, String> map = new HashMap<>();
for (String key : props.stringPropertyNames()) {
map.put(key, props.getProperty(key));
}
Map<String, String> map = new LinkedHashMap<>();
props.stringPropertyNames().stream().sorted().forEach(k ->
map.put(k, props.getProperty(k)));
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public K8sDatabaseTable(K8sContext context) {
public void addDatabases(SchemaPlus parentSchema) {
for (Row row : rows()) {
parentSchema.add(schemaName(row),
HoptimatorJdbcSchema.create(row.NAME, null, row.SCHEMA, dataSource(row), parentSchema, dialect(row)));
HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row), parentSchema, dialect(row)));
}
}

Expand Down
4 changes: 2 additions & 2 deletions hoptimator-kafka/src/test/resources/kafka-ddl.id
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ spec:
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2', 'connector'='kafka')
- CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1', 'connector'='kafka')
- CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2')
- CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1')
- INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2`
jarURI: local:///opt/hoptimator-flink-runner.jar
parallelism: 1
Expand Down
61 changes: 61 additions & 0 deletions hoptimator-util/build.gradle
Original file line number Diff line number Diff line change
@@ -1,8 +1,69 @@
// Build script for the hoptimator-util module: shared utilities for
// extending Hoptimator.
plugins {
id 'java'
id 'maven-publish'
}

dependencies {
implementation project(':hoptimator-api')
implementation libs.calcite.core

// Test fixtures from hoptimator-jdbc supply the JDBC test harness.
testImplementation(testFixtures(project(':hoptimator-jdbc')))
testImplementation(platform('org.junit:junit-bom:5.11.3'))
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}

test {
// Integration-tagged tests are run separately (see Makefile targets).
useJUnitPlatform {
excludeTags 'integration'
}
testLogging {
events "passed", "skipped", "failed"
}
}

// Publishes the artifact to both GitHub Packages and LinkedIn's JFrog
// repository; credentials come from CI environment variables.
publishing {
repositories {
maven {
name 'GitHubPackages'
url = 'https://maven.pkg.github.com/linkedin/Hoptimator'
credentials {
username = System.getenv('GITHUB_ACTOR')
password = System.getenv('GITHUB_TOKEN')
}
}
maven {
name 'LinkedInJFrog'
url 'https://linkedin.jfrog.io/artifactory/hoptimator'
credentials {
username = System.getenv('JFROG_USERNAME')
password = System.getenv('JFROG_API_KEY')
}
}
}
publications {
maven(MavenPublication) {
groupId = 'com.linkedin.hoptimator'
artifactId = 'hoptimator-util'
// Release version is injected by the release workflow.
version = System.getenv('VERSION')
from components.java
pom {
name = 'hoptimator-util'
description = 'Utilities to help with extending Hoptimator'
url = 'https://github.com/linkedin/Hoptimator'
licenses {
license {
name = 'BSD 2-Clause'
url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE'
}
}
scm {
connection = 'scm:git:git://github.com:linkedin/Hoptimator.git'
developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git'
url = 'https://github.com/linkedin/Hoptimator'
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
Expand All @@ -19,7 +19,7 @@ private ConnectionService() {
}

public static <T> Map<String, String> configure(T object, Class<T> clazz) throws SQLException {
Map<String, String> configs = new HashMap<>();
Map<String, String> configs = new LinkedHashMap<>();
for (Connector<T> connector : connectors(clazz)) {
configs.putAll(connector.configure(object));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package com.linkedin.hoptimator.util;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;


/**
 * Utilities for flattening and unflattening Calcite struct types.
 *
 * <p>Flattened field names encode the original nesting with a {@code $}
 * separator, e.g. {@code FOO$BAR$QUX}. {@link #unflatten} is the inverse of
 * {@link #flatten} for struct fields; complex arrays are lossy (see below).
 */
public final class DataTypeUtils {

  private DataTypeUtils() {
  }

  /**
   * Flattens nested structs and complex arrays.
   *
   * Nested structs like `FOO Row(BAR Row(QUX VARCHAR)))` are promoted to
   * top-level fields like `FOO$BAR$QUX VARCHAR`.
   *
   * Complex arrays are demoted to just `ANY ARRAY`. Primitive arrays are
   * unchanged.
   *
   * @param dataType the type to flatten; non-struct types are returned as-is
   * @param typeFactory factory used to construct the flattened row type
   * @return a flat struct type with `$`-joined field names
   */
  public static RelDataType flatten(RelDataType dataType, RelDataTypeFactory typeFactory) {
    if (!dataType.isStruct()) {
      return dataType;
    }
    RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
    flattenInto(typeFactory, dataType, builder, Collections.emptyList());
    return builder.build();
  }

  /** Recursively adds flattened fields to {@code builder}; {@code path} tracks nesting. */
  private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType dataType,
      RelDataTypeFactory.Builder builder, List<String> path) {
    if (dataType.getComponentType() != null && (dataType.getComponentType().isStruct()
        || dataType.getComponentType().getComponentType() != null)) {
      // demote complex arrays (arrays of structs, or nested arrays) to just `ANY ARRAY`
      builder.add(String.join("$", path), typeFactory.createArrayType(
          typeFactory.createSqlType(SqlTypeName.ANY), -1));
    } else if (!dataType.isStruct()) {
      // leaf field: emit under its `$`-joined path
      builder.add(String.join("$", path), dataType);
    } else {
      for (RelDataTypeField field : dataType.getFieldList()) {
        flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
            Stream.of(field.getName())).collect(Collectors.toList()));
      }
    }
  }

  /**
   * Restructures flattened types, from `FOO$BAR VARCHAR` to `FOO Row(BAR VARCHAR...)`
   *
   * @param dataType a flat struct type whose field names may contain `$` separators
   * @param typeFactory factory used to construct the nested row types
   * @throws IllegalArgumentException if {@code dataType} is not a struct
   */
  public static RelDataType unflatten(RelDataType dataType, RelDataTypeFactory typeFactory) {
    if (!dataType.isStruct()) {
      throw new IllegalArgumentException("Can only unflatten a struct type.");
    }
    Node root = new Node();
    for (RelDataTypeField field : dataType.getFieldList()) {
      buildNodes(root, field.getName(), field.getType());
    }
    return buildRecord(root, typeFactory);
  }

  /** Inserts {@code name} into the tree rooted at {@code pos}, splitting on `$`. */
  private static void buildNodes(Node pos, String name, RelDataType dataType) {
    if (!name.contains("$")) {
      pos.children.put(name, new Node(dataType));
    } else {
      // split off the first path segment and recurse into (or create) that child
      String[] parts = name.split("\\$", 2);
      Node child = pos.children.computeIfAbsent(parts[0], x -> new Node());
      buildNodes(child, parts[1], dataType);
    }
  }

  /** Converts a node tree back into a (possibly nested) row type. */
  private static RelDataType buildRecord(Node node, RelDataTypeFactory typeFactory) {
    if (node.dataType != null) {
      // leaf node: carries the original field type
      return node.dataType;
    }
    RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
    // LinkedHashMap preserves field order, so the rebuilt struct matches input order
    for (Map.Entry<String, Node> child : node.children.entrySet()) {
      builder.add(child.getKey(), buildRecord(child.getValue(), typeFactory));
    }
    return builder.build();
  }

  /** Tree node: either a leaf (non-null {@code dataType}) or an interior struct. */
  private static class Node {
    RelDataType dataType;
    LinkedHashMap<String, Node> children = new LinkedHashMap<>();

    Node(RelDataType dataType) {
      this.dataType = dataType;
    }

    Node() {
      // nop
    }
  }
}

This file was deleted.

Loading

0 comments on commit 64b7b73

Please sign in to comment.