Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
ashione committed Apr 21, 2024
1 parent 199cde7 commit 8fb7724
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 77 deletions.
2 changes: 1 addition & 1 deletion streaming/buildtest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ function test_streaming_python()
fi
#python3 -m pytest $script_dir/python/raystreaming/tests/simple --capture=no
bazel build java:streaming_java_pkg
python3 -m pytest "$script_dir"/python/raystreaming/tests/ > "$TMP_LOG_OUTPUT"/python-test/python-test.log 2>&1
python3 -m pytest "$script_dir"/python/raystreaming/tests/ # > "$TMP_LOG_OUTPUT"/python-test/python-test.log 2>&1
exit_code=$?
echo "Running python test exit code : ${exit_code}"
echo "[Disabled] Uploding output to remote file."
Expand Down
7 changes: 3 additions & 4 deletions streaming/python/raystreaming/tests/simple/test_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@


def test_get_simple_function_class():
simple_map_func_class = function._get_simple_function_class(
function.MapFunction)
simple_map_func_class = function._get_simple_function_class(function.MapFunction)
assert simple_map_func_class is function.SimpleMapFunction


class MapFunc(function.MapFunction):

def map(self, value):
return str(value)

Expand All @@ -18,6 +16,7 @@ def test_load_function():
# function_bytes, module_name, function_name/class_name,
# function_interface
descriptor_func_bytes = gateway_client.serialize(
[None, __name__, MapFunc.__name__, "MapFunction"])
[None, __name__, MapFunc.__name__, "MapFunction"]
)
func = function.load_function(descriptor_func_bytes)
assert type(func) is MapFunc
11 changes: 5 additions & 6 deletions streaming/python/raystreaming/tests/simple/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ def test_create_operator_with_func():


class MapFunc(function.MapFunction):

def map(self, value):
return str(value)


class EmptyOperator(operator.StreamOperator):

def __init__(self):
super().__init__(function.EmptyFunction())

Expand All @@ -28,12 +26,13 @@ def operator_type(self) -> OperatorType:
def test_load_operator():
# function_bytes, module_name, class_name,
descriptor_func_bytes = gateway_client.serialize(
[None, __name__, MapFunc.__name__, "MapFunction"])
descriptor_op_bytes = gateway_client.serialize(
[descriptor_func_bytes, "", ""])
[None, __name__, MapFunc.__name__, "MapFunction"]
)
descriptor_op_bytes = gateway_client.serialize([descriptor_func_bytes, "", ""])
map_operator = operator.load_operator(descriptor_op_bytes)
assert type(map_operator) is operator.MapOperator
descriptor_op_bytes = gateway_client.serialize(
[None, __name__, EmptyOperator.__name__])
[None, __name__, EmptyOperator.__name__]
)
test_operator = operator.load_operator(descriptor_op_bytes)
assert isinstance(test_operator, EmptyOperator)
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@ def test_serialize():
key_record = KeyRecord("key", "value")
key_record.stream = "stream2"
assert record == serializer.deserialize(serializer.serialize(record))
assert key_record == serializer.deserialize(
serializer.serialize(key_record))
assert key_record == serializer.deserialize(serializer.serialize(key_record))
39 changes: 20 additions & 19 deletions streaming/python/raystreaming/tests/test_direct_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

@ray.remote
class Worker:

def __init__(self):
self.writer_client = _streaming.WriterClient()
self.reader_client = _streaming.ReaderClient()
Expand All @@ -23,32 +22,36 @@ def __init__(self):
def init_writer(self, output_channel, reader_actor):
conf = {Config.CHANNEL_TYPE: Config.NATIVE_CHANNEL}
reader_async_func = PythonFunctionDescriptor(
__name__, self.on_reader_message.__name__, self.__class__.__name__)
__name__, self.on_reader_message.__name__, self.__class__.__name__
)
reader_sync_func = PythonFunctionDescriptor(
__name__, self.on_reader_message_sync.__name__,
self.__class__.__name__)
__name__, self.on_reader_message_sync.__name__, self.__class__.__name__
)
transfer.ChannelCreationParametersBuilder.set_python_reader_function_descriptor(
reader_async_func, reader_sync_func)
self.writer = transfer.DataWriter([output_channel],
[pickle.loads(reader_actor)], conf)
reader_async_func, reader_sync_func
)
self.writer = transfer.DataWriter(
[output_channel], [pickle.loads(reader_actor)], conf
)
self.output_channel_id = transfer.ChannelID(output_channel)

def init_reader(self, input_channel, writer_actor):
conf = {Config.CHANNEL_TYPE: Config.NATIVE_CHANNEL}
writer_async_func = PythonFunctionDescriptor(
__name__, self.on_writer_message.__name__, self.__class__.__name__)
__name__, self.on_writer_message.__name__, self.__class__.__name__
)
writer_sync_func = PythonFunctionDescriptor(
__name__, self.on_writer_message_sync.__name__,
self.__class__.__name__)
__name__, self.on_writer_message_sync.__name__, self.__class__.__name__
)
transfer.ChannelCreationParametersBuilder.set_python_writer_function_descriptor(
writer_async_func, writer_sync_func)
self.reader = transfer.DataReader([input_channel],
[pickle.loads(writer_actor)], conf)
writer_async_func, writer_sync_func
)
self.reader = transfer.DataReader(
[input_channel], [pickle.loads(writer_actor)], conf
)

def start_write(self, msg_nums):
self.t = threading.Thread(target=self.run_writer,
args=[msg_nums],
daemon=True)
self.t = threading.Thread(target=self.run_writer, args=[msg_nums], daemon=True)
self.t.start()

def run_writer(self, msg_nums):
Expand All @@ -57,9 +60,7 @@ def run_writer(self, msg_nums):
print("WriterWorker done.")

def start_read(self, msg_nums):
self.t = threading.Thread(target=self.run_reader,
args=[msg_nums],
daemon=True)
self.t = threading.Thread(target=self.run_reader, args=[msg_nums], daemon=True)
self.t.start()

def run_reader(self, msg_nums):
Expand Down
47 changes: 27 additions & 20 deletions streaming/python/raystreaming/tests/test_failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,30 @@
from raystreaming import StreamingContext


@pytest.mark.skip(
reason="Current log output in console, we can not capture from logs")
@pytest.mark.skip(reason="Current log output in console, we can not capture from logs")
def test_word_count():
try:
ray.init(job_config=ray.job_config.JobConfig(
code_search_path=sys.path))
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
# time.sleep(10) # for gdb to attach
ctx = (StreamingContext.Builder().option(
"streaming.context-backend.type",
"local_file").option("streaming.context-backend.file-state.root",
"/tmp/ray/cp_files/").option(
"streaming.checkpoint.timeout.secs",
"3").build())
ctx = (
StreamingContext.Builder()
.option("streaming.context-backend.type", "local_file")
.option("streaming.context-backend.file-state.root", "/tmp/ray/cp_files/")
.option("streaming.checkpoint.timeout.secs", "3")
.build()
)

print("-----------submit job-------------")

ctx.read_text_file(__file__).set_parallelism(1).flat_map(
lambda x: x.split()).map(lambda x: (x, 1)).key_by(
lambda x: x[0]).reduce(lambda old_value, new_value: (old_value[
0], old_value[1] + new_value[1])).filter(
lambda x: "ray" not in x).sink(
lambda x: print("####result", x))
lambda x: x.split()
).map(lambda x: (x, 1)).key_by(lambda x: x[0]).reduce(
lambda old_value, new_value: (old_value[0], old_value[1] + new_value[1])
).filter(
lambda x: "ray" not in x
).sink(
lambda x: print("####result", x)
)
ctx.submit("word_count")

print("-----------checking output-------------")
Expand All @@ -49,17 +51,21 @@ def test_word_count():
retry_count = 300000 / 5 # wait for 5min
while True:
cur_cp_num = checkpoint_success_num()
print("-----------checking checkpoint"
", cur_cp_num={}, old_cp_num={}-------------".format(
cur_cp_num, cp_ok_num))
print(
"-----------checking checkpoint"
", cur_cp_num={}, old_cp_num={}-------------".format(
cur_cp_num, cp_ok_num
)
)
if cur_cp_num > cp_ok_num:
print("--------------TEST OK!------------------")
break
time.sleep(5)
retry_count -= 1
if retry_count <= 0:
raise RuntimeError(
"Checkpoint keeps failing after fail-over, test failed!")
"Checkpoint keeps failing after fail-over, test failed!"
)
finally:
ray.shutdown()

Expand All @@ -74,7 +80,8 @@ def run_cmd(cmd: List):

def grep_log(keyword: str) -> str:
out = subprocess.check_output(
["grep", "-r", keyword, "/tmp/ray/session_latest/logs"])
["grep", "-r", keyword, "/tmp/ray/session_latest/logs"]
)
return out.decode()


Expand Down
29 changes: 18 additions & 11 deletions streaming/python/raystreaming/tests/test_hybrid_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@ def sink_func1(x):


def test_hybrid_stream():
subprocess.check_call([
"bazel",
"build",
"@com_github_ray_streaming//java:all_streaming_tests_deploy.jar",
])
subprocess.check_call(
[
"bazel",
"build",
"@com_github_ray_streaming//java:all_streaming_tests_deploy.jar",
]
)
current_dir = os.path.abspath(os.path.dirname(__file__))
jar_path = os.path.join(
current_dir, "../../../bazel-bin/java/all_streaming_tests_deploy.jar")
current_dir, "../../../bazel-bin/java/all_streaming_tests_deploy.jar"
)
jar_path = os.path.abspath(jar_path)
print("jar_path", jar_path)
assert not ray.is_initialized()
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path +
[jar_path]))
ray.init(
job_config=ray.job_config.JobConfig(code_search_path=sys.path + [jar_path])
)

sink_file = "/tmp/ray_streaming_test_hybrid_stream.txt"
if os.path.exists(sink_file):
Expand All @@ -49,9 +53,12 @@ def sink_func(x):

ctx = StreamingContext.Builder().build()
ctx.from_values("a", "b", "c").as_java_stream().map(
"io.ray.streaming.runtime.demo.HybridStreamTest$Mapper1").filter(
"io.ray.streaming.runtime.demo.HybridStreamTest$Filter1"
).as_python_stream().sink(sink_func)
"io.ray.streaming.runtime.demo.HybridStreamTest$Mapper1"
).filter(
"io.ray.streaming.runtime.demo.HybridStreamTest$Filter1"
).as_python_stream().sink(
sink_func
)
ctx.submit("HybridStreamTest")

def check_succeed():
Expand Down
5 changes: 3 additions & 2 deletions streaming/python/raystreaming/tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ def test_data_stream():
def test_key_data_stream():
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder().build()
key_stream = (ctx.from_values(
"a", "b", "c").map(lambda x: (x, 1)).key_by(lambda x: x[0]))
key_stream = (
ctx.from_values("a", "b", "c").map(lambda x: (x, 1)).key_by(lambda x: x[0])
)
java_stream = key_stream.as_java_stream()
python_stream = java_stream.as_python_stream()
assert key_stream.get_id() == java_stream.get_id()
Expand Down
3 changes: 1 addition & 2 deletions streaming/python/raystreaming/tests/test_union_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

def test_union_stream():
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder().option("streaming.metrics.reporters",
"").build()
ctx = StreamingContext.Builder().option("streaming.metrics.reporters", "").build()
sink_file = "/tmp/test_union_stream.txt"
if os.path.exists(sink_file):
os.remove(sink_file)
Expand Down
26 changes: 16 additions & 10 deletions streaming/python/raystreaming/tests/test_word_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
def test_word_count():
ray.init(job_config=ray.job_config.JobConfig(code_search_path=sys.path))
ctx = StreamingContext.Builder().build()
ctx.read_text_file(__file__).set_parallelism(1).flat_map(lambda x: x.split(
)).map(lambda x: (x, 1)).key_by(lambda x: x[0]).reduce(
lambda old_value, new_value:
(old_value[0], old_value[1] + new_value[1])).filter(
lambda x: "ray" not in x).sink(lambda x: print("result", x))
ctx.read_text_file(__file__).set_parallelism(1).flat_map(lambda x: x.split()).map(
lambda x: (x, 1)
).key_by(lambda x: x[0]).reduce(
lambda old_value, new_value: (old_value[0], old_value[1] + new_value[1])
).filter(
lambda x: "ray" not in x
).sink(
lambda x: print("result", x)
)
ctx.submit("word_count")
import time

Expand All @@ -33,11 +37,13 @@ def sink_func(x):
print("sink_func", line)
f.write(line)

ctx.from_values(
"a", "b",
"c").set_parallelism(1).flat_map(lambda x: [x, x]).map(lambda x: (
x, 1)).key_by(lambda x: x[0]).reduce(lambda old_value, new_value: (
old_value[0], old_value[1] + new_value[1])).sink(sink_func)
ctx.from_values("a", "b", "c").set_parallelism(1).flat_map(lambda x: [x, x]).map(
lambda x: (x, 1)
).key_by(lambda x: x[0]).reduce(
lambda old_value, new_value: (old_value[0], old_value[1] + new_value[1])
).sink(
sink_func
)
ctx.submit("word_count")

def check_succeed():
Expand Down

0 comments on commit 8fb7724

Please sign in to comment.