diff --git a/streaming/buildtest.sh b/streaming/buildtest.sh index 35ce28f..55b1601 100755 --- a/streaming/buildtest.sh +++ b/streaming/buildtest.sh @@ -111,7 +111,7 @@ function test_streaming_python() fi #python3 -m pytest $script_dir/python/raystreaming/tests/simple --capture=no bazel build java:streaming_java_pkg - python3 -m pytest "$script_dir"/python/raystreaming/tests/ > "$TMP_LOG_OUTPUT"/python-test/python-test.log 2>&1 + python3 -m pytest "$script_dir"/python/raystreaming/tests/ # > "$TMP_LOG_OUTPUT"/python-test/python-test.log 2>&1 exit_code=$? echo "Running python test exit code : ${exit_code}" echo "[Disabled] Uploding output to remote file." diff --git a/streaming/python/raystreaming/tests/simple/test_function.py b/streaming/python/raystreaming/tests/simple/test_function.py index 76f6da5..c85a12c 100644 --- a/streaming/python/raystreaming/tests/simple/test_function.py +++ b/streaming/python/raystreaming/tests/simple/test_function.py @@ -3,13 +3,11 @@ def test_get_simple_function_class(): - simple_map_func_class = function._get_simple_function_class( - function.MapFunction) + simple_map_func_class = function._get_simple_function_class(function.MapFunction) assert simple_map_func_class is function.SimpleMapFunction class MapFunc(function.MapFunction): - def map(self, value): return str(value) @@ -18,6 +16,7 @@ def test_load_function(): # function_bytes, module_name, function_name/class_name, # function_interface descriptor_func_bytes = gateway_client.serialize( - [None, __name__, MapFunc.__name__, "MapFunction"]) + [None, __name__, MapFunc.__name__, "MapFunction"] + ) func = function.load_function(descriptor_func_bytes) assert type(func) is MapFunc diff --git a/streaming/python/raystreaming/tests/simple/test_operator.py b/streaming/python/raystreaming/tests/simple/test_operator.py index 02e1d07..dff99b1 100644 --- a/streaming/python/raystreaming/tests/simple/test_operator.py +++ b/streaming/python/raystreaming/tests/simple/test_operator.py @@ -11,13 +11,11 @@ def test_create_operator_with_func(): class MapFunc(function.MapFunction): - def map(self, value): return str(value) class EmptyOperator(operator.StreamOperator): - def __init__(self): super().__init__(function.EmptyFunction()) @@ -28,12 +26,13 @@ def operator_type(self) -> OperatorType: def test_load_operator(): # function_bytes, module_name, class_name, descriptor_func_bytes = gateway_client.serialize( - [None, __name__, MapFunc.__name__, "MapFunction"]) - descriptor_op_bytes = gateway_client.serialize( - [descriptor_func_bytes, "", ""]) + [None, __name__, MapFunc.__name__, "MapFunction"] + ) + descriptor_op_bytes = gateway_client.serialize([descriptor_func_bytes, "", ""]) map_operator = operator.load_operator(descriptor_op_bytes) assert type(map_operator) is operator.MapOperator descriptor_op_bytes = gateway_client.serialize( - [None, __name__, EmptyOperator.__name__]) + [None, __name__, EmptyOperator.__name__] + ) test_operator = operator.load_operator(descriptor_op_bytes) assert isinstance(test_operator, EmptyOperator) diff --git a/streaming/python/raystreaming/tests/simple/test_serialization.py b/streaming/python/raystreaming/tests/simple/test_serialization.py index a461468..d18967f 100644 --- a/streaming/python/raystreaming/tests/simple/test_serialization.py +++ b/streaming/python/raystreaming/tests/simple/test_serialization.py @@ -9,5 +9,4 @@ def test_serialize(): key_record = KeyRecord("key", "value") key_record.stream = "stream2" assert record == serializer.deserialize(serializer.serialize(record)) - assert key_record == serializer.deserialize( - serializer.serialize(key_record)) + assert key_record == serializer.deserialize(serializer.serialize(key_record)) diff --git a/streaming/python/raystreaming/tests/test_direct_transfer.py b/streaming/python/raystreaming/tests/test_direct_transfer.py index 9de60b6..63a0987 100644 --- a/streaming/python/raystreaming/tests/test_direct_transfer.py +++ b/streaming/python/raystreaming/tests/test_direct_transfer.py @@ -12,7 +12,6 @@ @ray.remote class Worker: - def __init__(self): self.writer_client = _streaming.WriterClient() self.reader_client = _streaming.ReaderClient() @@ -23,32 +22,36 @@ def __init__(self): def init_writer(self, output_channel, reader_actor): conf = {Config.CHANNEL_TYPE: Config.NATIVE_CHANNEL} reader_async_func = PythonFunctionDescriptor( - __name__, self.on_reader_message.__name__, self.__class__.__name__) + __name__, self.on_reader_message.__name__, self.__class__.__name__ + ) reader_sync_func = PythonFunctionDescriptor( - __name__, self.on_reader_message_sync.__name__, - self.__class__.__name__) + __name__, self.on_reader_message_sync.__name__, self.__class__.__name__ + ) transfer.ChannelCreationParametersBuilder.set_python_reader_function_descriptor( - reader_async_func, reader_sync_func) - self.writer = transfer.DataWriter([output_channel], - [pickle.loads(reader_actor)], conf) + reader_async_func, reader_sync_func + ) + self.writer = transfer.DataWriter( + [output_channel], [pickle.loads(reader_actor)], conf + ) self.output_channel_id = transfer.ChannelID(output_channel) def init_reader(self, input_channel, writer_actor): conf = {Config.CHANNEL_TYPE: Config.NATIVE_CHANNEL} writer_async_func = PythonFunctionDescriptor( - __name__, self.on_writer_message.__name__, self.__class__.__name__) + __name__, self.on_writer_message.__name__, self.__class__.__name__ + ) writer_sync_func = PythonFunctionDescriptor( - __name__, self.on_writer_message_sync.__name__, - self.__class__.__name__) + __name__, self.on_writer_message_sync.__name__, self.__class__.__name__ + ) transfer.ChannelCreationParametersBuilder.set_python_writer_function_descriptor( - writer_async_func, writer_sync_func) - self.reader = transfer.DataReader([input_channel], - [pickle.loads(writer_actor)], conf) + writer_async_func, writer_sync_func + ) + self.reader = transfer.DataReader( + [input_channel], [pickle.loads(writer_actor)], conf + ) def start_write(self, msg_nums): - self.t = threading.Thread(target=self.run_writer, - args=[msg_nums], - daemon=True) + self.t = threading.Thread(target=self.run_writer, args=[msg_nums], daemon=True) self.t.start() def run_writer(self, msg_nums): @@ -57,9 +60,7 @@ def run_writer(self, msg_nums): print("WriterWorker done.") def start_read(self, msg_nums): - self.t = threading.Thread(target=self.run_reader, - args=[msg_nums], - daemon=True) + self.t = threading.Thread(target=self.run_reader, args=[msg_nums], daemon=True) self.t.start() def run_reader(self, msg_nums): diff --git a/streaming/python/raystreaming/tests/test_failover.py b/streaming/python/raystreaming/tests/test_failover.py index 3724e04..eede975 100644 --- a/streaming/python/raystreaming/tests/test_failover.py +++ b/streaming/python/raystreaming/tests/test_failover.py @@ -8,28 +8,30 @@ from raystreaming import StreamingContext -@pytest.mark.skip( - reason="Current log output in console, we can not capture from logs") +@pytest.mark.skip(reason="Current log output in console, we can not capture from logs") def test_word_count(): try: - ray.init(job_config=ray.job_config.JobConfig( - code_search_path=sys.path)) + ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) # time.sleep(10) # for gdb to attach - ctx = (StreamingContext.Builder().option( - "streaming.context-backend.type", - "local_file").option("streaming.context-backend.file-state.root", - "/tmp/ray/cp_files/").option( - "streaming.checkpoint.timeout.secs", - "3").build()) + ctx = ( + StreamingContext.Builder() + .option("streaming.context-backend.type", "local_file") + .option("streaming.context-backend.file-state.root", "/tmp/ray/cp_files/") + .option("streaming.checkpoint.timeout.secs", "3") + .build() + ) print("-----------submit job-------------") ctx.read_text_file(__file__).set_parallelism(1).flat_map( - lambda x: x.split()).map(lambda x: (x, 1)).key_by( - lambda x: x[0]).reduce(lambda old_value, new_value: (old_value[ - 0], old_value[1] + new_value[1])).filter( - lambda x: "ray" not in x).sink( - lambda x: print("####result", x)) + lambda x: x.split() + ).map(lambda x: (x, 1)).key_by(lambda x: x[0]).reduce( + lambda old_value, new_value: (old_value[0], old_value[1] + new_value[1]) + ).filter( + lambda x: "ray" not in x + ).sink( + lambda x: print("####result", x) + ) ctx.submit("word_count") print("-----------checking output-------------") @@ -49,9 +51,12 @@ def test_word_count(): retry_count = 300000 / 5 # wait for 5min while True: cur_cp_num = checkpoint_success_num() - print("-----------checking checkpoint" - ", cur_cp_num={}, old_cp_num={}-------------".format( - cur_cp_num, cp_ok_num)) + print( + "-----------checking checkpoint" + ", cur_cp_num={}, old_cp_num={}-------------".format( + cur_cp_num, cp_ok_num + ) + ) if cur_cp_num > cp_ok_num: print("--------------TEST OK!------------------") break @@ -59,7 +64,8 @@ def test_word_count(): retry_count -= 1 if retry_count <= 0: raise RuntimeError( - "Checkpoint keeps failing after fail-over, test failed!") + "Checkpoint keeps failing after fail-over, test failed!" + ) finally: ray.shutdown() @@ -74,7 +80,8 @@ def run_cmd(cmd: List): def grep_log(keyword: str) -> str: out = subprocess.check_output( - ["grep", "-r", keyword, "/tmp/ray/session_latest/logs"]) + ["grep", "-r", keyword, "/tmp/ray/session_latest/logs"] + ) return out.decode() diff --git a/streaming/python/raystreaming/tests/test_hybrid_stream.py b/streaming/python/raystreaming/tests/test_hybrid_stream.py index 13e2fcd..eff7eb8 100644 --- a/streaming/python/raystreaming/tests/test_hybrid_stream.py +++ b/streaming/python/raystreaming/tests/test_hybrid_stream.py @@ -23,19 +23,23 @@ def sink_func1(x): def test_hybrid_stream(): - subprocess.check_call([ - "bazel", - "build", - "@com_github_ray_streaming//java:all_streaming_tests_deploy.jar", - ]) + subprocess.check_call( + [ + "bazel", + "build", + "@com_github_ray_streaming//java:all_streaming_tests_deploy.jar", + ] + ) current_dir = os.path.abspath(os.path.dirname(__file__)) jar_path = os.path.join( - current_dir, "../../../bazel-bin/java/all_streaming_tests_deploy.jar") + current_dir, "../../../bazel-bin/java/all_streaming_tests_deploy.jar" + ) jar_path = os.path.abspath(jar_path) print("jar_path", jar_path) assert not ray.is_initialized() - ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path + - [jar_path])) + ray.init( + job_config=ray.job_config.JobConfig(code_search_path=sys.path + [jar_path]) + ) sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt" if os.path.exists(sink_file): @@ -49,9 +53,12 @@ def sink_func(x): ctx = StreamingContext.Builder().build() ctx.from_values("a", "b", "c").as_java_stream().map( - "io.ray.streaming.runtime.demo.HybridStreamTest$Mapper1").filter( - "io.ray.streaming.runtime.demo.HybridStreamTest$Filter1" - ).as_python_stream().sink(sink_func) + "io.ray.streaming.runtime.demo.HybridStreamTest$Mapper1" + ).filter( + "io.ray.streaming.runtime.demo.HybridStreamTest$Filter1" + ).as_python_stream().sink( + sink_func + ) ctx.submit("HybridStreamTest") def check_succeed(): diff --git a/streaming/python/raystreaming/tests/test_stream.py b/streaming/python/raystreaming/tests/test_stream.py index 9dad8c1..642f5b7 100644 --- a/streaming/python/raystreaming/tests/test_stream.py +++ b/streaming/python/raystreaming/tests/test_stream.py @@ -21,8 +21,9 @@ def test_data_stream(): def test_key_data_stream(): ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) ctx = StreamingContext.Builder().build() - key_stream = (ctx.from_values( - "a", "b", "c").map(lambda x: (x, 1)).key_by(lambda x: x[0])) + key_stream = ( + ctx.from_values("a", "b", "c").map(lambda x: (x, 1)).key_by(lambda x: x[0]) + ) java_stream = key_stream.as_java_stream() python_stream = java_stream.as_python_stream() assert key_stream.get_id() == java_stream.get_id() diff --git a/streaming/python/raystreaming/tests/test_union_stream.py b/streaming/python/raystreaming/tests/test_union_stream.py index 6d1cfe7..6cfdc74 100644 --- a/streaming/python/raystreaming/tests/test_union_stream.py +++ b/streaming/python/raystreaming/tests/test_union_stream.py @@ -7,8 +7,7 @@ def test_union_stream(): ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) - ctx = StreamingContext.Builder().option("streaming.metrics.reporters", - "").build() + ctx = StreamingContext.Builder().option("streaming.metrics.reporters", "").build() sink_file = "/tmp/test_union_stream.txt" if os.path.exists(sink_file): os.remove(sink_file) diff --git a/streaming/python/raystreaming/tests/test_word_count.py b/streaming/python/raystreaming/tests/test_word_count.py index 6eea859..c55eb3c 100644 --- a/streaming/python/raystreaming/tests/test_word_count.py +++ b/streaming/python/raystreaming/tests/test_word_count.py @@ -8,11 +8,15 @@ def test_word_count(): ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path)) ctx = StreamingContext.Builder().build() - ctx.read_text_file(__file__).set_parallelism(1).flat_map(lambda x: x.split( - )).map(lambda x: (x, 1)).key_by(lambda x: x[0]).reduce( - lambda old_value, new_value: - (old_value[0], old_value[1] + new_value[1])).filter( - lambda x: "ray" not in x).sink(lambda x: print("result", x)) + ctx.read_text_file(__file__).set_parallelism(1).flat_map(lambda x: x.split()).map( + lambda x: (x, 1) + ).key_by(lambda x: x[0]).reduce( + lambda old_value, new_value: (old_value[0], old_value[1] + new_value[1]) + ).filter( + lambda x: "ray" not in x + ).sink( + lambda x: print("result", x) + ) ctx.submit("word_count") import time @@ -33,11 +37,13 @@ def sink_func(x): print("sink_func", line) f.write(line) - ctx.from_values( - "a", "b", - "c").set_parallelism(1).flat_map(lambda x: [x, x]).map(lambda x: ( - x, 1)).key_by(lambda x: x[0]).reduce(lambda old_value, new_value: ( - old_value[0], old_value[1] + new_value[1])).sink(sink_func) + ctx.from_values("a", "b", "c").set_parallelism(1).flat_map(lambda x: [x, x]).map( + lambda x: (x, 1) + ).key_by(lambda x: x[0]).reduce( + lambda old_value, new_value: (old_value[0], old_value[1] + new_value[1]) + ).sink( + sink_func + ) ctx.submit("word_count") def check_succeed():