Put cleanup nodes on separate chunked list. #16552

Closed
193 changes: 117 additions & 76 deletions src/google/protobuf/arena.cc
@@ -60,29 +60,46 @@ ArenaBlock* SentryArenaBlock() {
}
#endif

SizedPtr AllocateMemory(const AllocationPolicy* policy_ptr, size_t last_size,
size_t min_bytes) {
AllocationPolicy policy; // default policy
if (policy_ptr) policy = *policy_ptr;
size_t AllocationSize(size_t last_size, size_t min_size, size_t max_size) {
size_t size;
if (last_size != 0) {
// Double the current block size, up to a limit.
auto max_size = policy.max_block_size;
size = std::min(2 * last_size, max_size);
} else {
size = policy.start_block_size;
size = min_size;
}
// Verify that min_bytes + kBlockHeaderSize won't overflow.
ABSL_CHECK_LE(min_bytes, std::numeric_limits<size_t>::max() -
SerialArena::kBlockHeaderSize);
size = std::max(size, SerialArena::kBlockHeaderSize + min_bytes);
return size;
}

SizedPtr AllocateMemory(const AllocationPolicy& policy, size_t size) {
if (policy.block_alloc == nullptr) {
return AllocateAtLeast(size);
}
return {policy.block_alloc(size), size};
}

SizedPtr AllocateBlock(const AllocationPolicy* policy_ptr, size_t last_size,
size_t min_bytes) {
AllocationPolicy policy; // default policy
if (policy_ptr) policy = *policy_ptr;
size_t size = AllocationSize(last_size, min_bytes, policy.max_block_size);
// Verify that min_bytes + kBlockHeaderSize won't overflow.
ABSL_CHECK_LE(min_bytes, std::numeric_limits<size_t>::max() -
SerialArena::kBlockHeaderSize);
size = std::max(size, SerialArena::kBlockHeaderSize + min_bytes);

return AllocateMemory(policy, size);
}

SizedPtr AllocateCleanupChunk(const AllocationPolicy* policy_ptr,
size_t last_size) {
AllocationPolicy policy; // default policy
if (policy_ptr) policy = *policy_ptr;
const size_t size =
AllocationSize(last_size, /*min_size=*/64, /*max_size=*/4 << 10);
return AllocateMemory(policy, size);
}
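For reference, AllocationSize() above implements a shared growth policy: double the previous allocation, clamped to a maximum, or start at min_size for the first allocation. Arena blocks clamp at policy.max_block_size; cleanup chunks start at 64 bytes and cap at 4 KiB. A minimal standalone sketch of that progression (illustrative only, not part of this diff):

#include <algorithm>
#include <cstddef>
#include <cstdio>

// Mirrors AllocationSize() above: double the last size up to max_size, or
// start at min_size when nothing has been allocated yet.
size_t NextSize(size_t last_size, size_t min_size, size_t max_size) {
  return last_size == 0 ? min_size : std::min(2 * last_size, max_size);
}

int main() {
  // Cleanup-chunk progression: 64, 128, 256, 512, 1024, 2048, 4096, 4096, ...
  size_t size = 0;
  for (int i = 0; i < 8; ++i) {
    size = NextSize(size, /*min_size=*/64, /*max_size=*/4 << 10);
    std::printf("%zu\n", size);
  }
  return 0;
}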

class GetDeallocator {
public:
GetDeallocator(const AllocationPolicy* policy, size_t* space_allocated)
@@ -107,14 +124,90 @@ class GetDeallocator {

} // namespace

namespace cleanup {
struct ChunkList::Chunk {
CleanupNode* First() { return reinterpret_cast<CleanupNode*>(this + 1); }
CleanupNode* Last() { return First() + Capacity() - 1; }
static size_t Capacity(size_t size) {
return (size - sizeof(Chunk)) / sizeof(CleanupNode);
}
size_t Capacity() const { return Capacity(size); }

Chunk* next;
size_t size;
// Cleanup nodes follow.
};
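Each Chunk is a single heap allocation: the header (next pointer and total size) is immediately followed in the same allocation by an inline array of CleanupNode entries, which is why First() points just past `this` and Capacity() is derived from the allocation size. Assuming a CleanupNode of two pointer-sized fields (an element pointer and a destructor, as suggested by Peek() and Destroy() below), a sketch of the capacity arithmetic:

#include <cstddef>

// Assumed node layout; the real CleanupNode lives in arena_cleanup.h.
struct CleanupNodeSketch {
  void* elem;
  void (*destructor)(void*);
};

struct ChunkHeaderSketch {
  ChunkHeaderSketch* next;
  size_t size;  // total bytes of the allocation, header included
};

// Mirrors Chunk::Capacity(size): how many nodes fit after the header.
constexpr size_t CapacitySketch(size_t size) {
  return (size - sizeof(ChunkHeaderSketch)) / sizeof(CleanupNodeSketch);
}

// With 8-byte pointers, the smallest (64-byte) chunk holds 3 nodes and the
// largest (4 KiB) chunk holds 255.
static_assert(sizeof(void*) != 8 || CapacitySketch(64) == 3, "LP64 assumed");
static_assert(sizeof(void*) != 8 || CapacitySketch(4 << 10) == 255,
              "LP64 assumed");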

void ChunkList::AddFallback(void* elem, void (*destructor)(void*),
SerialArena& arena) {
ABSL_DCHECK_EQ(next_, limit_);
SizedPtr mem = AllocateCleanupChunk(arena.parent_.AllocPolicy(),
head_ == nullptr ? 0 : head_->size);
arena.AddSpaceAllocated(mem.n);
head_ = new (mem.p) Chunk{head_, mem.n};
next_ = head_->First();
limit_ = next_ + Chunk::Capacity(mem.n);
AddFromExisting(elem, destructor);
}
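AddFallback() is only reached when the current chunk is full (next_ == limit_); the common path presumably writes at next_ and bumps it (AddFromExisting() lives in arena_cleanup.h and is not shown in this diff). A self-contained toy model of that bump-pointer-with-fallback pattern, with invented names and malloc in place of the arena's allocator:

#include <cstddef>
#include <cstdlib>

struct ToyNode {
  void* elem;
  void (*destructor)(void*);
};

struct ToyChunk {
  ToyChunk* next;
  size_t capacity;
  ToyNode* Nodes() { return reinterpret_cast<ToyNode*>(this + 1); }
};

class ToyChunkList {
 public:
  void Add(void* elem, void (*destructor)(void*)) {
    if (next_ == limit_) Grow();           // slow path: allocate a new chunk
    *next_++ = ToyNode{elem, destructor};  // hot path: bump the pointer
  }

 private:
  void Grow() {
    size_t cap = head_ ? 2 * head_->capacity : 4;  // double, like the arena
    auto* c = static_cast<ToyChunk*>(
        std::malloc(sizeof(ToyChunk) + cap * sizeof(ToyNode)));
    c->next = head_;
    c->capacity = cap;
    head_ = c;
    next_ = c->Nodes();
    limit_ = next_ + cap;
  }

  // Freeing the chunks and running the destructors is omitted for brevity;
  // the real list does both in ChunkList::Cleanup() below.
  ToyChunk* head_ = nullptr;
  ToyNode* next_ = nullptr;
  ToyNode* limit_ = nullptr;
};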

void ChunkList::Cleanup(const SerialArena& arena, size_t* space_allocated) {
Chunk* c = head_;
if (c == nullptr) return;
GetDeallocator deallocator(arena.parent_.AllocPolicy(), space_allocated);

// Iterate backwards so nodes are destroyed newest-first (reverse of registration).
CleanupNode* it = next_ - 1;
head_ = nullptr;
next_ = limit_ = nullptr;
while (true) {
CleanupNode* first = c->First();
// A prefetch distance of 8 here was chosen arbitrarily.
CleanupNode* prefetch = it;
int prefetch_dist = 8;
for (; prefetch >= first && --prefetch_dist; --prefetch) {
prefetch->Prefetch();
}
for (; prefetch >= first; --it, --prefetch) {
it->Destroy();
prefetch->Prefetch();
}
absl::PrefetchToLocalCacheNta(c->next);
for (; it >= first; --it) {
it->Destroy();
}
Chunk* next = c->next;
deallocator({c, c->size});
if (next == nullptr) return;
c = next;
it = c->Last();
};
}
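The destruction loop above pipelines the work: it first issues up to 8 prefetches, then destroys one node per iteration while prefetching one node further back, prefetches the next chunk's header, and finally drains the already-prefetched tail. A generic sketch of the same pattern over a plain array (it assumes the node prefetches the object it is about to destroy, and uses absl/base/prefetch.h):

#include <cstddef>

#include "absl/base/prefetch.h"

struct NodeSketch {
  void* elem;
  void (*destructor)(void*);
  void Destroy() { destructor(elem); }
  // Warm the cache line of the object we are about to destroy.
  void Prefetch() const { absl::PrefetchToLocalCache(elem); }
};

// Destroy nodes[0..count) from last to first, prefetching 8 entries ahead.
void DestroyBackwards(NodeSketch* nodes, size_t count) {
  if (count == 0) return;
  NodeSketch* first = nodes;
  NodeSketch* it = nodes + count - 1;
  // Warm-up: issue up to 8 prefetches before destroying anything.
  NodeSketch* prefetch = it;
  int prefetch_dist = 8;
  for (; prefetch >= first && --prefetch_dist; --prefetch) prefetch->Prefetch();
  // Steady state: destroy one node, prefetch one node further back.
  for (; prefetch >= first; --it, --prefetch) {
    it->Destroy();
    prefetch->Prefetch();
  }
  // Drain: the remaining nodes have already been prefetched.
  for (; it >= first; --it) it->Destroy();
}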

std::vector<void*> ChunkList::Peek() {
std::vector<void*> ret;
Chunk* c = head_;
if (c == nullptr) return ret;
// Iterate backwards to match destruction order.
CleanupNode* it = next_ - 1;
while (true) {
CleanupNode* first = c->First();
for (; it >= first; --it) {
ret.push_back(it->elem);
}
c = c->next;
if (c == nullptr) return ret;
it = c->Last();
};
}
} // namespace cleanup
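At the API level, these nodes are what run object destructors when an arena is destroyed or reset; this diff only changes where they are stored, not when they run. A hedged usage sketch with the public Arena API (cleanup is expected to run newest-first, per the ordering comments further down):

#include <string>

#include "google/protobuf/arena.h"

struct Tracker {
  explicit Tracker(std::string n) : name(std::move(n)) {}
  ~Tracker() { /* runs when the arena is destroyed or Reset() */ }
  std::string name;
};

void Example() {
  google::protobuf::Arena arena;
  // Tracker is not trivially destructible, so each Create() also registers a
  // cleanup node; those nodes now live on the chunked list added in this PR.
  auto* a = google::protobuf::Arena::Create<Tracker>(&arena, "first");
  auto* b = google::protobuf::Arena::Create<Tracker>(&arena, "second");
  (void)a;
  (void)b;
}  // Cleanup runs newest-first: ~Tracker("second"), then ~Tracker("first").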

// It is guaranteed that this is constructed in `b`. In other words, this is
// not the first arena and `b` cannot be a sentry.
SerialArena::SerialArena(ArenaBlock* b, ThreadSafeArena& parent)
: ptr_{b->Pointer(kBlockHeaderSize + ThreadSafeArena::kSerialArenaSize)},
limit_{b->Limit()},
prefetch_ptr_(
b->Pointer(kBlockHeaderSize + ThreadSafeArena::kSerialArenaSize)),
prefetch_limit_(b->Limit()),
head_{b},
space_allocated_{b->size},
parent_{parent} {
@@ -135,22 +228,7 @@ SerialArena::SerialArena(FirstSerialArena, ArenaBlock* b,
}

std::vector<void*> SerialArena::PeekCleanupListForTesting() {
std::vector<void*> res;

ArenaBlock* b = head();
if (b->IsSentry()) return res;

const auto peek_list = [&](char* pos, char* end) {
for (; pos != end; pos += cleanup::Size()) {
cleanup::PeekNode(pos, res);
}
};

peek_list(limit_, b->Limit());
for (b = b->next; b; b = b->next) {
peek_list(reinterpret_cast<char*>(b->cleanup_nodes), b->Limit());
}
return res;
return cleanup_list_.Peek();
}

std::vector<void*> ThreadSafeArena::PeekCleanupListForTesting() {
@@ -228,25 +306,16 @@ void* SerialArena::AllocateFromStringBlockFallback() {
PROTOBUF_NOINLINE
void* SerialArena::AllocateAlignedWithCleanupFallback(
size_t n, size_t align, void (*destructor)(void*)) {
size_t required = AlignUpTo(n, align) + cleanup::Size();
size_t required = AlignUpTo(n, align);
AllocateNewBlock(required);
return AllocateAlignedWithCleanup(n, align, destructor);
}

PROTOBUF_NOINLINE
void SerialArena::AddCleanupFallback(void* elem, void (*destructor)(void*)) {
AllocateNewBlock(cleanup::Size());
AddCleanupFromExisting(elem, destructor);
}

void SerialArena::AllocateNewBlock(size_t n) {
size_t used = 0;
size_t wasted = 0;
ArenaBlock* old_head = head();
if (!old_head->IsSentry()) {
// Sync limit to block
old_head->cleanup_nodes = limit_;

// Record how much of this block was used.
used = static_cast<size_t>(ptr() - old_head->Pointer(kBlockHeaderSize));
wasted = old_head->size - used - kBlockHeaderSize;
@@ -258,7 +327,7 @@ void SerialArena::AllocateNewBlock(size_t n) {
// but with a CPU regression. The regression might have been an artifact of
// the microbenchmark.

auto mem = AllocateMemory(parent_.AllocPolicy(), old_head->size, n);
auto mem = AllocateBlock(parent_.AllocPolicy(), old_head->size, n);
AddSpaceAllocated(mem.n);
ThreadSafeArenaStats::RecordAllocateStats(parent_.arena_stats_.MutableStats(),
/*used=*/used,
Expand Down Expand Up @@ -319,34 +388,6 @@ size_t SerialArena::FreeStringBlocks(StringBlock* string_block,
return deallocated;
}

void SerialArena::CleanupList() {
ArenaBlock* b = head();
if (b->IsSentry()) return;

b->cleanup_nodes = limit_;
do {
char* limit = b->Limit();
char* it = reinterpret_cast<char*>(b->cleanup_nodes);
ABSL_DCHECK(!b->IsSentry() || it == limit);
// A prefetch distance of 8 here was chosen arbitrarily.
char* prefetch = it;
int prefetch_dist = 8;
for (; prefetch < limit && --prefetch_dist; prefetch += cleanup::Size()) {
cleanup::PrefetchNode(prefetch);
}
for (; prefetch < limit;
it += cleanup::Size(), prefetch += cleanup::Size()) {
cleanup::DestroyNode(it);
cleanup::PrefetchNode(prefetch);
}
absl::PrefetchToLocalCacheNta(b->next);
for (; it < limit; it += cleanup::Size()) {
cleanup::DestroyNode(it);
}
b = b->next;
} while (b);
}

// Stores arrays of void* and SerialArena* instead of a linked list of
// SerialArena* to speed up traversing all SerialArenas. The cost of the walk
// is non-trivial when there are many nodes. Separately storing "ids" minimizes cache
@@ -549,7 +590,7 @@ ArenaBlock* ThreadSafeArena::FirstBlock(void* buf, size_t size,

SizedPtr mem;
if (buf == nullptr || size < kBlockHeaderSize + kAllocPolicySize) {
mem = AllocateMemory(&policy, 0, kAllocPolicySize);
mem = AllocateBlock(&policy, 0, kAllocPolicySize);
} else {
mem = {buf, size};
// Record user-owned block.
@@ -688,11 +729,11 @@ void ThreadSafeArena::Init() {
}

ThreadSafeArena::~ThreadSafeArena() {
size_t space_allocated = 0;
// Have to do this in a first pass, because some of the destructors might
// refer to memory in other blocks.
CleanupList();
CleanupList(&space_allocated);

size_t space_allocated = 0;
auto mem = Free(&space_allocated);
if (alloc_policy_.is_user_owned_initial_block()) {
// Unpoison the initial block, now that it's going back to the user.
@@ -730,13 +771,13 @@ SizedPtr ThreadSafeArena::Free(size_t* space_allocated) {
}

uint64_t ThreadSafeArena::Reset() {
size_t space_allocated = 0;
// Have to do this in a first pass, because some of the destructors might
// refer to memory in other blocks.
CleanupList();
CleanupList(&space_allocated);

// Discard all blocks except the first one. Whether it is user-provided or
// allocated, always reuse the first block for the first arena.
size_t space_allocated = 0;
auto mem = Free(&space_allocated);
space_allocated += mem.n;

@@ -868,24 +909,24 @@ template void* ThreadSafeArena::AllocateAlignedFallback<
template void*
ThreadSafeArena::AllocateAlignedFallback<AllocationClient::kArray>(size_t);

void ThreadSafeArena::CleanupList() {
void ThreadSafeArena::CleanupList(size_t* space_allocated) {
#ifdef PROTOBUF_ASAN
UnpoisonAllArenaBlocks();
#endif

WalkSerialArenaChunk([](SerialArenaChunk* chunk) {
WalkSerialArenaChunk([space_allocated](SerialArenaChunk* chunk) {
absl::Span<std::atomic<SerialArena*>> span = chunk->arenas();
// Walk the arenas backward so that the first serial arena is handled last.
// Destroying in reverse order of construction is often assumed by users and
// is required to avoid breaking inter-object dependencies. (b/247560530)
for (auto it = span.rbegin(); it != span.rend(); ++it) {
SerialArena* serial = it->load(std::memory_order_relaxed);
ABSL_DCHECK_NE(serial, nullptr);
serial->CleanupList();
serial->CleanupList(space_allocated);
}
});
// First arena must be cleaned up last. (b/247560530)
first_arena_.CleanupList();
first_arena_.CleanupList(space_allocated);
}

PROTOBUF_NOINLINE
@@ -914,7 +955,7 @@ SerialArena* ThreadSafeArena::GetSerialArenaFallback(size_t n) {
// have any blocks yet. So we'll allocate its first block now. It must be
// big enough to host SerialArena and the pending request.
serial = SerialArena::New(
AllocateMemory(alloc_policy_.get(), 0, n + kSerialArenaSize), *this);
AllocateBlock(alloc_policy_.get(), 0, n + kSerialArenaSize), *this);

AddSerialArena(id, serial);
}