Skip to content

Commit

Permalink
hybirdtest
Browse files Browse the repository at this point in the history
  • Loading branch information
ashione committed Apr 13, 2024
1 parent 077441e commit 1d234f9
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public static StreamingContext buildContext() {
public void execute(String jobName) {
JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(this.streamSinks, jobName);
JobGraph originalJobGraph = jobGraphBuilder.build();
originalJobGraph.printJobGraph();
this.jobGraph = new JobGraphOptimizer(originalJobGraph).optimize();
jobGraph.printJobGraph();
LOG.info("JobGraph digraph\n{}", jobGraph.generateDigraph());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ public class OutputCollector implements Collector<Record> {

private static final Logger LOGGER = LoggerFactory.getLogger(OutputCollector.class);

/** Collector id belongs to source id of edge. */
private final Integer collectorId;

/** DownStream id belongs to target id of edge. */
private final Integer downStreamId;

private final DataWriter writer;
private final ChannelId[] outputQueues;
private final Collection<BaseActorHandle> targetActors;
Expand All @@ -29,10 +35,14 @@ public class OutputCollector implements Collector<Record> {
private final Serializer crossLangSerializer = new CrossLangSerializer();

public OutputCollector(
Integer collectorId,
Integer downStreamId,
DataWriter writer,
Collection<String> outputChannelIds,
Collection<BaseActorHandle> targetActors,
Partition partition) {
this.collectorId = collectorId;
this.downStreamId = downStreamId;
this.writer = writer;
this.outputQueues = outputChannelIds.stream().map(ChannelId::from).toArray(ChannelId[]::new);
this.targetActors = targetActors;
Expand All @@ -41,14 +51,15 @@ public OutputCollector(
.map(actor -> actor instanceof PyActorHandle ? Language.PYTHON : Language.JAVA)
.toArray(Language[]::new);
this.partition = partition;
LOGGER.debug(
LOGGER.info(
"OutputCollector constructed, outputChannelIds:{}, partition:{}.",
outputChannelIds,
this.partition);
}

@Override
public void collect(Record record) {
LOGGER.info("Collect in output {}.", record);
int[] partitions = this.partition.partition(record, outputQueues.length);
ByteBuffer javaBuffer = null;
ByteBuffer crossLangBuffer = null;
Expand Down Expand Up @@ -78,4 +89,14 @@ public void collect(Record record) {
}
}
}

@Override
public int getId() {
return collectorId;
}

@Override
public int getDownStreamOpId() {
return downStreamId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public void testHybridDataStream() throws Exception {
streamSource
.map(x -> x + x)
.asPythonStream()
.map("ray.streaming.tests.test_hybrid_stream", "map_func1")
.filter("ray.streaming.tests.test_hybrid_stream", "filter_func1")
.map("raystreaming.tests.test_hybrid_stream", "map_func1")
.filter("raystreaming.tests.test_hybrid_stream", "filter_func1")
.asJavaStream()
.sink(
(SinkFunction<Object>)
Expand Down

0 comments on commit 1d234f9

Please sign in to comment.