Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
refactor and remove unnecessary APIs (#13)
Browse files Browse the repository at this point in the history
* restructuring and removing the simple placeholder of relation 
* making type factory part of the relation step
* refactor and remove unnecessary APIs
  • Loading branch information
Rong Rong authored Sep 24, 2019
1 parent 46ce2bb commit bd1ccaf
Show file tree
Hide file tree
Showing 50 changed files with 528 additions and 710 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@

package com.uber.athena.dsl.planner;

import com.uber.athena.dsl.planner.element.ElementNode;
import com.uber.athena.dsl.planner.topology.Topology;
import com.uber.athena.dsl.planner.utils.ConstructionException;
import com.uber.athena.dsl.planner.utils.ParsingException;
import com.uber.athena.dsl.planner.utils.ValidationException;

import java.io.InputStream;
import java.util.Map;

/**
* Main interface of the planner framework.
*
Expand All @@ -30,14 +39,43 @@
* within the DSL planner framework:
*
* <p><ul>
* <li>{@link DslParser}
* <li>{@link DslValidator}
* <li>{@link ElementBuilder}
* <li>{@link RelationBuilder}
* <li>{@link com.uber.athena.dsl.planner.parser.Parser}
* <li>{@link com.uber.athena.dsl.planner.validation.Validator}
* <li>{@link com.uber.athena.dsl.planner.element.ElementBuilder}
* <li>{@link com.uber.athena.dsl.planner.relation.RelationBuilder}
* <li>...
* </ul></p>
*
*/
public interface Planner {

/**
* Parser an input stream and constructs a {@link Topology}.
*
* @param stream input stream
* @return topology constructed.
* @throws ParsingException when parsing of the input stream fails.
*/
Topology parse(InputStream stream) throws ParsingException;

/**
* Validate that the topology is correct.
*
* <p>Validation does not try to resolve any relations or any elements
* that requires construction, it only validates the semantic.
*
* @param topology topology representing the DSL model.
* @return topology after validation process completes.
* @throws ValidationException validation fails.
*/
Topology validate(Topology topology) throws ValidationException;

/**
* Construct the {@link ElementNode}s for all vertices.
*
* @param topology topology definition of the DSL model.
* @return a mapping .
* @throws ConstructionException when construction fails.
*/
Map<String, ElementNode> constructElement(Topology topology) throws ConstructionException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,55 @@

package com.uber.athena.dsl.planner;

import com.uber.athena.dsl.planner.element.ElementBuilder;
import com.uber.athena.dsl.planner.element.ElementNode;
import com.uber.athena.dsl.planner.parser.Parser;
import com.uber.athena.dsl.planner.topology.Topology;
import com.uber.athena.dsl.planner.utils.ConstructionException;
import com.uber.athena.dsl.planner.utils.ParsingException;
import com.uber.athena.dsl.planner.utils.ValidationException;
import com.uber.athena.dsl.planner.validation.Validator;

import java.io.InputStream;
import java.util.Map;

/**
* Standard {@link Planner} implementation.
*/
public class PlannerImpl implements Planner {

private Parser parser;
private Validator validator;
private ElementBuilder elementBuilder;

// TODO @walterddr create wrapper for planner construction instead of directly
// creating each individual modules.

public PlannerImpl(
Parser parser,
Validator validator,
ElementBuilder elementBuilder
) {
this.parser = parser;
this.validator = validator;
this.elementBuilder = elementBuilder;
}

@Override
public Topology parse(InputStream stream) throws ParsingException {
// fix config utilization.
return parser.parseInputStream(stream, false, null, false);
}

@Override
public Topology validate(Topology topology) throws ValidationException {
validator.validate(topology);
return topology;
}

@Override
public Map<String, ElementNode> constructElement(
Topology topology) throws ConstructionException {
return elementBuilder.construct(topology);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,18 @@

package com.uber.athena.dsl.planner.element;

import com.uber.athena.dsl.planner.type.Type;

/**
* Base implementation of an {@link ElementNode}.
*/
@SuppressWarnings("unchecked")
public class Element implements ElementNode {

private Object obj;
private Type type;
private Class<?> clazz;

public Element(Object obj, Class<?> clazz) {
this(obj, clazz, null);
}

public Element(Object obj, Class<?> clazz, Type type) {
this.obj = obj;
this.clazz = clazz;
this.type = type;
}

@Override
Expand All @@ -50,9 +42,4 @@ public Class<?> getElementClass() {
public <R> R getElement() {
return (R) this.obj;
}

@Override
public <T extends Type> T getProduceType() {
return (T) type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@

package com.uber.athena.dsl.planner.element;

import com.uber.athena.dsl.planner.Blackboard;
import com.uber.athena.dsl.planner.element.constructor.Constructor;
import com.uber.athena.dsl.planner.element.convertlet.ConvertletExecutor;
import com.uber.athena.dsl.planner.model.ModelVertex;
import com.uber.athena.dsl.planner.model.VertexNode;
import com.uber.athena.dsl.planner.topology.Topology;
import com.uber.athena.dsl.planner.type.Type;
import com.uber.athena.dsl.planner.type.TypeFactory;
import com.uber.athena.dsl.planner.utils.ConstructionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
* Builder for constructing an {@link Element} from a {@link ModelVertex}.
* Builder for constructing an {@link Element} from a {@link VertexNode}.
*
* <p>Element builder is used to construct a proper Object for later
* relation construction. It does process some
Expand All @@ -46,91 +45,49 @@ public class ElementBuilder {
private Constructor constructor;

/**
* Configuration properties for {@link Convertlet}s and {@link TypeFactory}.
* Configuration properties.
*/
private Properties config;

/**
* Type/Schema factory used to determine output schema of an Element.
*/
private TypeFactory typeFactory;

/**
* Executor of {@link Convertlet}s.
*
* <p>{@link Convertlet}s are encapsulated within the executor environment.
*/
private ConvertletExecutor convertlet;

/**
* Construct an element builder.
*
* @param builderConfig configuration properties for the builder.
* @param typeFactory the type factory to construct element produce type.
* @param convertlet the optional convertlet used to convert element.
* @param constructor the constructor used to construct elements.
*/
public ElementBuilder(
Properties builderConfig,
Constructor constructor,
TypeFactory typeFactory,
ConvertletExecutor convertlet) {
Constructor constructor) {
this.config = builderConfig;
this.constructor = constructor;
this.typeFactory = typeFactory;
this.convertlet = convertlet;
}

/**
* Construct ElementNodes for the entire topology.
* Construct {@link ElementNode}s for the topology.
*
* <p>Constructed nodes will be put into the {@link Blackboard} provided.
* <p>Element builder invokes the constructor to create runtime objects for
* the DSL topology. It provides a reflective constructor mechanism as
* default approach.
*
* @param topology the topology defined by the DSL model.
* @param elementBlackboard blackboard for holding constructed elements.
* @return a mapping from the vertex Ids to the constructed runtime objects.
*/
public void construct(
Topology topology,
Blackboard<ElementNode> elementBlackboard) {
for (ModelVertex vertex : topology.getSources().values()) {
ElementNode node = construct(vertex, topology);
elementBlackboard.saveNode(node, vertex);
public Map<String, ElementNode> construct(
Topology topology) throws ConstructionException {
Map<String, ElementNode> elementMapping = new HashMap<>();
for (VertexNode vertex : topology.getSources().values()) {
ElementNode node = constructor.construct(vertex, topology);
elementMapping.put(vertex.getVertexId(), node);
}
for (ModelVertex vertex : topology.getSinks().values()) {
ElementNode node = construct(vertex, topology);
elementBlackboard.saveNode(node, vertex);
for (VertexNode vertex : topology.getSinks().values()) {
ElementNode node = constructor.construct(vertex, topology);
elementMapping.put(vertex.getVertexId(), node);
}
for (ModelVertex vertex : topology.getOperators().values()) {
ElementNode node = construct(vertex, topology);
elementBlackboard.saveNode(node, vertex);
for (VertexNode vertex : topology.getOperators().values()) {
ElementNode node = constructor.construct(vertex, topology);
elementMapping.put(vertex.getVertexId(), node);
}
}

private ElementNode construct(ModelVertex vertex, Topology topology) {
try {
// Construct the element from vertex using reflection.
Object obj = constructor.construct(vertex.getVertexDef(), topology);
Class<?> clazz = Class.forName(vertex.getVertexDef().getClassName());

// Determines the constructed element produce type.
Type type = typeFactory.getType(vertex);
ElementNode node = new Element(obj, clazz, type);

// Invoke convertlet executor to make extended conversions.
// TODO @walterddr add convertlet invocation.

return node;
} catch (Exception e) {
LOG.error("Cannot construct element!", e);
return null;
}
}

public ConvertletExecutor getConvertlet() {
return convertlet;
}

public TypeFactory getTypeFactory() {
return typeFactory;
return elementMapping;
}

public Properties getConfig() {
Expand Down
Loading

0 comments on commit bd1ccaf

Please sign in to comment.