Update to zig master #9

Open · wants to merge 9 commits into base: blog
8 changes: 4 additions & 4 deletions benchmarks/zig/async.zig
@@ -104,13 +104,13 @@ fn SpawnHandle(comptime T: type) type {
suspend {
// Acquire barrier to ensure we see the join()'s *Waiter writes if present.
// Release barrier to ensure join() and detach() see our *Waiter writes.
const state = self.state.swap(@ptrToInt(&waiter), .AcqRel);
const state = self.state.swap(@intFromPtr(&waiter), .AcqRel);

// If join() or detach() were called before us.
if (state != 0) {
// Then fill the result value for join() & wake it up.
if (state != DETACHED) {
const joiner = @intToPtr(*Waiter, state);
const joiner: *Waiter = @ptrFromInt(state);
joiner.value = waiter.value;
joiner.task.schedule();
}
@@ -130,7 +130,7 @@ fn SpawnHandle(comptime T: type) type {
suspend {
// Acquire barrier to ensure we see the complete()'s *Waiter writes if present.
// Release barrier to ensure complete() sees our *Waiter writes.
if (@intToPtr(?*Waiter, self.state.swap(@ptrToInt(&waiter), .AcqRel))) |completer| {
if (@as(?*Waiter, @ptrFromInt(self.state.swap(@intFromPtr(&waiter), .AcqRel)))) |completer| {
// complete() was waiting for us to consume its value.
// Do so and reschedule both of us.
waiter.value = completer.value;
@@ -149,7 +149,7 @@ fn SpawnHandle(comptime T: type) type {
// Mark the state as detached, making a subsequent complete() no-op
// Wake up the waiting complete() if it was there before us.
// Acquire barrier in order to see the complete()'s *Waiter writes.
if (@intToPtr(?*Waiter, self.state.swap(DETACHED, .Acquire))) |completer| {
if (@as(?*Waiter, @ptrFromInt(self.state.swap(DETACHED, .Acquire)))) |completer| {
completer.task.schedule();
}
}
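Note on the changes in this file: current Zig renamed the pointer/integer cast builtins — `@ptrToInt` is now `@intFromPtr`, and `@intToPtr` is now `@ptrFromInt`. The new builtins take no destination-type argument; the target type comes from the result location, so it has to be spelled on the variable or via `@as`. A minimal sketch of the round trip, with a placeholder `Waiter` type standing in for the real one:

```zig
const Waiter = struct { value: usize = 0 }; // placeholder for the real Waiter

fn roundTrip(w: *Waiter) *Waiter {
    const addr: usize = @intFromPtr(w); // was: @ptrToInt(w)
    const back: *Waiter = @ptrFromInt(addr); // was: @intToPtr(*Waiter, addr)
    return back;
}
```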
27 changes: 16 additions & 11 deletions benchmarks/zig/build.zig
@@ -1,23 +1,28 @@
const std = @import("std");

pub fn build(b: *std.build.Builder) void {
const mode = b.standardReleaseOptions();
const optimize = b.standardOptimizeOption(.{});
const target = b.standardTargetOptions(.{});
const link_c = b.option(bool, "c", "link libc") orelse false;

const exe = b.addExecutable("qsort", "qsort.zig");
const exe = b.addExecutable(.{
.name = "qsort",
.root_source_file = .{ .path = "qsort.zig" },
.optimize = optimize,
.target = target
});
if (link_c) {
exe.linkLibC();
}

exe.addPackage(.{
.name = "thread_pool",
.path = .{ .path = "../../src/thread_pool.zig" },
});
exe.setTarget(target);
exe.setBuildMode(mode);
exe.install();
exe.addModule("thread_pool", b.createModule(.{
.source_file = .{ .path = "../../src/thread_pool.zig" }
}));
b.installArtifact(exe);

const run_cmd = b.addRunArtifact(exe);
run_cmd.step.dependOn(b.getInstallStep());

const run = b.step("run", "Run the benchmark");
run.dependOn(&exe.run().step);
const run_step = b.step("run", "Run the app");
run_step.dependOn(&run_cmd.step);
}
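For readability, here is the new-style build function from the hunk above collected into one piece (the diff interleaves removed and added lines). The API moves from `standardReleaseOptions`/`addPackage`/`exe.install()` to `standardOptimizeOption`, `addExecutable` with an options struct, `addModule`/`createModule`, and `b.installArtifact`; `std.Build` is the current name for what the file still calls `std.build.Builder`:

```zig
const std = @import("std");

pub fn build(b: *std.Build) void {
    const optimize = b.standardOptimizeOption(.{});
    const target = b.standardTargetOptions(.{});
    const link_c = b.option(bool, "c", "link libc") orelse false;

    const exe = b.addExecutable(.{
        .name = "qsort",
        .root_source_file = .{ .path = "qsort.zig" },
        .optimize = optimize,
        .target = target,
    });
    if (link_c) exe.linkLibC();

    // Modules replace the old addPackage() mechanism; the import name stays "thread_pool".
    exe.addModule("thread_pool", b.createModule(.{
        .source_file = .{ .path = "../../src/thread_pool.zig" },
    }));
    b.installArtifact(exe); // replaces exe.install()

    const run_cmd = b.addRunArtifact(exe);
    run_cmd.step.dependOn(b.getInstallStep());
    const run_step = b.step("run", "Run the app");
    run_step.dependOn(&run_cmd.step);
}
```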
8 changes: 4 additions & 4 deletions benchmarks/zig/qsort.zig
@@ -12,7 +12,7 @@ fn asyncMain() void {
defer Async.allocator.free(arr);

std.debug.print("filling\n", .{});
for (arr) |*item, i| {
for (arr, 0..) |*item, i| {
item.* = @intCast(i32, i);
}

@@ -52,7 +52,7 @@ fn verify(arr: []const i32) bool {

fn shuffle(arr: []i32) void {
var xs: u32 = 0xdeadbeef;
for (arr) |_, i| {
for (arr, 0..) |_, i| {
xs ^= xs << 13;
xs ^= xs >> 17;
xs ^= xs << 5;
@@ -78,7 +78,7 @@ fn quickSort(arr: []i32) void {
fn partition(arr: []i32) usize {
const pivot = arr.len - 1;
var i: usize = 0;
for (arr[0..pivot]) |_, j| {
for (arr[0..pivot], 0..) |_, j| {
if (arr[j] <= arr[pivot]) {
std.mem.swap(i32, &arr[j], &arr[i]);
i += 1;
@@ -89,7 +89,7 @@ fn partition(arr: []i32) usize {
}

fn insertionSort(arr: []i32) void {
for (arr[1..]) |_, i| {
for (arr[1..], 0..) |_, i| {
var n = i + 1;
while (n > 0 and arr[n] < arr[n - 1]) {
std.mem.swap(i32, &arr[n], &arr[n - 1]);
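The only change in this file is the for-loop syntax: indexed iteration no longer uses an implicit second capture; instead the slice and a `0..` counter range are iterated in lockstep. A minimal sketch mirroring the `filling` loop above:

```zig
fn fillAscending(arr: []i32) void {
    // Old form: for (arr) |*item, i| { ... }
    for (arr, 0..) |*item, i| {
        item.* = @intCast(i); // single-argument @intCast on current master; was @intCast(i32, i)
    }
}
```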
6 changes: 6 additions & 0 deletions build.zig
@@ -0,0 +1,6 @@
const std = @import("std");

pub fn build(b: *std.build.Builder) void {
_ = b.addModule("zap", .{ .source_file = .{ .path = "src/thread_pool.zig" } });
_ = b.addModule("zap_go", .{ .source_file = .{ .path = "src/thread_pool_go_based.zig" } });
}
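The new top-level build.zig registers `zap` and `zap_go` as named modules, which is what lets a downstream project pick them up through the package manager. A hypothetical consumer build.zig, assuming this repository is declared as a dependency named `zap` in the consumer's build.zig.zon (that file is not part of this PR):

```zig
const std = @import("std");

pub fn build(b: *std.Build) void {
    const target = b.standardTargetOptions(.{});
    const optimize = b.standardOptimizeOption(.{});

    const exe = b.addExecutable(.{
        .name = "example",
        .root_source_file = .{ .path = "src/main.zig" },
        .target = target,
        .optimize = optimize,
    });

    // b.addModule("zap", ...) above is what makes this lookup possible.
    const zap_module = b.dependency("zap", .{}).module("zap");
    exe.addModule("zap", zap_module);

    b.installArtifact(exe);
}
```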
64 changes: 32 additions & 32 deletions src/thread_pool.zig
@@ -6,7 +6,7 @@ const Atomic = std.atomic.Atomic;

stack_size: u32,
max_threads: u32,
sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})),
sync: Atomic(u32) = Atomic(u32).init(@bitCast(Sync{})),
idle_event: Event = .{},
join_event: Event = .{},
run_queue: Node.Queue = .{},
@@ -46,8 +46,8 @@ pub const Config = struct {
/// Statically initialize the thread pool using the configuration.
pub fn init(config: Config) ThreadPool {
return .{
.stack_size = std.math.max(1, config.stack_size),
.max_threads = std.math.max(1, config.max_threads),
.stack_size = @max(1, config.stack_size),
.max_threads = @max(1, config.max_threads),
};
}

@@ -61,7 +61,7 @@ pub fn deinit(self: *ThreadPool) void {
/// The user provides a `callback` which is invoked when the *Task can run on a thread.
pub const Task = struct {
node: Node = .{},
callback: fn (*Task) void,
callback: *const fn (*Task) void,
};

/// An unordered collection of Tasks which can be submitted for scheduling as a group.
@@ -121,7 +121,7 @@ inline fn notify(self: *ThreadPool, is_waking: bool) void {
// Fast path to check the Sync state to avoid calling into notifySlow().
// If we're waking, then we need to update the state regardless
if (!is_waking) {
const sync = @bitCast(Sync, self.sync.load(.Monotonic));
const sync: Sync = @bitCast(self.sync.load(.Monotonic));
if (sync.notified) {
return;
}
@@ -131,7 +131,7 @@ inline fn notify(self: *ThreadPool, is_waking: bool) void {
}

noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
var sync: Sync = @bitCast(self.sync.load(.Monotonic));
while (sync.state != .shutdown) {

const can_wake = is_waking or (sync.state == .pending);
@@ -154,9 +154,9 @@

// Release barrier synchronizes with Acquire in wait()
// to ensure pushes to run queues happen before observing a posted notification.
sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
sync = @bitCast(self.sync.tryCompareAndSwap(
@bitCast(sync),
@bitCast(new_sync),
.Release,
.Monotonic,
) orelse {
@@ -180,7 +180,7 @@ noinline fn notifySlow(self: *ThreadPool, is_waking: bool) void {
noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
var is_idle = false;
var is_waking = _is_waking;
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
var sync: Sync = @bitCast(self.sync.load(.Monotonic));

while (true) {
if (sync.state == .shutdown) return error.Shutdown;
@@ -197,9 +197,9 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {

// Acquire barrier synchronizes with notify()
// to ensure that pushes to run queue are observed after wait() returns.
sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
sync = @bitCast(self.sync.tryCompareAndSwap(
@bitCast(sync),
@bitCast(new_sync),
.Acquire,
.Monotonic,
) orelse {
@@ -214,9 +214,9 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
if (is_waking)
new_sync.state = .pending;

sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
sync = @bitCast(self.sync.tryCompareAndSwap(
@bitCast(sync),
@bitCast(new_sync),
.Monotonic,
.Monotonic,
) orelse {
@@ -229,24 +229,24 @@ noinline fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
// TODO: Add I/O polling here.
} else {
self.idle_event.wait();
sync = @bitCast(Sync, self.sync.load(.Monotonic));
sync = @bitCast(self.sync.load(.Monotonic));
}
}
}

/// Marks the thread pool as shutdown
pub noinline fn shutdown(self: *ThreadPool) void {
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
var sync: Sync = @bitCast(self.sync.load(.Monotonic));
while (sync.state != .shutdown) {
var new_sync = sync;
new_sync.notified = true;
new_sync.state = .shutdown;
new_sync.idle = 0;

// Full barrier to synchronize with both wait() and notify()
sync = @bitCast(Sync, self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
sync = @bitCast(self.sync.tryCompareAndSwap(
@bitCast(sync),
@bitCast(new_sync),
.AcqRel,
.Monotonic,
) orelse {
@@ -274,8 +274,8 @@ fn register(noalias self: *ThreadPool, noalias thread: *Thread) void {

fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {
// Un-spawn one thread, either due to a failed OS thread spawning or the thread is exiting.
const one_spawned = @bitCast(u32, Sync{ .spawned = 1 });
const sync = @bitCast(Sync, self.sync.fetchSub(one_spawned, .Release));
const one_spawned: u32 = @bitCast(Sync{ .spawned = 1 });
const sync: Sync = @bitCast(self.sync.fetchSub(one_spawned, .Release));
assert(sync.spawned > 0);

// The last thread to exit must wake up the thread pool join()er
@@ -297,10 +297,10 @@ fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {

fn join(self: *ThreadPool) void {
// Wait for the thread pool to be shutdown() then for all threads to enter a joinable state
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
var sync: Sync = @bitCast(self.sync.load(.Monotonic));
if (!(sync.state == .shutdown and sync.spawned == 0)) {
self.join_event.wait();
sync = @bitCast(Sync, self.sync.load(.Monotonic));
sync = @bitCast(self.sync.load(.Monotonic));
}

assert(sync.state == .shutdown);
@@ -366,9 +366,9 @@ const Thread = struct {
}

// TODO: add optimistic I/O polling here

// Then try work stealing from other threads
var num_threads: u32 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned;
var num_threads: u32 = @as(Sync, @bitCast(thread_pool.sync.load(.Monotonic))).spawned;
while (num_threads > 0) : (num_threads -= 1) {
// Traverse the stack of registered threads on the thread pool
const target = self.target orelse thread_pool.threads.load(.Acquire) orelse unreachable;
@@ -451,7 +451,7 @@ const Event = struct {
// Acquiring to WAITING will make the next notify() or shutdown() wake a sleeping futex thread
// who will either exit on SHUTDOWN or acquire with WAITING again, ensuring all threads are awoken.
// This unfortunately results in the last notify() or shutdown() doing an extra futex wake but that's fine.
std.Thread.Futex.wait(&self.state, WAITING, null) catch unreachable;
std.Thread.Futex.wait(&self.state, WAITING);
state = self.state.load(.Monotonic);
acquire_with = WAITING;
}
@@ -509,11 +509,11 @@ const Node = struct {
var stack = self.stack.load(.Monotonic);
while (true) {
// Attach the list to the stack (pt. 1)
list.tail.next = @intToPtr(?*Node, stack & PTR_MASK);
list.tail.next = @ptrFromInt(stack & PTR_MASK);

// Update the stack with the list (pt. 2).
// Don't change the HAS_CACHE and IS_CONSUMING bits of the consumer.
var new_stack = @ptrToInt(list.head);
var new_stack = @intFromPtr(list.head);
assert(new_stack & ~PTR_MASK == 0);
new_stack |= (stack & ~PTR_MASK);

@@ -549,7 +549,7 @@ const Node = struct {
new_stack,
.Acquire,
.Monotonic,
) orelse return self.cache orelse @intToPtr(*Node, stack & PTR_MASK);
) orelse return self.cache orelse @ptrFromInt(stack & PTR_MASK);
}
}

@@ -586,7 +586,7 @@ const Node = struct {
assert(stack & IS_CONSUMING != 0);
assert(stack & PTR_MASK != 0);

const node = @intToPtr(*Node, stack & PTR_MASK);
const node: *Node = @ptrFromInt(stack & PTR_MASK);
consumer_ref.* = node.next;
return node;
}
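Most of the churn in this file is the new single-argument `@bitCast`: the pool packs its counters and state into a `Sync` value that is reinterpreted as the `u32` behind the atomic, and the destination type now comes from the result location rather than a first argument (the same builtin rework drives the `@max` and `*const fn` callback changes above). A small sketch of the pattern, with an illustrative field layout standing in for the real `Sync`:

```zig
const std = @import("std");
const Atomic = std.atomic.Atomic;

// Illustrative layout; the real Sync packs idle/spawned counters, flags,
// and a 2-bit state enum into a single u32.
const Sync = packed struct {
    idle: u14 = 0,
    spawned: u14 = 0,
    unused: u1 = 0,
    notified: bool = false,
    state: enum(u2) { pending = 0, signaled, waking, shutdown } = .pending,
};

fn markNotified(word: *Atomic(u32)) void {
    var sync: Sync = @bitCast(word.load(.Monotonic)); // was: @bitCast(Sync, ...)
    sync.notified = true;
    word.store(@bitCast(sync), .Release); // destination u32 comes from the parameter type
}
```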
8 changes: 4 additions & 4 deletions src/thread_pool_go_based.zig
@@ -349,11 +349,11 @@ const Node = struct {
var stack = self.stack.load(.Monotonic);
while (true) {
// Attach the list to the stack (pt. 1)
list.tail.next = @intToPtr(?*Node, stack & PTR_MASK);
list.tail.next = @ptrFromInt(stack & PTR_MASK);

// Update the stack with the list (pt. 2).
// Don't change the HAS_CACHE and IS_CONSUMING bits of the consumer.
var new_stack = @ptrToInt(list.head);
var new_stack = @intFromPtr(list.head);
assert(new_stack & ~PTR_MASK == 0);
new_stack |= (stack & ~PTR_MASK);

@@ -396,7 +396,7 @@ const Node = struct {
new_stack,
.Acquire,
.Monotonic,
) orelse return self.cache orelse @intToPtr(*Node, stack & PTR_MASK);
) orelse return self.cache orelse @ptrFromInt(stack & PTR_MASK);
}
}

@@ -433,7 +433,7 @@ const Node = struct {
assert(stack & IS_CONSUMING != 0);
assert(stack & PTR_MASK != 0);

const node = @intToPtr(*Node, stack & PTR_MASK);
const node: *Node = @ptrFromInt(stack & PTR_MASK);
consumer_ref.* = node.next;
return node;
}
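Same rename in the Go-style pool. For context on why the masking matters in these hunks: both pools keep a lock-free intrusive stack whose head word packs a `*Node` address together with `HAS_CACHE`/`IS_CONSUMING` flag bits, relying on pointer alignment to keep the low bits free. An illustrative sketch of that tagging, with the flag constants assumed (their real definitions sit next to the Node stack in these files and are not shown in this diff):

```zig
const std = @import("std");

const HAS_CACHE: usize = 0b01; // assumed values, mirroring the names used above
const IS_CONSUMING: usize = 0b10;
const PTR_MASK: usize = ~(HAS_CACHE | IS_CONSUMING);

const Node = struct { next: ?*Node = null };

fn pack(node: *Node, flags: usize) usize {
    const addr = @intFromPtr(node);
    std.debug.assert(addr & ~PTR_MASK == 0); // pointer alignment keeps the flag bits zero
    return addr | flags;
}

fn unpack(tagged: usize) ?*Node {
    return @ptrFromInt(tagged & PTR_MASK); // was: @intToPtr(?*Node, ...)
}
```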