Skip to content

Commit

Permalink
Merge branch 'snappy-snap' of ssh://github.com/ctiller/grpc into snap…
Browse files Browse the repository at this point in the history
…py-snap
  • Loading branch information
ctiller committed Dec 3, 2024
2 parents 694f371 + e04d25b commit c396ec1
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 79 deletions.
140 changes: 75 additions & 65 deletions test/core/xds/xds_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,25 +288,27 @@ class XdsClientTest : public ::testing::Test {
absl::optional<ResourceAndReadDelayHandle> WaitForNextResourceAndHandle(
SourceLocation location = SourceLocation()) {
while (true) {
event_engine_->Tick();
MutexLock lock(&mu_);
if (queue_.empty()) {
{
MutexLock lock(&mu_);
if (!queue_.empty()) {
Event& event = queue_.front();
if (!absl::holds_alternative<ResourceAndReadDelayHandle>(event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<absl::Status>(event)
? "error"
: "does-not-exist")
<< " at " << location.file() << ":" << location.line();
return absl::nullopt;
}
auto resource_and_handle =
std::move(absl::get<ResourceAndReadDelayHandle>(event));
queue_.pop_front();
return resource_and_handle;
}
if (event_engine_->IsIdle()) return absl::nullopt;
continue;
}
Event& event = queue_.front();
if (!absl::holds_alternative<ResourceAndReadDelayHandle>(event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<absl::Status>(event)
? "error"
: "does-not-exist")
<< " at " << location.file() << ":" << location.line();
return absl::nullopt;
}
auto foo = std::move(absl::get<ResourceAndReadDelayHandle>(event));
queue_.pop_front();
return foo;
event_engine_->Tick();
}
}

Expand All @@ -322,47 +324,51 @@ class XdsClientTest : public ::testing::Test {
absl::optional<absl::Status> WaitForNextError(
SourceLocation location = SourceLocation()) {
while (true) {
event_engine_->Tick();
MutexLock lock(&mu_);
if (queue_.empty()) {
{
MutexLock lock(&mu_);
if (!queue_.empty()) {
Event& event = queue_.front();
if (!absl::holds_alternative<absl::Status>(event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<ResourceAndReadDelayHandle>(
event)
? "resource"
: "does-not-exist")
<< " at " << location.file() << ":" << location.line();
return absl::nullopt;
}
absl::Status error = std::move(absl::get<absl::Status>(event));
queue_.pop_front();
return std::move(error);
}
if (event_engine_->IsIdle()) return absl::nullopt;
continue;
}
Event& event = queue_.front();
if (!absl::holds_alternative<absl::Status>(event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<ResourceAndReadDelayHandle>(event)
? "resource"
: "does-not-exist")
<< " at " << location.file() << ":" << location.line();
return absl::nullopt;
}
absl::Status error = std::move(absl::get<absl::Status>(event));
queue_.pop_front();
return std::move(error);
event_engine_->Tick();
}
}

bool WaitForDoesNotExist(SourceLocation location = SourceLocation()) {
while (true) {
event_engine_->Tick();
MutexLock lock(&mu_);
if (queue_.empty()) {
{
MutexLock lock(&mu_);
if (!queue_.empty()) {
Event& event = queue_.front();
if (!absl::holds_alternative<DoesNotExist>(event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<absl::Status>(event)
? "error"
: "resource")
<< " at " << location.file() << ":" << location.line();
return false;
}
queue_.pop_front();
return true;
}
if (event_engine_->IsIdle()) return false;
continue;
}
Event& event = queue_.front();
if (!absl::holds_alternative<DoesNotExist>(event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<absl::Status>(event) ? "error"
: "resource")
<< " at " << location.file() << ":" << location.line();
return false;
}
queue_.pop_front();
return true;
event_engine_->Tick();
}
}

Expand Down Expand Up @@ -650,23 +656,27 @@ class XdsClientTest : public ::testing::Test {
::testing::Matcher<ServerFailureMap> server_failures_matcher,
SourceLocation location = SourceLocation()) {
while (true) {
event_engine_->Tick();
MutexLock lock(&mu_);
if (::testing::Matches(resource_updates_valid_matcher)(
resource_updates_valid_) &&
::testing::Matches(resource_updates_invalid_matcher)(
resource_updates_invalid_) &&
::testing::Matches(server_failures_matcher)(server_failures_)) {
return true;
{
MutexLock lock(&mu_);
if (::testing::Matches(resource_updates_valid_matcher)(
resource_updates_valid_) &&
::testing::Matches(resource_updates_invalid_matcher)(
resource_updates_invalid_) &&
::testing::Matches(server_failures_matcher)(server_failures_)) {
return true;
}
if (event_engine_->IsIdle()) {
EXPECT_THAT(resource_updates_valid_, resource_updates_valid_matcher)
<< location.file() << ":" << location.line();
EXPECT_THAT(resource_updates_invalid_,
resource_updates_invalid_matcher)
<< location.file() << ":" << location.line();
EXPECT_THAT(server_failures_, server_failures_matcher)
<< location.file() << ":" << location.line();
return false;
}
}
if (!event_engine_->IsIdle()) continue;
EXPECT_THAT(resource_updates_valid_, resource_updates_valid_matcher)
<< location.file() << ":" << location.line();
EXPECT_THAT(resource_updates_invalid_, resource_updates_invalid_matcher)
<< location.file() << ":" << location.line();
EXPECT_THAT(server_failures_, server_failures_matcher)
<< location.file() << ":" << location.line();
return false;
event_engine_->Tick();
}
}

Expand Down
33 changes: 19 additions & 14 deletions test/core/xds/xds_transport_fake.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,16 @@ bool FakeXdsTransportFactory::FakeStreamingCall::HaveMessageFromClient() {
absl::optional<std::string>
FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient() {
while (true) {
event_engine_->Tick();
MutexLock lock(&mu_);
if (from_client_messages_.empty()) {
{
MutexLock lock(&mu_);
if (!from_client_messages_.empty()) {
std::string payload = std::move(from_client_messages_.front());
from_client_messages_.pop_front();
return payload;
}
if (event_engine_->IsIdle()) return absl::nullopt;
continue;
}
std::string payload = std::move(from_client_messages_.front());
from_client_messages_.pop_front();
return payload;
event_engine_->Tick();
}
}

Expand Down Expand Up @@ -183,10 +184,12 @@ void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
bool FakeXdsTransportFactory::FakeStreamingCall::WaitForReadsStarted(
size_t expected) {
while (true) {
{
MutexLock lock(&mu_);
if (reads_started_ == expected) return true;
if (event_engine_->IsIdle()) return false;
}
event_engine_->Tick();
MutexLock lock(&mu_);
if (reads_started_ == expected) return true;
if (event_engine_->IsIdle()) return false;
}
}

Expand Down Expand Up @@ -236,11 +239,13 @@ void FakeXdsTransportFactory::FakeXdsTransport::Orphaned() {
RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
FakeXdsTransportFactory::FakeXdsTransport::WaitForStream(const char* method) {
while (true) {
{
MutexLock lock(&mu_);
auto it = active_calls_.find(method);
if (it != active_calls_.end() && it->second != nullptr) return it->second;
if (event_engine_->IsIdle()) return nullptr;
}
event_engine_->Tick();
MutexLock lock(&mu_);
auto it = active_calls_.find(method);
if (it != active_calls_.end() && it->second != nullptr) return it->second;
if (event_engine_->IsIdle()) return nullptr;
}
}

Expand Down

0 comments on commit c396ec1

Please sign in to comment.