Skip to content

Commit

Permalink
Node: new getName() and name(String) methods for nodes, allows better…
Browse files Browse the repository at this point in the history
… identification in traces and logs
  • Loading branch information
eledhwen committed Sep 20, 2021
1 parent 7bf37ee commit 6e2774a
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 20 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Add the following in your `pom.xml`:
<dependency>
<groupId>com.noleme</groupId>
<artifactId>noleme-flow</artifactId>
<version>0.13.2</version>
<version>0.14</version>
</dependency>
```

Expand Down Expand Up @@ -165,6 +165,7 @@ Other features that will need to be documented include:
* runtime output management, sampling/collection features (`collect`, `sample` and the `Output` component)
* stream flows and parallelization (`setMaxParallelism` and implementation-specific considerations)
* `ParallelRuntime` service executor lifecycle and other considerations
* DAG node naming for debugging purposes (appears in traces, logs)
_TODO_
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.noleme</groupId>
<artifactId>noleme-flow</artifactId>
<version>0.13.2</version>
<version>0.14</version>
<packaging>jar</packaging>

<name>Noleme Flow</name>
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/noleme/flow/Join.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,15 @@ public Pipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
}

/**
*
* @param name
* @return
*/
public Join<I1, I2, O> name(String name)
{
this.name = name;
return this;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/noleme/flow/Pipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,15 @@ public Pipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
}

/**
*
* @param name
* @return
*/
public Pipe<I, O> name(String name)
{
this.name = name;
return this;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/noleme/flow/Sink.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,15 @@ public Sink(Loader<I> actor)
{
super(actor);
}

/**
*
* @param name
* @return
*/
public Sink<I> name(String name)
{
this.name = name;
return this;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/noleme/flow/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ public Pipe<O, O> interruptIf(Predicate<O> predicate)
return this.into(new Interruption<>(predicate));
}

/**
*
* @param name
* @return
*/
public Source<O> name(String name)
{
this.name = name;
return this;
}

@Override
public List<Node> getUpstream()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public static void blockBranch(Node n, Set<Node> blocked)
*/
private static void registerStream(StreamPipeline node, LinkedList<Node> runQueue, Heap heap)
{
StreamGenerator generatorNode = node.getGeneratorNode();
Generator generator = heap.getStreamGenerator(generatorNode);
StreamGenerator<?, ?> generatorNode = node.getGeneratorNode();
Generator<?> generator = heap.getStreamGenerator(generatorNode);

if (generator.hasNext())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ else if (node instanceof StreamAccumulator)
throw new PipelineRunException("Unknown node type " + node.getClass().getName(), heap);
}
catch (InterruptionException e) {
logger.debug("Flow node #{} has requested an interruption, blocking downstream nodes.", node.getUid());
logger.debug("Flow node {}#{} has requested an interruption, blocking downstream nodes.", getName(node), node.getUid());

return false;
}
catch (ExtractionException | TransformationException | LoadingException | GenerationException | AccumulationException e) {
logger.error("Flow node #{} has thrown an error: {}", node.getUid(), e.getMessage());
logger.error("Flow node {}#{} has thrown an error: {}", getName(node), node.getUid(), e.getMessage());

throw new PipelineRunException("Node " + node.getClass().getName() + "#" + node.getUid() + " has thrown an exception. (" + e.getClass() + ")", e, heap);
throw new PipelineRunException("Node " + node.getClass().getName() + " " + getName(node) + "#" + node.getUid() + " has thrown an exception. (" + e.getClass() + ")", e, heap);
}
}

Expand All @@ -103,7 +103,7 @@ private boolean launchSource(Source<?> source, Heap heap) throws ExtractionExcep
{
Extractor extractor = source.getActor();

logger.debug("Launching flow source #{} of extractor {}", source.getUid(), extractor.getClass().getName());
logger.debug("Launching flow source {}#{} of extractor {}", getName(source), source.getUid(), extractor.getClass().getName());

/* If the extractor is an InputExtractor, the output value comes from the provided input instead of the extractor itself ; the extractor only holds a reference to the expected input */
if (extractor instanceof InputExtractor)
Expand Down Expand Up @@ -133,7 +133,7 @@ private boolean launchPipe(Pipe<?, ?> pipe, Heap heap) throws TransformationExce
{
Transformer transformer = pipe.getActor();

logger.debug("Launching flow pipe #{} of transformer {}", pipe.getUid(), transformer.getClass().getName());
logger.debug("Launching flow pipe {}#{} of transformer {}", getName(pipe), pipe.getUid(), transformer.getClass().getName());

Object input = heap.consume(pipe.getSimpleUpstream().getUid());
heap.push(pipe.getUid(), transformer.transform(input), pipe.getDownstream().size());
Expand All @@ -152,7 +152,15 @@ private boolean launchJoin(Join<?, ?, ?> join, Heap heap) throws TransformationE
{
BiTransformer transformer = join.getActor();

logger.debug("Launching flow join #{} of upstream flows #{} and #{}", join.getUid(), join.getUpstream1().getUid(), join.getUpstream2().getUid());
logger.debug(
"Launching flow join {}#{} of upstream flows {}#{} and {}#{}",
getName(join),
join.getUid(),
getName(join.getUpstream1()),
join.getUpstream1().getUid(),
getName(join.getUpstream2()),
join.getUpstream2().getUid()
);

Object input1 = heap.consume(join.getUpstream1().getUid());
Object input2 = heap.consume(join.getUpstream2().getUid());
Expand All @@ -171,7 +179,7 @@ private boolean launchSink(Sink<?> sink, Heap heap) throws LoadingException
{
Loader loader = sink.getActor();

logger.debug("Launching flow sink #{} of loader {}", sink.getUid(), loader.getClass().getName());
logger.debug("Launching flow sink {}#{} of loader {}", getName(sink), sink.getUid(), loader.getClass().getName());

Object input = heap.consume(sink.getSimpleUpstream().getUid());

Expand Down Expand Up @@ -228,7 +236,7 @@ private boolean launchStreamGenerator(StreamGenerator<?, ?> generatorNode, int o
{
Generator generator = heap.getStreamGenerator(generatorNode);

logger.debug("Launching flow stream generator #{} at offset {} with generator {}", generatorNode.getUid(), offset, generator.getClass().getName());
logger.debug("Launching flow stream generator {}#{} at offset {} with generator {}", getName(generatorNode), generatorNode.getUid(), offset, generator.getClass().getName());

heap.push(generatorNode.getUid(), offset, generator.generate(), generatorNode.getDownstream().size());
return true;
Expand All @@ -247,7 +255,7 @@ private boolean launchStreamPipe(StreamPipe<?, ?> pipe, int offset, Heap heap) t
{
Transformer transformer = pipe.getActor();

logger.debug("Launching flow stream pipe #{} at offset {} of transformer {}", pipe.getUid(), offset, transformer.getClass().getName());
logger.debug("Launching flow stream pipe {}#{} at offset {} of transformer {}", getName(pipe), pipe.getUid(), offset, transformer.getClass().getName());

Object input = heap.consume(pipe.getSimpleUpstream().getUid(), offset);
heap.push(pipe.getUid(), offset, transformer.transform(input), pipe.getDownstream().size());
Expand All @@ -267,7 +275,16 @@ private boolean launchStreamJoin(StreamJoin<?, ?, ?> join, int offset, Heap heap
{
BiTransformer transformer = join.getActor();

logger.debug("Launching flow stream join #{} at offset {} of upstream flows #{} and #{}", join.getUid(), offset, join.getUpstream1().getUid(), join.getUpstream2().getUid());
logger.debug(
"Launching flow stream join {}#{} at offset {} of upstream flows {}#{} and {}#{}",
getName(join),
join.getUid(),
offset,
getName(join.getUpstream1()),
join.getUpstream1().getUid(),
getName(join.getUpstream2()),
join.getUpstream2().getUid()
);

Object input1 = heap.consume(join.getUpstream1().getUid(), offset);
Object input2 = heap.consume(join.getUpstream2().getUid(), offset);
Expand All @@ -288,7 +305,7 @@ private boolean launchStreamSink(StreamSink<?> sink, int offset, Heap heap) thro
{
Loader loader = sink.getActor();

logger.debug("Launching flow stream sink #{} at offset {} of loader {}", sink.getUid(), offset, loader.getClass().getName());
logger.debug("Launching flow stream sink {}#{} at offset {} of loader {}", getName(sink), sink.getUid(), offset, loader.getClass().getName());

Object input = heap.consume(sink.getSimpleUpstream().getUid(), offset);
loader.load(input);
Expand All @@ -307,10 +324,20 @@ private boolean launchStreamAccumulator(StreamAccumulator<?, ?> node, Heap heap)
{
Accumulator accumulator = node.getActor();

logger.debug("Launching flow stream accumulator #{} of accumulator {}", node.getUid(), node.getClass().getName());
logger.debug("Launching flow stream accumulator {}#{} of accumulator {}", getName(node), node.getUid(), node.getClass().getName());

Collection<Object> input = heap.consumeAll(node.getSimpleUpstream().getUid());
heap.push(node.getUid(), accumulator.accumulate(input), node.getDownstream().size());
return true;
}

/**
*
* @param node
* @return
*/
private static String getName(Node node)
{
return node.getName() != null ? node.getName() : "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ public String getUid()
return this.uid;
}

@Override
public String getName()
{
return this.getNode().getName();
}

public int getOffset()
{
return this.offset;
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/noleme/flow/node/AbstractNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
public abstract class AbstractNode implements Node
{
private final String uid;
protected String name;
final List<Node> downstream;
final List<Node> requirements;
final List<Node> requiredBy;
Expand All @@ -30,6 +31,11 @@ public String getUid()
return this.uid;
}

public String getName()
{
return this.name;
}

@Override
public List<Node> getDownstream()
{
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/noleme/flow/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ public interface Node extends Comparable<Node>
*/
String getUid();

/**
* Returns a user-specified name for the node within a flow DAG.
* The name can be used to clarify logs and traces when a specific actor is used at multiple places within a DAG.
*
* @return the node name, null if none was specified
*/
String getName();

/**
* Returns the list of downstream nodes, ie. nodes which require the current node's output in order to be executed.
*
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/noleme/flow/stream/StreamAccumulator.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,15 @@ public StreamAccumulator<I, O> sample(String name)
this.collect(name);
return this;
}

/**
*
* @param name
* @return
*/
public StreamAccumulator<I, O> name(String name)
{
this.name = name;
return this;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/noleme/flow/stream/StreamGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,15 @@ public StreamGenerator<I, O> setMaxParallelism(int factor)
this.maxParallelism = factor;
return this;
}

/**
*
* @param name
* @return
*/
public StreamGenerator<I, O> name(String name)
{
this.name = name;
return this;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/noleme/flow/stream/StreamJoin.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,15 @@ public StreamPipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
}

/**
*
* @param name
* @return
*/
public StreamJoin<I1, I2, O> name(String name)
{
this.name = name;
return this;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/noleme/flow/stream/StreamPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,15 @@ public StreamPipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
}

/**
*
* @param name
* @return
*/
public StreamPipe<I, O> name(String name)
{
this.name = name;
return this;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/noleme/flow/stream/StreamSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,15 @@ public StreamSink(Loader<I> actor)
{
super(actor);
}

/**
*
* @param name
* @return
*/
public StreamSink<I> name(String name)
{
this.name = name;
return this;
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/noleme/flow/stream/StreamSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,15 @@ public StreamPipe<O, O> interruptIf(Predicate<O> predicate)
{
return this.into(new Interruption<>(predicate));
}

/**
*
* @param name
* @return
*/
public StreamSource<O> name(String name)
{
this.name = name;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ void testAccumulation() throws RunException, CompilationException
void testStreamJoinSmall() throws RunException, CompilationException
{
var flowA = Flow
.from(() -> 3)
.into(i -> i + 2)
.from(() -> 3).name("source_a")
.into(i -> i + 2).name("flow_a")
;

var flow = Flow
.from(() -> List.of(1, 2, 3, 4, 5))
.from(() -> List.of(1, 2, 3, 4, 5)).name("source_b")
.stream(IterableGenerator::new)
.into(i -> i + 1)
.into(i -> i + 1).name("flow_b")
.join(flowA, (current, a) -> current * a)
.accumulate(ls -> ls.stream()
.reduce(Integer::sum)
Expand Down

0 comments on commit 6e2774a

Please sign in to comment.