Obs window size #5

Open

wants to merge 11 commits into base: stateful-actors-implementation
ensemble_profiler/constants.py (2 changes: 1 addition & 1 deletion)
@@ -19,7 +19,7 @@
 #: nursery actor name
 NURSERY_ACTOR = "HEALTH_SRTML_PATIENT_NURSERY"
 
-#: Prediction Interval for
+#: Prediction Interval for 30 sec * 250 Hz
 PREDITICATE_INTERVAL = 7500
 
 #: Profiling Ensemble
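For context on the new comment: 7500 is the number of samples in one 30-second observation window at a 250 Hz sampling rate. A minimal sketch of the arithmetic (the rate and window names are illustrative; only PREDITICATE_INTERVAL appears in constants.py):

SAMPLING_RATE_HZ = 250   # ECG sampling rate, per the comment above
WINDOW_SECONDS = 30      # length of one observation window
assert WINDOW_SECONDS * SAMPLING_RATE_HZ == 7500  # == PREDITICATE_INTERVAL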
ensemble_profiler/latency.py (15 changes: 9 additions & 6 deletions)
@@ -7,6 +7,7 @@
 import subprocess
 from ensemble_profiler.constants import (ROUTE_ADDRESS, PROFILE_ENSEMBLE,
                                          PREDITICATE_INTERVAL)
+
 from ensemble_profiler.tq_simulator import find_tq
 import time
 from threading import Event
@@ -21,10 +21,10 @@
 package_directory = os.path.dirname(os.path.abspath(__file__))
 
 
-def _calculate_throughput_ensemble(pipeline):
+def _calculate_throughput_ensemble(pipeline, obs_w_30sec):
     num_queries = 100
     start_time = time.time()
-    futures = [pipeline.remote(data=torch.zeros(1, 1, PREDITICATE_INTERVAL))
+    futures = [pipeline.remote(data=torch.zeros(obs_w_30sec, 1, PREDITICATE_INTERVAL))
                for _ in range(num_queries)]
     result = ray.get(futures)
     end_time = time.time()
@@ -49,7 +50,8 @@ def _calculate_latency(file_name, p=95):
 
 def profile_ensemble(model_list, file_path,
                      constraint={"gpu": 1, "npatient": 1}, http_host="0.0.0.0",
-                     fire_clients=True, with_data_collector=False):
+                     fire_clients=True, with_data_collector=False,
+                     obs_w_30sec=1):
     if not ray.is_initialized():
         # read constraint
         num_patients = int(constraint["npatient"])
@@ -93,7 +95,7 @@ def profile_ensemble(model_list, file_path,
 
     if not with_data_collector:
         # calculating the throughput
-        mu_qps = _calculate_throughput_ensemble(pipeline)
+        mu_qps = _calculate_throughput_ensemble(pipeline, obs_w_30sec)
         print("Throughput of Ensemble is : {} QPS".format(mu_qps))
         lambda_qps = _heuristic_lambda_calculation(mu_qps)
         waiting_time_ms = 1000.0/lambda_qps
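To make the waiting-time arithmetic concrete (numbers are illustrative; the policy inside _heuristic_lambda_calculation is not shown in this diff):

mu_qps = 200.0                          # hypothetical measured ensemble throughput
lambda_qps = 100.0                      # hypothetical heuristic arrival rate
waiting_time_ms = 1000.0 / lambda_qps   # each client then waits 10.0 ms between requests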
@@ -116,7 +118,7 @@ def profile_ensemble(model_list, file_path,
         for patient_name in actor_handles.keys():
             final_cmd = cmd + [patient_name]
             if not with_data_collector:
-                final_cmd += [str(waiting_time_ms)]
+                final_cmd += [str(waiting_time_ms), str(obs_w_30sec)]
             ls_output = subprocess.Popen(final_cmd)
             procs.append(ls_output)
         for p in procs:
@@ -146,7 +148,8 @@ def profile_ensemble(model_list, file_path,
                   "serve_ip": IPv4addr,
                   "serve_port": serve_port,
                   "go_client_name": "profile_ensemble",
-                  "waiting_time_ms": waiting_time_ms}
+                  "waiting_time_ms": waiting_time_ms,
+                  "obs_w_30sec": obs_w_30sec}
     fire_remote_clients(url, req_params)
     print("finish firing remote clients")
     serve.shutdown()
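A minimal usage sketch of the extended API, assuming a placeholder model list and output path (only obs_w_30sec is the new knob; everything else keeps its defaults):

import torch
from ensemble_profiler.latency import profile_ensemble

# Placeholder single-model "ensemble"; real model construction is repo-specific.
my_models = [torch.nn.Identity()]

# With obs_w_30sec=4, every profiling request carries a tensor of shape
# (4, 1, 7500), i.e. four stacked 30-second windows at 250 Hz.
profile_ensemble(model_list=my_models,
                 file_path="ensemble_latency.txt",
                 obs_w_30sec=4)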
ensemble_profiler/profile_ensemble.go (3 changes: 2 additions & 1 deletion)
@@ -23,6 +23,7 @@ func main() {
 	if err != nil {
 		time_ms = 20.0
 	}
+	obs_w_30sec := os.Args[3]
 	fmt.Println(patient_name)
 	fmt.Print(time_ms)
 	ch := make(chan string)
@@ -32,7 +33,7 @@ func main() {
 		time.Sleep(time.Duration(time_ms) * time.Millisecond)
 		// This is how the actual client will send the result
 		go MakeRequest("http://127.0.0.1:8000/profileEnsemble?patient_name="+
-			patient_name+"&value=0.0&vtype=ECG", ch)
+			patient_name+"&value=0.0&obs_w_30sec="+obs_w_30sec+"&vtype=ECG", ch)
 		// This is how the profiling result is sent
 		//go MakeRequest("http://127.0.0.1:8000/RayServeProfile/hospital", ch)
 	}
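For reference, the Go client's positional arguments line up one-to-one with the final_cmd list assembled in latency.py above; a hypothetical invocation from the Python side (binary name and values are illustrative):

import subprocess

# os.Args[1] = patient name, os.Args[2] = waiting time in ms,
# os.Args[3] = obs_w_30sec, matching final_cmd in latency.py.
subprocess.Popen(["./profile_ensemble", "patient0", "12.5", "4"])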
ensemble_profiler/server.py (7 changes: 6 additions & 1 deletion)
@@ -111,7 +111,12 @@ async def handle_profile_requests(self, scope, receive, send):
                 raise ValueError("Multiple Patients specified."
                                  "Specify only one.")
             patient_name = patient_name[0]
-            prediction_tensor = torch.zeros((1, 1, PREDITICATE_INTERVAL))
+
+            obs_w_30sec = query_kwargs.pop("obs_w_30sec", None)
Owner:

Why does obs_window have to be per query? Can it be part of the HTTP server instantiation?

Collaborator (Author):

I assume that when the profile client makes a request it can change the obs_window. If obs_window were part of the HTTP server instantiation, the client would always have to send the same value. For example, when a patient is in stable condition the client will only look at an obs_window interval of 30 s, but when the patient is in critical condition the client can send obs_windows of 30 s, 2 min, 5 min, etc.

Owner:

Yes, the client has to be consistent in sending the information. Any reason for the assumption you are making?

+            if obs_w_30sec is None:
+                raise ValueError("Specify obs_w_30sec in query")
+            obs_w_30sec = int(obs_w_30sec[0])
+            prediction_tensor = torch.zeros((obs_w_30sec, 1, PREDITICATE_INTERVAL))
Owner (@alindkhare, Feb 9, 2020):

I think we need one more profiling measurement:

@ray.remote
def echo_tensor(a: torch.Tensor):
    return a

>>> %timeit ray.get(echo_tensor.remote(torch.zeros((obs_w_30sec, 1, PREDITICATE_INTERVAL))))


             request_sent_time = time.time()
             result = await self.ensemble_pipeline.remote(data=prediction_tensor)
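One detail worth noting in the server change: parsed query strings map each key to a list of values, which is why the handler reads obs_w_30sec[0] before converting to int. A standalone sketch of that behavior (this is not the server's actual parsing code):

from urllib.parse import parse_qs

query_kwargs = parse_qs("patient_name=patient0&value=0.0&obs_w_30sec=4&vtype=ECG")
obs_w_30sec = query_kwargs.pop("obs_w_30sec", None)  # -> ["4"]
assert int(obs_w_30sec[0]) == 4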