Put cleanup nodes on separate chunked list.
PiperOrigin-RevId: 625777108
protobuf-github-bot authored and copybara-github committed Apr 17, 2024
1 parent 1d44d84 commit 0e5f4a6
Showing 5 changed files with 193 additions and 174 deletions.
src/google/protobuf/arena.cc (192 changes: 116 additions & 76 deletions)
@@ -60,29 +60,46 @@ ArenaBlock* SentryArenaBlock() {
}
#endif

- SizedPtr AllocateMemory(const AllocationPolicy* policy_ptr, size_t last_size,
- size_t min_bytes) {
- AllocationPolicy policy;  // default policy
- if (policy_ptr) policy = *policy_ptr;
size_t AllocationSize(size_t last_size, size_t min_size, size_t max_size) {
size_t size;
if (last_size != 0) {
// Double the current block size, up to a limit.
- auto max_size = policy.max_block_size;
size = std::min(2 * last_size, max_size);
} else {
- size = policy.start_block_size;
size = min_size;
}
- // Verify that min_bytes + kBlockHeaderSize won't overflow.
- ABSL_CHECK_LE(min_bytes, std::numeric_limits<size_t>::max() -
- SerialArena::kBlockHeaderSize);
- size = std::max(size, SerialArena::kBlockHeaderSize + min_bytes);
return size;
}

SizedPtr AllocateMemory(const AllocationPolicy& policy, size_t size) {
if (policy.block_alloc == nullptr) {
return AllocateAtLeast(size);
}
return {policy.block_alloc(size), size};
}

SizedPtr AllocateBlock(const AllocationPolicy* policy_ptr, size_t last_size,
size_t min_bytes) {
AllocationPolicy policy; // default policy
if (policy_ptr) policy = *policy_ptr;
size_t size = AllocationSize(last_size, policy.start_block_size, policy.max_block_size);
// Verify that min_bytes + kBlockHeaderSize won't overflow.
ABSL_CHECK_LE(min_bytes, std::numeric_limits<size_t>::max() -
SerialArena::kBlockHeaderSize);
size = std::max(size, SerialArena::kBlockHeaderSize + min_bytes);

return AllocateMemory(policy, size);
}

SizedPtr AllocateCleanupChunk(const AllocationPolicy* policy_ptr,
size_t last_size) {
AllocationPolicy policy; // default policy
if (policy_ptr) policy = *policy_ptr;
const size_t size =
AllocationSize(last_size, /*min_size=*/64, /*max_size=*/4 << 10);
return AllocateMemory(policy, size);
}
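
As an aside for readers of this commit, the growth policy above is easy to model in isolation. The sketch below is illustrative only (ModelAllocationSize is an invented name, not code from this commit): the first chunk gets min_size bytes, each later chunk doubles the previous one, and growth is capped at max_size, so cleanup chunks run 64, 128, 256, ..., 4096 bytes and then stay at 4 KiB.

#include <algorithm>
#include <cstddef>
#include <cstdio>

// Standalone model of AllocationSize() above (illustration only).
size_t ModelAllocationSize(size_t last_size, size_t min_size, size_t max_size) {
  if (last_size == 0) return min_size;       // first allocation
  return std::min(2 * last_size, max_size);  // double, up to the cap
}

int main() {
  size_t size = 0;
  for (int i = 0; i < 9; ++i) {
    size = ModelAllocationSize(size, /*min_size=*/64, /*max_size=*/4 << 10);
    std::printf("cleanup chunk %d: %zu bytes\n", i, size);
  }
  // Prints 64, 128, 256, 512, 1024, 2048, 4096, 4096, 4096.
  return 0;
}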

class GetDeallocator {
public:
GetDeallocator(const AllocationPolicy* policy, size_t* space_allocated)
@@ -107,14 +124,89 @@ class GetDeallocator {

} // namespace

namespace cleanup {
struct ChunkList::Chunk {
CleanupNode* First() { return reinterpret_cast<CleanupNode*>(this + 1); }
CleanupNode* Last() { return First() + Capacity() - 1; }
static size_t Capacity(size_t size) {
return (size - sizeof(Chunk)) / sizeof(CleanupNode);
}
size_t Capacity() const { return Capacity(size); }

Chunk* next;
size_t size;
// Cleanup nodes follow.
};

void ChunkList::AddFallback(const ThreadSafeArena& arena, void* elem,
void (*destructor)(void*)) {
ABSL_DCHECK_EQ(next_, limit_);
SizedPtr mem = AllocateCleanupChunk(arena.AllocPolicy(),
head_ == nullptr ? 0 : head_->size);
head_ = new (mem.p) Chunk{head_, mem.n};
next_ = head_->First();
limit_ = next_ + Chunk::Capacity(mem.n);
AddFromExisting(elem, destructor);
}

void ChunkList::Cleanup(const ThreadSafeArena& arena, size_t* space_allocated) {
Chunk* c = head_;
if (c == nullptr) return;
GetDeallocator deallocator(arena.AllocPolicy(), space_allocated);

// Iterate backwards in order to destroy in the right order.
CleanupNode* it = next_ - 1;
head_ = nullptr;
next_ = limit_ = nullptr;
while (true) {
CleanupNode* first = c->First();
// A prefetch distance of 8 here was chosen arbitrarily.
CleanupNode* prefetch = it;
int prefetch_dist = 8;
for (; prefetch >= first && --prefetch_dist; --prefetch) {
prefetch->Prefetch();
}
for (; prefetch >= first; --it, --prefetch) {
it->Destroy();
prefetch->Prefetch();
}
absl::PrefetchToLocalCacheNta(c->next);
for (; it >= first; --it) {
it->Destroy();
}
Chunk* next = c->next;
deallocator({c, c->size});
if (next == nullptr) return;
c = next;
it = c->Last();
};
}

std::vector<void*> ChunkList::Peek() {
std::vector<void*> ret;
Chunk* c = head_;
if (c == nullptr) return ret;
// Iterate backwards to match destruction order.
CleanupNode* it = next_ - 1;
while (true) {
CleanupNode* first = c->First();
for (; it >= first; --it) {
ret.push_back(it->elem);
}
c = c->next;
if (c == nullptr) return ret;
it = c->Last();
};
}
} // namespace cleanup
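
Since the chunked list is the heart of this change, a compact standalone model may help. The sketch below is illustrative only: the Toy* names are invented, and it uses plain malloc/free where the real ChunkList above draws memory from the arena's AllocationPolicy and prefetches during destruction. It keeps the same structure: chunks of (element, destructor) pairs that double in capacity and are drained newest-first.

#include <cstddef>
#include <cstdlib>
#include <new>

// Simplified model of the chunked cleanup list (illustration only).
struct ToyNode {
  void* elem;
  void (*destructor)(void*);
};

struct ToyChunk {
  ToyChunk* next;   // older chunk, or nullptr
  size_t capacity;  // number of nodes that fit after this header
  ToyNode* First() { return reinterpret_cast<ToyNode*>(this + 1); }
};

class ToyChunkList {
 public:
  void Add(void* elem, void (*destructor)(void*)) {
    if (next_ == limit_) Grow();
    *next_++ = {elem, destructor};
  }

  // Run destructors newest-first, mirroring ChunkList::Cleanup() above.
  ~ToyChunkList() {
    ToyNode* it = next_;
    for (ToyChunk* c = head_; c != nullptr;) {
      ToyNode* first = c->First();
      while (it != first) {
        --it;
        it->destructor(it->elem);
      }
      ToyChunk* older = c->next;
      std::free(c);
      c = older;
      // Older chunks are always full, so resume from their last slot.
      if (c != nullptr) it = c->First() + c->capacity;
    }
  }

 private:
  void Grow() {
    size_t cap = head_ != nullptr ? head_->capacity * 2 : 4;
    void* mem = std::malloc(sizeof(ToyChunk) + cap * sizeof(ToyNode));
    head_ = new (mem) ToyChunk{head_, cap};
    next_ = head_->First();
    limit_ = next_ + cap;
  }

  ToyChunk* head_ = nullptr;  // newest chunk
  ToyNode* next_ = nullptr;   // next free node slot
  ToyNode* limit_ = nullptr;  // one past the last slot in the newest chunk
};

Walking backwards works because every chunk except the newest is full by construction; a new chunk is only allocated once the current one runs out of slots.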

// It is guaranteed that this is constructed in `b`. IOW, this is not the first
// arena and `b` cannot be sentry.
SerialArena::SerialArena(ArenaBlock* b, ThreadSafeArena& parent)
: ptr_{b->Pointer(kBlockHeaderSize + ThreadSafeArena::kSerialArenaSize)},
limit_{b->Limit()},
prefetch_ptr_(
b->Pointer(kBlockHeaderSize + ThreadSafeArena::kSerialArenaSize)),
prefetch_limit_(b->Limit()),
head_{b},
space_allocated_{b->size},
parent_{parent} {
@@ -135,22 +227,7 @@ SerialArena::SerialArena(FirstSerialArena, ArenaBlock* b,
}

std::vector<void*> SerialArena::PeekCleanupListForTesting() {
- std::vector<void*> res;
-
- ArenaBlock* b = head();
- if (b->IsSentry()) return res;
-
- const auto peek_list = [&](char* pos, char* end) {
- for (; pos != end; pos += cleanup::Size()) {
- cleanup::PeekNode(pos, res);
- }
- };
-
- peek_list(limit_, b->Limit());
- for (b = b->next; b; b = b->next) {
- peek_list(reinterpret_cast<char*>(b->cleanup_nodes), b->Limit());
- }
- return res;
return cleanup_list_.Peek();
}

std::vector<void*> ThreadSafeArena::PeekCleanupListForTesting() {
@@ -228,25 +305,16 @@ void* SerialArena::AllocateFromStringBlockFallback() {
PROTOBUF_NOINLINE
void* SerialArena::AllocateAlignedWithCleanupFallback(
size_t n, size_t align, void (*destructor)(void*)) {
- size_t required = AlignUpTo(n, align) + cleanup::Size();
size_t required = AlignUpTo(n, align);
AllocateNewBlock(required);
return AllocateAlignedWithCleanup(n, align, destructor);
}

- PROTOBUF_NOINLINE
- void SerialArena::AddCleanupFallback(void* elem, void (*destructor)(void*)) {
- AllocateNewBlock(cleanup::Size());
- AddCleanupFromExisting(elem, destructor);
- }

void SerialArena::AllocateNewBlock(size_t n) {
size_t used = 0;
size_t wasted = 0;
ArenaBlock* old_head = head();
if (!old_head->IsSentry()) {
- // Sync limit to block
- old_head->cleanup_nodes = limit_;

// Record how much used in this block.
used = static_cast<size_t>(ptr() - old_head->Pointer(kBlockHeaderSize));
wasted = old_head->size - used - kBlockHeaderSize;
@@ -258,7 +326,7 @@ void SerialArena::AllocateNewBlock(size_t n) {
// but with a CPU regression. The regression might have been an artifact of
// the microbenchmark.

- auto mem = AllocateMemory(parent_.AllocPolicy(), old_head->size, n);
auto mem = AllocateBlock(parent_.AllocPolicy(), old_head->size, n);
AddSpaceAllocated(mem.n);
ThreadSafeArenaStats::RecordAllocateStats(parent_.arena_stats_.MutableStats(),
/*used=*/used,
Expand Down Expand Up @@ -319,34 +387,6 @@ size_t SerialArena::FreeStringBlocks(StringBlock* string_block,
return deallocated;
}

- void SerialArena::CleanupList() {
- ArenaBlock* b = head();
- if (b->IsSentry()) return;
-
- b->cleanup_nodes = limit_;
- do {
- char* limit = b->Limit();
- char* it = reinterpret_cast<char*>(b->cleanup_nodes);
- ABSL_DCHECK(!b->IsSentry() || it == limit);
- // A prefetch distance of 8 here was chosen arbitrarily.
- char* prefetch = it;
- int prefetch_dist = 8;
- for (; prefetch < limit && --prefetch_dist; prefetch += cleanup::Size()) {
- cleanup::PrefetchNode(prefetch);
- }
- for (; prefetch < limit;
- it += cleanup::Size(), prefetch += cleanup::Size()) {
- cleanup::DestroyNode(it);
- cleanup::PrefetchNode(prefetch);
- }
- absl::PrefetchToLocalCacheNta(b->next);
- for (; it < limit; it += cleanup::Size()) {
- cleanup::DestroyNode(it);
- }
- b = b->next;
- } while (b);
- }
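
The removed loop above and its replacement, ChunkList::Cleanup(), share the same destruction idiom: a second cursor runs a fixed distance ahead of the destroy cursor and prefetches the node it passes, hiding cache-miss latency behind destructor work. Below is a simplified standalone sketch of that idiom (PrefNode and DestroyBackwards are invented names; absl::PrefetchToLocalCache is Abseil's prefetch hint from absl/base/prefetch.h):

#include "absl/base/prefetch.h"

// Minimal node shape, mirroring CleanupNode above.
struct PrefNode {
  void* elem;
  void (*destructor)(void*);
};

// Destroy nodes in [first, last] back to front, prefetching ~8 nodes ahead.
// The distance is arbitrary, as the real code's comment notes.
void DestroyBackwards(PrefNode* first, PrefNode* last) {
  PrefNode* it = last;
  PrefNode* prefetch = last;
  int prefetch_dist = 8;
  // Warm-up: issue the first few prefetches before any destructor runs.
  for (; prefetch >= first && --prefetch_dist; --prefetch) {
    absl::PrefetchToLocalCache(prefetch->elem);
  }
  // Steady state: one destroy, one prefetch further ahead.
  for (; prefetch >= first; --it, --prefetch) {
    it->destructor(it->elem);
    absl::PrefetchToLocalCache(prefetch->elem);
  }
  // Tail: everything left has already been prefetched.
  for (; it >= first; --it) {
    it->destructor(it->elem);
  }
}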

// Stores arrays of void* and SerialArena* instead of linked list of
// SerialArena* to speed up traversing all SerialArena. The cost of walk is non
// trivial when there are many nodes. Separately storing "ids" minimizes cache
@@ -549,7 +589,7 @@ ArenaBlock* ThreadSafeArena::FirstBlock(void* buf, size_t size,

SizedPtr mem;
if (buf == nullptr || size < kBlockHeaderSize + kAllocPolicySize) {
- mem = AllocateMemory(&policy, 0, kAllocPolicySize);
mem = AllocateBlock(&policy, 0, kAllocPolicySize);
} else {
mem = {buf, size};
// Record user-owned block.
Expand Down Expand Up @@ -688,11 +728,11 @@ void ThreadSafeArena::Init() {
}

ThreadSafeArena::~ThreadSafeArena() {
size_t space_allocated = 0;
// Have to do this in a first pass, because some of the destructors might
// refer to memory in other blocks.
- CleanupList();
CleanupList(&space_allocated);

- size_t space_allocated = 0;
auto mem = Free(&space_allocated);
if (alloc_policy_.is_user_owned_initial_block()) {
// Unpoison the initial block, now that it's going back to the user.
Expand Down Expand Up @@ -730,13 +770,13 @@ SizedPtr ThreadSafeArena::Free(size_t* space_allocated) {
}

uint64_t ThreadSafeArena::Reset() {
size_t space_allocated = 0;
// Have to do this in a first pass, because some of the destructors might
// refer to memory in other blocks.
- CleanupList();
CleanupList(&space_allocated);

// Discard all blocks except the first one. Whether it is user-provided or
// allocated, always reuse the first block for the first arena.
- size_t space_allocated = 0;
auto mem = Free(&space_allocated);
space_allocated += mem.n;

Expand Down Expand Up @@ -868,24 +908,24 @@ template void* ThreadSafeArena::AllocateAlignedFallback<
template void*
ThreadSafeArena::AllocateAlignedFallback<AllocationClient::kArray>(size_t);

- void ThreadSafeArena::CleanupList() {
void ThreadSafeArena::CleanupList(size_t* space_allocated) {
#ifdef PROTOBUF_ASAN
UnpoisonAllArenaBlocks();
#endif

- WalkSerialArenaChunk([](SerialArenaChunk* chunk) {
WalkSerialArenaChunk([space_allocated](SerialArenaChunk* chunk) {
absl::Span<std::atomic<SerialArena*>> span = chunk->arenas();
// Walks arenas backward to handle the first serial arena last.
// Destroying in reverse order of construction is often assumed by users
// and is required to avoid breaking inter-object dependencies. (b/247560530)
for (auto it = span.rbegin(); it != span.rend(); ++it) {
SerialArena* serial = it->load(std::memory_order_relaxed);
ABSL_DCHECK_NE(serial, nullptr);
- serial->CleanupList();
serial->CleanupList(space_allocated);
}
});
// First arena must be cleaned up last. (b/247560530)
- first_arena_.CleanupList();
first_arena_.CleanupList(space_allocated);
}
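
The backward walk here exists because destruction must run in reverse construction order (b/247560530): an object created later on the arena may still use an earlier one from its own destructor. A small usage sketch against the public Arena API, with hypothetical types invented for illustration:

#include <google/protobuf/arena.h>

// Hypothetical types: Worker uses its Logger while being destroyed.
struct Logger {
  ~Logger() { /* flush buffered output */ }
};
struct Worker {
  explicit Worker(Logger* log) : log_(log) {}
  ~Worker() { /* may still write through log_ */ }
  Logger* log_;
};

void Example() {
  google::protobuf::Arena arena;
  Logger* log = google::protobuf::Arena::Create<Logger>(&arena);
  Worker* worker = google::protobuf::Arena::Create<Worker>(&arena, log);
  (void)worker;
}  // ~Arena destroys worker first, then log: reverse construction order.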

PROTOBUF_NOINLINE
Expand Down Expand Up @@ -914,7 +954,7 @@ SerialArena* ThreadSafeArena::GetSerialArenaFallback(size_t n) {
// have any blocks yet. So we'll allocate its first block now. It must be
// big enough to host SerialArena and the pending request.
serial = SerialArena::New(
- AllocateMemory(alloc_policy_.get(), 0, n + kSerialArenaSize), *this);
AllocateBlock(alloc_policy_.get(), 0, n + kSerialArenaSize), *this);

AddSerialArena(id, serial);
}
[Diffs for the remaining 4 changed files did not load.]
