Skip to content

Commit

Permalink
Ext_Proc : Adding integration test with MxN streaming enabled in both…
Browse files Browse the repository at this point in the history
… directions (envoyproxy#37771)

Ext_Proc : Adding integration test with MxN streaming enabled in both directions

fixes issue: envoyproxy#37770

---------

Signed-off-by: Yanjun Xiang <[email protected]>
  • Loading branch information
yanjunxiang-google authored Dec 23, 2024
1 parent 712b736 commit 00281d1
Showing 1 changed file with 114 additions and 44 deletions.
158 changes: 114 additions & 44 deletions test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -709,13 +709,20 @@ class ExtProcIntegrationTest : public HttpIntegrationTest,
}

IntegrationStreamDecoderPtr initAndSendDataDuplexStreamedMode(absl::string_view body_sent,
bool end_of_stream) {
bool end_of_stream,
bool both_direction = false) {
config_helper_.setBufferLimits(1024, 1024);
auto* processing_mode = proto_config_.mutable_processing_mode();
processing_mode->set_request_header_mode(ProcessingMode::SEND);
processing_mode->set_request_body_mode(ProcessingMode::FULL_DUPLEX_STREAMED);
processing_mode->set_request_trailer_mode(ProcessingMode::SEND);
processing_mode->set_response_header_mode(ProcessingMode::SKIP);
if (!both_direction) {
processing_mode->set_response_header_mode(ProcessingMode::SKIP);
} else {
processing_mode->set_response_header_mode(ProcessingMode::SEND);
processing_mode->set_response_body_mode(ProcessingMode::FULL_DUPLEX_STREAMED);
processing_mode->set_response_trailer_mode(ProcessingMode::SEND);
}

initializeConfig();
HttpIntegrationTest::initialize();
Expand All @@ -731,24 +738,86 @@ class ExtProcIntegrationTest : public HttpIntegrationTest,
return response;
}

void serverReceiveHeaderDuplexStreamed(ProcessingRequest& header_request) {
EXPECT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, processor_connection_));
EXPECT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_));
EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, header_request));
EXPECT_TRUE(header_request.has_request_headers());
void serverReceiveHeaderDuplexStreamed(ProcessingRequest& header, bool first_message = true,
bool response = false) {
if (first_message) {
EXPECT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, processor_connection_));
EXPECT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_));
}
EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, header));
if (response) {
EXPECT_TRUE(header.has_response_headers());
} else {
EXPECT_TRUE(header.has_request_headers());
}
}

void serverSendHeaderRespDuplexStreamed() {
processor_stream_->startGrpcStream();
uint32_t serverReceiveBodyDuplexStreamed(absl::string_view body_sent, bool response = false,
bool compare_body = true) {
std::string body_received;
bool end_stream = false;
uint32_t total_req_body_msg = 0;
while (!end_stream) {
ProcessingRequest body_request;
EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, body_request));
if (response) {
EXPECT_TRUE(body_request.has_response_body());
body_received = absl::StrCat(body_received, body_request.response_body().body());
end_stream = body_request.response_body().end_of_stream();
} else {
EXPECT_TRUE(body_request.has_request_body());
body_received = absl::StrCat(body_received, body_request.request_body().body());
end_stream = body_request.request_body().end_of_stream();
}
total_req_body_msg++;
}
EXPECT_TRUE(end_stream);
if (compare_body) {
EXPECT_EQ(body_received, body_sent);
}
return total_req_body_msg;
}

void serverSendHeaderRespDuplexStreamed(bool first_message = true, bool response = false) {
if (first_message) {
processor_stream_->startGrpcStream();
}
ProcessingResponse response_header;
auto* header_resp = response_header.mutable_request_headers();
HeadersResponse* header_resp;
if (response) {
header_resp = response_header.mutable_response_headers();
} else {
header_resp = response_header.mutable_request_headers();
}
auto* header_mutation = header_resp->mutable_response()->mutable_header_mutation();
auto* header = header_mutation->add_set_headers()->mutable_header();
header->set_key("x-new-header");
header->set_raw_value("new");
processor_stream_->sendGrpcMessage(response_header);
}

void serverSendBodyRespDuplexStreamed(uint32_t total_resp_body_msg, bool end_of_stream = true,
bool response = false) {
for (uint32_t i = 0; i < total_resp_body_msg; i++) {
ProcessingResponse response_body;
BodyResponse* body_resp;
if (response) {
body_resp = response_body.mutable_response_body();
} else {
body_resp = response_body.mutable_request_body();
}

auto* body_mut = body_resp->mutable_response()->mutable_body_mutation();
auto* streamed_response = body_mut->mutable_streamed_response();
streamed_response->set_body("r");
if (end_of_stream) {
const bool end_of_stream = (i == total_resp_body_msg - 1) ? true : false;
streamed_response->set_end_of_stream(end_of_stream);
}
processor_stream_->sendGrpcMessage(response_body);
}
}

void serverSendTrailerRespDuplexStreamed() {
ProcessingResponse response_trailer;
auto* trailer_resp = response_trailer.mutable_request_trailers()->mutable_header_mutation();
Expand Down Expand Up @@ -4936,37 +5005,15 @@ TEST_P(ExtProcIntegrationTest, ServerWaitForBodyBeforeSendsHeaderRespDuplexStrea
// The ext_proc server receives the headers.
ProcessingRequest header_request;
serverReceiveHeaderDuplexStreamed(header_request);

std::string body_received;
bool end_stream = false;
uint32_t total_req_body_msg = 0;
while (!end_stream) {
ProcessingRequest body_request;
EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, body_request));
EXPECT_TRUE(body_request.has_request_body());
body_received = absl::StrCat(body_received, body_request.request_body().body());
end_stream = body_request.request_body().end_of_stream();
total_req_body_msg++;
}
EXPECT_TRUE(end_stream);
EXPECT_EQ(body_received, body_sent);
// The ext_proc server receives the body.
uint32_t total_req_body_msg = serverReceiveBodyDuplexStreamed(body_sent);

// The ext_proc server sends back the header response.
serverSendHeaderRespDuplexStreamed();

// The ext_proc server sends back the body response.
uint32_t total_resp_body_msg = 2 * total_req_body_msg;
const std::string body_upstream(total_resp_body_msg, 'r');
for (uint32_t i = 0; i < total_resp_body_msg; i++) {
ProcessingResponse response_body;
auto* body_resp = response_body.mutable_request_body();
auto* body_mut = body_resp->mutable_response()->mutable_body_mutation();
auto* streamed_response = body_mut->mutable_streamed_response();
streamed_response->set_body("r");
const bool end_of_stream = (i == total_resp_body_msg - 1) ? true : false;
streamed_response->set_end_of_stream(end_of_stream);
processor_stream_->sendGrpcMessage(response_body);
}
serverSendBodyRespDuplexStreamed(total_resp_body_msg);

handleUpstreamRequest();
EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new"));
Expand Down Expand Up @@ -5011,15 +5058,7 @@ TEST_P(ExtProcIntegrationTest,
// The ext_proc server sends back the body response.
uint32_t total_resp_body_msg = total_req_body_msg / 2;
const std::string body_upstream(total_resp_body_msg, 'r');
for (uint32_t i = 0; i < total_resp_body_msg; i++) {
ProcessingResponse response_body;
auto* streamed_response = response_body.mutable_request_body()
->mutable_response()
->mutable_body_mutation()
->mutable_streamed_response();
streamed_response->set_body("r");
processor_stream_->sendGrpcMessage(response_body);
}
serverSendBodyRespDuplexStreamed(total_resp_body_msg, false);

// The ext_proc server sends back the trailer response.
serverSendTrailerRespDuplexStreamed();
Expand Down Expand Up @@ -5110,6 +5149,37 @@ TEST_P(ExtProcIntegrationTest, ServerSendBodyRespWithouRecvEntireBodyDuplexStrea
verifyDownstreamResponse(*response, 200);
}

TEST_P(ExtProcIntegrationTest, DuplexStreamedInBothDirection) {
const std::string body_sent(8 * 1024, 's');
IntegrationStreamDecoderPtr response = initAndSendDataDuplexStreamedMode(body_sent, true, true);

// The ext_proc server receives the headers/body.
ProcessingRequest header_request;
serverReceiveHeaderDuplexStreamed(header_request);
uint32_t total_req_body_msg = serverReceiveBodyDuplexStreamed(body_sent);

// The ext_proc server sends back the response.
serverSendHeaderRespDuplexStreamed();
uint32_t total_resp_body_msg = 2 * total_req_body_msg;
const std::string body_upstream(total_resp_body_msg, 'r');
serverSendBodyRespDuplexStreamed(total_resp_body_msg);

handleUpstreamRequest();
EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new"));
EXPECT_EQ(upstream_request_->body().toString(), body_upstream);

// The ext_proc server receives the responses from backend server.
ProcessingRequest header_response;
serverReceiveHeaderDuplexStreamed(header_response, false, true);
uint32_t total_rsp_body_msg = serverReceiveBodyDuplexStreamed("", true, false);

// The ext_proc server sends back the response.
serverSendHeaderRespDuplexStreamed(false, true);
serverSendBodyRespDuplexStreamed(total_rsp_body_msg * 3, true, true);

verifyDownstreamResponse(*response, 200);
}

TEST_P(ExtProcIntegrationTest, ModeOverrideAllowed) {
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
proto_config_.set_allow_mode_override(true);
Expand Down

0 comments on commit 00281d1

Please sign in to comment.