From 6656df71b843725d04dd6a3c60c295a5e14a7140 Mon Sep 17 00:00:00 2001 From: Lingxuan Zuo Date: Sun, 21 Apr 2024 22:13:58 +0800 Subject: [PATCH] [Python]fix hybird stream test (#66) * fix hybird stream test * hybirdtest * make workaround for hybirdstream test * fix pytest current all of running tests * check python test exit code * add java 11 compatible compiler arguments for sun.ni package * lint --- README.rst | 1 + streaming/buildtest.sh | 9 +++++--- streaming/java/pom.xml | 17 ++++++++++++++ .../api/context/StreamingContext.java | 1 + .../impl/PythonPartitionFunction.java | 2 +- .../impl/RoundRobinPartitionFunction.java | 5 +++-- .../operator/AbstractStreamOperator.java | 1 + .../operator/chain/ChainedOperator.java | 10 ++++++++- .../common/utils}/BinaryFileUtil.java | 2 +- .../ray/streaming/common/utils/JniUtils.java | 1 - .../core/collector/OutputCollector.java | 22 ++++++++++++++++++- .../runtime/transfer/TransferHandler.java | 7 +++--- .../streaming/runtime/worker/JobWorker.java | 2 +- .../runtime/worker/tasks/StreamTask.java | 19 +++++++++++++++- .../runtime/demo/HybridStreamTest.java | 4 ++-- .../python/raystreaming/examples/wordcount.py | 8 ++++--- .../tests/simple/test_function.py | 4 ++-- .../tests/simple/test_operator.py | 8 +++---- .../tests/test_direct_transfer.py | 8 +++---- .../raystreaming/tests/test_failover.py | 2 +- .../raystreaming/tests/test_hybrid_stream.py | 4 ++-- .../python/raystreaming/tests/test_stream.py | 2 +- .../raystreaming/tests/test_union_stream.py | 2 +- streaming/src/data_reader.cc | 8 ++++++- 24 files changed, 112 insertions(+), 37 deletions(-) rename streaming/java/{streaming-runtime/src/main/java/io/ray/streaming/runtime/util => streaming-common/src/main/java/io/ray/streaming/common/utils}/BinaryFileUtil.java (98%) diff --git a/README.rst b/README.rst index 67c256c2..3f19bd55 100644 --- a/README.rst +++ b/README.rst @@ -165,6 +165,7 @@ Build ---------------- Build from source code : + - Build a docker using docker/Dockerfile-env - Execute `scripts/install.sh` diff --git a/streaming/buildtest.sh b/streaming/buildtest.sh index 1e2ecc79..55b1601c 100755 --- a/streaming/buildtest.sh +++ b/streaming/buildtest.sh @@ -111,9 +111,12 @@ function test_streaming_python() fi #python3 -m pytest $script_dir/python/raystreaming/tests/simple --capture=no bazel build java:streaming_java_pkg - python3 -m pytest "$script_dir"/python/raystreaming/tests/ > "$TMP_LOG_OUTPUT"/python-test/python-test.log 2>&1 - zip_and_upload_log "$TMP_LOG_OUTPUT"/python-test/ "${script_dir}/${ZIP_FILE}" "/${GITHUB_SHA}/${TIME}/${ZIP_FILE}" - exit $? + python3 -m pytest "$script_dir"/python/raystreaming/tests/ # > "$TMP_LOG_OUTPUT"/python-test/python-test.log 2>&1 + exit_code=$? + echo "Running python test exit code : ${exit_code}" + echo "[Disabled] Uploding output to remote file." + #zip_and_upload_log "$TMP_LOG_OUTPUT"/python-test/ "${script_dir}/${ZIP_FILE}" "/${GITHUB_SHA}/${TIME}/${ZIP_FILE}" + exit $exit_code popd || exit } diff --git a/streaming/java/pom.xml b/streaming/java/pom.xml index f192507a..dfbf1a4d 100644 --- a/streaming/java/pom.xml +++ b/streaming/java/pom.xml @@ -56,6 +56,7 @@ 1.8 + 11 UTF-8 2.9.3 0.0.1 @@ -128,6 +129,22 @@ + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + ${java.new.version} + ${java.new.version} + ${project.build.sourceEncoding} + + --add-modules=jdk.unsupported + --add-exports=java.base/sun.nio.ch=ALL-UNNAMED + + + + + org.apache.maven.plugins maven-source-plugin diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/StreamingContext.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/StreamingContext.java index 7b99c650..69bfd506 100644 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/StreamingContext.java +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/context/StreamingContext.java @@ -56,6 +56,7 @@ public static StreamingContext buildContext() { public void execute(String jobName) { JobGraphBuilder jobGraphBuilder = new JobGraphBuilder(this.streamSinks, jobName); JobGraph originalJobGraph = jobGraphBuilder.build(); + originalJobGraph.printJobGraph(); this.jobGraph = new JobGraphOptimizer(originalJobGraph).optimize(); jobGraph.printJobGraph(); LOG.info("JobGraph digraph\n{}", jobGraph.generateDigraph()); diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/partition/impl/PythonPartitionFunction.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/partition/impl/PythonPartitionFunction.java index 4cbf4d48..d8ef22da 100644 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/partition/impl/PythonPartitionFunction.java +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/partition/impl/PythonPartitionFunction.java @@ -23,7 +23,7 @@ public class PythonPartitionFunction implements Partition { public static final PythonPartitionFunction KeyPartition = new PythonPartitionFunction("raystreaming.partition", "KeyPartition"); public static final PythonPartitionFunction RoundRobinPartition = - new PythonPartitionFunction("raystreaming.partition", "RoundRobinPartitionFunction"); + new PythonPartitionFunction("raystreaming.partition", "RoundRobinPartition"); public static final String FORWARD_PARTITION_CLASS = "ForwardPartition"; public static final PythonPartitionFunction ForwardPartition = new PythonPartitionFunction("raystreaming.partition", FORWARD_PARTITION_CLASS); diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/partition/impl/RoundRobinPartitionFunction.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/partition/impl/RoundRobinPartitionFunction.java index 9d99adb8..76b4835d 100644 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/partition/impl/RoundRobinPartitionFunction.java +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/api/partition/impl/RoundRobinPartitionFunction.java @@ -25,8 +25,9 @@ public int[] partition(T value, int currentIndex, int numPartition) { @Override public int[] partition(T record, int numPartition) { - // TODO - return new int[0]; + seq = (seq + 1) % numPartition; + partitions[0] = seq; + return partitions; } @Override diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/operator/AbstractStreamOperator.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/operator/AbstractStreamOperator.java index a2b11736..8be74970 100644 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/operator/AbstractStreamOperator.java +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/operator/AbstractStreamOperator.java @@ -54,6 +54,7 @@ public void setFunction(F function) { @Override public void open(List collectorList, RuntimeContext runtimeContext) { + LOG.info("Abstract {}, {} open : {}.", this.getId(), this.getName(), collectorList.size()); this.collectorList = collectorList; this.runtimeContext = runtimeContext; if (runtimeContext != null && runtimeContext.getOpConfig() != null) { diff --git a/streaming/java/streaming-api/src/main/java/io/ray/streaming/operator/chain/ChainedOperator.java b/streaming/java/streaming-api/src/main/java/io/ray/streaming/operator/chain/ChainedOperator.java index 20b71f80..490964e0 100644 --- a/streaming/java/streaming-api/src/main/java/io/ray/streaming/operator/chain/ChainedOperator.java +++ b/streaming/java/streaming-api/src/main/java/io/ray/streaming/operator/chain/ChainedOperator.java @@ -59,7 +59,7 @@ public ChainedOperator( @Override public void open(List collectorList, RuntimeContext runtimeContext) { // Dont' call super.open() as we `open` every operator separately. - LOG.info("chainedOperator open."); + LOG.info("ChainedOperator open."); for (int i = 0; i < operators.size(); i++) { StreamOperator operator = operators.get(i); List succeedingCollectors = new ArrayList<>(); @@ -77,6 +77,14 @@ public void open(List collectorList, RuntimeContext runtimeContext) { (collector.getId() == operator.getId() && collector.getDownStreamOpId() == subOperator.getId())) .collect(Collectors.toList())); + // FIXME(lingxuan.zlx): Workaround for edge mismatch, see more detail from + // https://github.com/ray-project/mobius/issues/67. + if (succeedingCollectors.isEmpty()) { + succeedingCollectors.addAll( + collectorList.stream() + .filter(x -> (x.getDownStreamOpId() == subOperator.getId())) + .collect(Collectors.toList())); + } } }); operator.open(succeedingCollectors, createRuntimeContext(runtimeContext, i)); diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/BinaryFileUtil.java b/streaming/java/streaming-common/src/main/java/io/ray/streaming/common/utils/BinaryFileUtil.java similarity index 98% rename from streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/BinaryFileUtil.java rename to streaming/java/streaming-common/src/main/java/io/ray/streaming/common/utils/BinaryFileUtil.java index 4717b506..de00f06f 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/util/BinaryFileUtil.java +++ b/streaming/java/streaming-common/src/main/java/io/ray/streaming/common/utils/BinaryFileUtil.java @@ -1,4 +1,4 @@ -package io.ray.streaming.runtime.util; +package io.ray.streaming.common.utils; import com.google.common.base.Preconditions; import java.io.File; diff --git a/streaming/java/streaming-common/src/main/java/io/ray/streaming/common/utils/JniUtils.java b/streaming/java/streaming-common/src/main/java/io/ray/streaming/common/utils/JniUtils.java index a8675aec..6592af7c 100644 --- a/streaming/java/streaming-common/src/main/java/io/ray/streaming/common/utils/JniUtils.java +++ b/streaming/java/streaming-common/src/main/java/io/ray/streaming/common/utils/JniUtils.java @@ -2,7 +2,6 @@ import com.google.common.collect.Sets; import com.sun.jna.NativeLibrary; -import io.ray.runtime.util.BinaryFileUtil; import java.io.File; import java.io.IOException; import java.nio.file.Files; diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/collector/OutputCollector.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/collector/OutputCollector.java index 877f3c5b..7fb3c644 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/collector/OutputCollector.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/core/collector/OutputCollector.java @@ -20,6 +20,12 @@ public class OutputCollector implements Collector { private static final Logger LOGGER = LoggerFactory.getLogger(OutputCollector.class); + /** Collector id belongs to source id of edge. */ + private final Integer collectorId; + + /** DownStream id belongs to target id of edge. */ + private final Integer downStreamId; + private final DataWriter writer; private final ChannelId[] outputQueues; private final Collection targetActors; @@ -29,10 +35,14 @@ public class OutputCollector implements Collector { private final Serializer crossLangSerializer = new CrossLangSerializer(); public OutputCollector( + Integer collectorId, + Integer downStreamId, DataWriter writer, Collection outputChannelIds, Collection targetActors, Partition partition) { + this.collectorId = collectorId; + this.downStreamId = downStreamId; this.writer = writer; this.outputQueues = outputChannelIds.stream().map(ChannelId::from).toArray(ChannelId[]::new); this.targetActors = targetActors; @@ -41,7 +51,7 @@ public OutputCollector( .map(actor -> actor instanceof PyActorHandle ? Language.PYTHON : Language.JAVA) .toArray(Language[]::new); this.partition = partition; - LOGGER.debug( + LOGGER.info( "OutputCollector constructed, outputChannelIds:{}, partition:{}.", outputChannelIds, this.partition); @@ -78,4 +88,14 @@ public void collect(Record record) { } } } + + @Override + public int getId() { + return collectorId; + } + + @Override + public int getDownStreamOpId() { + return downStreamId; + } } diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java index 124f8256..0e00feb2 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/transfer/TransferHandler.java @@ -1,7 +1,6 @@ package io.ray.streaming.runtime.transfer; -import io.ray.runtime.util.BinaryFileUtil; -import io.ray.runtime.util.JniUtils; +import io.ray.streaming.common.utils.JniUtils; /** * TransferHandler is used for handle direct call based data transfer between workers. @@ -10,8 +9,8 @@ public class TransferHandler { static { - JniUtils.loadLibrary(BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true); - io.ray.streaming.common.utils.JniUtils.loadLibrary("streaming_java"); + JniUtils.loadLibrary(io.ray.runtime.util.BinaryFileUtil.CORE_WORKER_JAVA_LIBRARY, true); + JniUtils.loadLibrary("streaming_java"); } private long writerClientNative; diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/JobWorker.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/JobWorker.java index bf57dc23..fcda2a8e 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/JobWorker.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/JobWorker.java @@ -58,7 +58,7 @@ public class JobWorker implements Serializable { private static final byte[] NOT_READY_FLAG = new byte[4]; static { - EnvUtil.loadNativeLibraries(); + // EnvUtil.loadNativeLibraries(); } /** JobWorker runtime context state. Used for creating stateful operator like reduce operator. */ diff --git a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java index 02563c5d..f6cc5272 100644 --- a/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java +++ b/streaming/java/streaming-runtime/src/main/java/io/ray/streaming/runtime/worker/tasks/StreamTask.java @@ -5,6 +5,7 @@ import io.ray.streaming.api.collector.Collector; import io.ray.streaming.api.context.RuntimeContext; import io.ray.streaming.api.partition.Partition; +import io.ray.streaming.common.tuple.Tuple2; import io.ray.streaming.runtime.config.worker.WorkerInternalConfig; import io.ray.streaming.runtime.context.ContextBackend; import io.ray.streaming.runtime.core.checkpoint.OperatorCheckpointInfo; @@ -190,8 +191,15 @@ private void openProcessor() { Map> opGroupedChannelId = new HashMap<>(); Map> opGroupedActor = new HashMap<>(); Map opPartitionMap = new HashMap<>(); + Map> opIdAndDownStreamIdMap = new HashMap<>(); for (int i = 0; i < outputEdges.size(); ++i) { ExecutionEdge edge = outputEdges.get(i); + LOG.info( + "Upstream {} {}, downstream {} {}.", + edge.getSource().getExecutionVertexName(), + edge.getSource().getOperator().getId(), + edge.getTargetExecutionJobVertexName(), + edge.getTarget().getOperator().getId()); String opName = edge.getTargetExecutionJobVertexName(); if (!opPartitionMap.containsKey(opName)) { opGroupedChannelId.put(opName, new ArrayList<>()); @@ -202,6 +210,10 @@ private void openProcessor() { .get(opName) .add(new ArrayList<>(executionVertex.getChannelIdOutputActorMap().values()).get(i)); opPartitionMap.put(opName, edge.getPartition()); + opIdAndDownStreamIdMap.put( + opName, + Tuple2.of( + edge.getSource().getOperator().getId(), edge.getTarget().getOperator().getId())); } opPartitionMap .keySet() @@ -209,6 +221,8 @@ private void openProcessor() { opName -> { collectors.add( new OutputCollector( + opIdAndDownStreamIdMap.get(opName).f0, + opIdAndDownStreamIdMap.get(opName).f1, writer, opGroupedChannelId.get(opName), opGroupedActor.get(opName), @@ -217,7 +231,10 @@ private void openProcessor() { RuntimeContext runtimeContext = new StreamingTaskRuntimeContext(executionVertex, lastCheckpointId); - + for (Collector collector : collectors) { + LOG.info( + "Collector id {}, downstream id {}.", collector.getId(), collector.getDownStreamOpId()); + } processor.open(collectors, runtimeContext); } diff --git a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/HybridStreamTest.java b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/HybridStreamTest.java index af45ff32..08e88910 100644 --- a/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/HybridStreamTest.java +++ b/streaming/java/streaming-runtime/src/test/java/io/ray/streaming/runtime/demo/HybridStreamTest.java @@ -54,8 +54,8 @@ public void testHybridDataStream() throws Exception { streamSource .map(x -> x + x) .asPythonStream() - .map("ray.streaming.tests.test_hybrid_stream", "map_func1") - .filter("ray.streaming.tests.test_hybrid_stream", "filter_func1") + .map("raystreaming.tests.test_hybrid_stream", "map_func1") + .filter("raystreaming.tests.test_hybrid_stream", "filter_func1") .asJavaStream() .sink( (SinkFunction) diff --git a/streaming/python/raystreaming/examples/wordcount.py b/streaming/python/raystreaming/examples/wordcount.py index ec77da6a..bd7172b6 100644 --- a/streaming/python/raystreaming/examples/wordcount.py +++ b/streaming/python/raystreaming/examples/wordcount.py @@ -5,8 +5,10 @@ import ray import wikipedia -from ray.streaming import StreamingContext -from ray.streaming.config import Config + +# from ray.streaming import StreamingContext +from raystreaming import StreamingContext +from raystreaming.config import Config logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) @@ -75,7 +77,7 @@ def splitter(line): .build() ) # A Ray streaming environment with the default configuration - ctx.set_parallelism(1) # Each operator will be executed by two actors + # ctx.set_parallelism(1) # Each operator will be executed by two actors # Reads articles from wikipedia, splits them in words, # shuffles words, and counts the occurrences of each word. diff --git a/streaming/python/raystreaming/tests/simple/test_function.py b/streaming/python/raystreaming/tests/simple/test_function.py index b5ff43d0..c85a12cc 100644 --- a/streaming/python/raystreaming/tests/simple/test_function.py +++ b/streaming/python/raystreaming/tests/simple/test_function.py @@ -1,5 +1,5 @@ -from ray.streaming import function -from ray.streaming.runtime import gateway_client +from raystreaming import function +from raystreaming.runtime import gateway_client def test_get_simple_function_class(): diff --git a/streaming/python/raystreaming/tests/simple/test_operator.py b/streaming/python/raystreaming/tests/simple/test_operator.py index 5878868a..dff99b14 100644 --- a/streaming/python/raystreaming/tests/simple/test_operator.py +++ b/streaming/python/raystreaming/tests/simple/test_operator.py @@ -1,7 +1,7 @@ -from ray.streaming import function -from ray.streaming import operator -from ray.streaming.operator import OperatorType -from ray.streaming.runtime import gateway_client +from raystreaming import function +from raystreaming import operator +from raystreaming.operator import OperatorType +from raystreaming.runtime import gateway_client def test_create_operator_with_func(): diff --git a/streaming/python/raystreaming/tests/test_direct_transfer.py b/streaming/python/raystreaming/tests/test_direct_transfer.py index 3c956285..63a09875 100644 --- a/streaming/python/raystreaming/tests/test_direct_transfer.py +++ b/streaming/python/raystreaming/tests/test_direct_transfer.py @@ -3,10 +3,10 @@ import time import ray -import ray.streaming._streaming as _streaming -import ray.streaming.runtime.transfer as transfer +from raystreaming import _streaming +import raystreaming.runtime.transfer as transfer from ray._raylet import PythonFunctionDescriptor -from ray.streaming.config import Config +from raystreaming.config import Config import pytest @@ -102,7 +102,7 @@ def on_writer_message_sync(self, buffer: bytes): return result.to_pybytes() -@pytest.mark.skip(reason="Waitting to fix") +# @pytest.mark.skip(reason="Waitting to fix") def test_queue(): ray.init() writer = Worker._remote() diff --git a/streaming/python/raystreaming/tests/test_failover.py b/streaming/python/raystreaming/tests/test_failover.py index 8d36c647..eede9757 100644 --- a/streaming/python/raystreaming/tests/test_failover.py +++ b/streaming/python/raystreaming/tests/test_failover.py @@ -5,7 +5,7 @@ import ray import pytest -from ray.streaming import StreamingContext +from raystreaming import StreamingContext @pytest.mark.skip(reason="Current log output in console, we can not capture from logs") diff --git a/streaming/python/raystreaming/tests/test_hybrid_stream.py b/streaming/python/raystreaming/tests/test_hybrid_stream.py index 715e0e66..eff7eb82 100644 --- a/streaming/python/raystreaming/tests/test_hybrid_stream.py +++ b/streaming/python/raystreaming/tests/test_hybrid_stream.py @@ -3,7 +3,7 @@ import sys import ray -from ray.streaming import StreamingContext +from raystreaming import StreamingContext from ray._private.test_utils import wait_for_condition import pytest @@ -32,7 +32,7 @@ def test_hybrid_stream(): ) current_dir = os.path.abspath(os.path.dirname(__file__)) jar_path = os.path.join( - current_dir, "../../../bazel-bin/streaming/java/all_streaming_tests_deploy.jar" + current_dir, "../../../bazel-bin/java/all_streaming_tests_deploy.jar" ) jar_path = os.path.abspath(jar_path) print("jar_path", jar_path) diff --git a/streaming/python/raystreaming/tests/test_stream.py b/streaming/python/raystreaming/tests/test_stream.py index 1d424f18..642f5b7a 100644 --- a/streaming/python/raystreaming/tests/test_stream.py +++ b/streaming/python/raystreaming/tests/test_stream.py @@ -1,7 +1,7 @@ import sys import ray -from ray.streaming import StreamingContext +from raystreaming import StreamingContext def test_data_stream(): diff --git a/streaming/python/raystreaming/tests/test_union_stream.py b/streaming/python/raystreaming/tests/test_union_stream.py index 9fa8c152..6cfdc749 100644 --- a/streaming/python/raystreaming/tests/test_union_stream.py +++ b/streaming/python/raystreaming/tests/test_union_stream.py @@ -2,7 +2,7 @@ import sys import ray -from ray.streaming import StreamingContext +from raystreaming import StreamingContext def test_union_stream(): diff --git a/streaming/src/data_reader.cc b/streaming/src/data_reader.cc index d94a02ed..20e6d6e9 100644 --- a/streaming/src/data_reader.cc +++ b/streaming/src/data_reader.cc @@ -76,6 +76,11 @@ StreamingStatus DataReader::InitChannel( for (const auto &input_channel : unready_queue_ids_) { auto &channel_info = channel_info_map_[input_channel]; + auto it = channel_map_.find(input_channel); + if (it != channel_map_.end()) { + STREAMING_LOG(INFO) << "Channel id " << input_channel << " has been initialized."; + continue; + } std::shared_ptr channel; if (runtime_context_->IsMockTest()) { channel = std::make_shared(transfer_config_, channel_info); @@ -86,7 +91,8 @@ StreamingStatus DataReader::InitChannel( channel_map_.emplace(input_channel, channel); TransferCreationStatus status = channel->CreateTransferChannel(); creation_status.push_back(status); - if (TransferCreationStatus::PullOk != status) { + if (TransferCreationStatus::DataLost == status || + TransferCreationStatus::Timeout == status) { STREAMING_LOG(ERROR) << "Initialize queue failed, id=" << input_channel << ", status=" << static_cast(status); }