Best Practices
This page contains best practices for writing Euphoria flows.
To create a testable Flow, the following rules should be respected.
Every flow should be constructed using a FlowBuilder. A FlowBuilder can be anything that accepts 1..n input Datasets and returns a single "transformed" output Dataset. It should not be aware of how data are read or persisted.
Let's create a simple FlowBuilder that concatenates the sorted strings within each key.
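import java.util.stream.Collectors;
import java.util.stream.Stream;
// Euphoria imports (Dataset, Pair, ReduceByKey) omitted; their packages depend on the Euphoria version.

public class MyFlowBuilder {

  Dataset<String> build(Dataset<Pair<Integer, String>> input) {
    return ReduceByKey.named("concat-strings-within-key")
        .of(input)
        .keyBy(Pair::getFirst)
        .valueBy(Pair::getSecond)
        .reduceBy((Stream<String> stream) -> stream.collect(Collectors.joining(",")))
        .withSortedValues(String::compareTo)
        // outputValues() drops the key; output() would return Dataset<Pair<Integer, String>>
        .outputValues();
  }
}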
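To make the expected result of this builder concrete, here is a minimal plain-Java sketch of the same computation: group by key, sort each group's values, join them with commas. It uses no Euphoria classes. ConcatPerKeySketch and its methods are hypothetical names introduced only for illustration, and java.util.Map.Entry stands in for Euphoria's Pair.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ConcatPerKeySketch {

  /** Hypothetical stand-in for Pair.of(key, value). */
  static Map.Entry<Integer, String> pair(int key, String value) {
    return new AbstractMap.SimpleImmutableEntry<>(key, value);
  }

  /** Groups values by key, sorts each group, and joins it with commas. */
  static Map<Integer, String> concatPerKey(List<Map.Entry<Integer, String>> input) {
    // Collect all values that share a key.
    Map<Integer, List<String>> grouped = new TreeMap<>();
    for (Map.Entry<Integer, String> e : input) {
      grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    // Sort each group and concatenate it into a single string.
    Map<Integer, String> result = new TreeMap<>();
    for (Map.Entry<Integer, List<String>> e : grouped.entrySet()) {
      List<String> values = e.getValue();
      Collections.sort(values);
      result.put(e.getKey(), String.join(",", values));
    }
    return result;
  }
}
```

For the input (1, "b"), (2, "a"), (1, "a") this yields "a,b" for key 1 and "a" for key 2, which is the kind of expected output a test case for the builder would list.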
This flow design makes testing very easy. We only need to define the input Dataset (which is easy, because the builder does not care how data are read) and the expected output Dataset. It is usually good practice to create an AbstractTestCase class with abstract methods that define the input and output, and to reuse it between test cases.
Please note that outputs should be compared using DatasetAssert.unorderedEquals, which is available in the euphoria-testing artifact.
A test for our FlowBuilder could look like this:
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
// JUnit and Euphoria imports omitted; their packages depend on the versions in use.

public class MyFlowTest {

  private abstract static class SingleInputFlowTestCase<IN, OUT> {
    abstract List<IN> getInput();
    abstract List<OUT> getOutput();
    abstract void execute();
  }

  private abstract static class MyFlowTestCase
      extends SingleInputFlowTestCase<Pair<Integer, String>, String> {

    @Override
    void execute() {
      final DataSource<Pair<Integer, String>> input = ListDataSource.unbounded(getInput());
      final Flow flow = Flow.create();
      final ListDataSink<String> sink = ListDataSink.get();
      // Build the flow and persist its output
      new MyFlowBuilder().build(flow.createInput(input)).persist(sink);
      final Executor executor = new LocalExecutor();
      executor.submit(flow).join();
      // DatasetAssert is available in the euphoria-testing artifact
      DatasetAssert.unorderedEquals(getOutput(), sink.getOutputs());
    }
  }

  private static void execute(SingleInputFlowTestCase<?, ?> test) {
    test.execute();
  }

  @Test
  public void test_first() {
    execute(new MyFlowTestCase() {
      @Override
      List<Pair<Integer, String>> getInput() {
        return Collections.singletonList(Pair.of(1, "a"));
      }

      @Override
      List<String> getOutput() {
        return Collections.singletonList("a");
      }
    });
  }

  @Test
  public void test_second() {
    execute(new MyFlowTestCase() {
      @Override
      List<Pair<Integer, String>> getInput() {
        return Arrays.asList(
            Pair.of(1, "a"), Pair.of(2, "a"), Pair.of(1, "b"));
      }

      @Override
      List<String> getOutput() {
        return Arrays.asList("a", "a,b");
      }
    });
  }
}
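The test above compares outputs with DatasetAssert.unorderedEquals rather than List.equals, presumably because the order in which a flow emits elements is not something a test should depend on. As an illustration of what an order-insensitive comparison means (this is not the actual DatasetAssert implementation), two lists can be compared as multisets. UnorderedEqualsSketch is a hypothetical helper:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UnorderedEqualsSketch {

  /** Counts how many times each element occurs. */
  static <T> Map<T, Integer> counts(List<T> items) {
    Map<T, Integer> counts = new HashMap<>();
    for (T item : items) {
      counts.merge(item, 1, Integer::sum);
    }
    return counts;
  }

  /** True if both lists hold the same elements with the same multiplicities, in any order. */
  static <T> boolean unorderedEquals(List<T> expected, List<T> actual) {
    return counts(expected).equals(counts(actual));
  }
}
```

With this definition, ["a", "a,b"] and ["a,b", "a"] are equal, while ["a", "a"] and ["a"] are not, because duplicates still count.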
Sorting user values in reducing operations should generally be avoided where possible. If the user wants to retrieve the top N candidates from a dataset by some score, the best option is TopPerKey.
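One reason to avoid sorting when only the best N elements are needed: a bounded min-heap keeps at most N candidates in memory and never orders the rest. The following plain-Java sketch shows that general technique. It is not TopPerKey's implementation, it ignores keys for brevity, and TopNSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class TopNSketch {

  /** Returns the n highest-scoring items, best first, keeping at most n items in memory. */
  static <T> List<T> topN(Iterable<T> items, int n, Comparator<T> byScore) {
    if (n <= 0) {
      return new ArrayList<>();
    }
    // Min-heap: the head is the weakest candidate currently kept.
    PriorityQueue<T> heap = new PriorityQueue<>(n, byScore);
    for (T item : items) {
      if (heap.size() < n) {
        heap.add(item);
      } else if (byScore.compare(item, heap.peek()) > 0) {
        // The new item beats the weakest kept candidate, so replace it.
        heap.poll();
        heap.add(item);
      }
    }
    // Only the n survivors are sorted, not the whole input.
    List<T> result = new ArrayList<>(heap);
    result.sort(byScore.reversed());
    return result;
  }
}
```

For the scores 5, 1, 9, 3, 7 and n = 2, the heap ends up holding 7 and 9, and the method returns [9, 7].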
If sorting is unavoidable, values can be sorted in ReduceByKey as follows:
// Placeholder; in a real flow this is an existing input dataset
Dataset<Pair<String, Integer>> in = null;
Dataset<Pair<String, List<Integer>>> output = ReduceByKey.of(in)
    .keyBy(Pair::getFirst)
    .valueBy(Pair::getSecond)
    .reduceBy(s -> s.collect(Collectors.toList()))
    .withSortedValues(Integer::compare)
    .output();
This produces an output dataset that contains, for each input key, the sorted list of its values. There are several configuration options to pay attention to, mainly:
- euphoria.spill.buffer.items: Number of items to be buffered in memory before a spill occurs (default 10000)
- euphoria.spill.tmp.dir: Path to the local directory to use as the spilling directory (default ./)
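To give a feel for what these two options control, here is a minimal sketch of buffer-then-spill sorting in general: items are buffered in memory, each full buffer is sorted and written to a file in the temporary directory, and the sorted runs are merged at the end. This is an illustration of the general technique under stated assumptions, not Euphoria's implementation. SpillingSorterSketch is a hypothetical class, its bufferItems and tmpDir parameters merely play the roles of the two settings above, and values are assumed to contain no line breaks.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

final class SpillingSorterSketch {

  /** One sorted source (a spilled run or the in-memory remainder) with its current head. */
  private static final class Cursor {
    final Iterator<String> rest;
    String head;

    Cursor(Iterator<String> source) {
      this.rest = source;
      this.head = source.next();
    }
  }

  private final int bufferItems; // role of euphoria.spill.buffer.items
  private final Path tmpDir;     // role of euphoria.spill.tmp.dir
  private final List<String> buffer = new ArrayList<>();
  private final List<Path> runs = new ArrayList<>();

  SpillingSorterSketch(int bufferItems, Path tmpDir) {
    if (bufferItems < 1) {
      throw new IllegalArgumentException("bufferItems must be >= 1");
    }
    this.bufferItems = bufferItems;
    this.tmpDir = tmpDir;
  }

  /** Buffers the item; spills the buffer to disk once it holds bufferItems items. */
  void add(String item) {
    buffer.add(item);
    if (buffer.size() >= bufferItems) {
      spill();
    }
  }

  int spilledRuns() {
    return runs.size();
  }

  private void spill() {
    Collections.sort(buffer);
    try {
      Path run = Files.createTempFile(tmpDir, "spill-", ".run");
      runs.add(run);
      Files.write(run, buffer, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    buffer.clear();
  }

  /** Merges all runs and the in-memory remainder in sorted order; one-shot, cleans up afterwards. */
  void forEachSorted(Consumer<String> sink) {
    Collections.sort(buffer);
    List<BufferedReader> readers = new ArrayList<>();
    PriorityQueue<Cursor> heap = new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.head));
    try {
      if (!buffer.isEmpty()) {
        heap.add(new Cursor(buffer.iterator()));
      }
      for (Path run : runs) {
        BufferedReader reader = Files.newBufferedReader(run, StandardCharsets.UTF_8);
        readers.add(reader);
        heap.add(new Cursor(reader.lines().iterator())); // runs are never empty
      }
      while (!heap.isEmpty()) {
        Cursor smallest = heap.poll();
        sink.accept(smallest.head);
        if (smallest.rest.hasNext()) {
          smallest.head = smallest.rest.next();
          heap.add(smallest);
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      for (BufferedReader reader : readers) {
        try { reader.close(); } catch (IOException ignored) { }
      }
      for (Path run : runs) {
        try { Files.deleteIfExists(run); } catch (IOException ignored) { }
      }
      runs.clear();
      buffer.clear();
    }
  }
}
```

In this sketch, a smaller buffer size lowers memory use but produces more runs on disk to merge, and the temporary directory needs enough free space for everything that does not fit in memory.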