
Fix WebSocket concurrent sends #1197

Merged (2 commits, Nov 27, 2024)

Conversation

@attdona (Contributor) commented Nov 22, 2024

Fixes #1196
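
For context, the approach taken in this PR assembles each frame in its own IOBuffer and hands it to the socket in a single write, so concurrent senders cannot interleave partial frames. A minimal sketch of the idea (names are illustrative, not the actual diff):

```julia
# Sketch: build the whole frame in a private buffer, then write it once.
function send_frame(sock::IO, header::Vector{UInt8}, payload::Vector{UInt8})
    buf = IOBuffer()             # fresh buffer per frame (one allocation each call)
    write(buf, header)
    write(buf, payload)
    write(sock, take!(buf))      # a single write keeps the frame contiguous
end
```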

@quinnj (Member) left a comment

This LGTM; an alternative to consider is adding a write lock to the WebSocket that we hold throughout writeframe; it would avoid the additional IOBuffer for each writeframe. It can be tricky to benchmark network-related changes, but I suspect it might be faster too.

@attdona (Contributor, Author) commented Nov 26, 2024

OK, before merging I can try the comparison with a lock instead. I think it should work; let's see the benchmarks.

@attdona (Contributor, Author) commented Nov 26, 2024

I've put a ReentrantLock around the writes in writeframe: concurrent writes are much slower than with the additional buffer. That makes sense to me, because the lock serializes the writes.
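
A minimal sketch of the lock-based variant being compared here (type and field names are assumptions, not the actual patch):

```julia
# Sketch: serialize frame writes with a per-socket ReentrantLock.
# Holding the lock for the whole frame keeps one task's header and
# payload writes from interleaving with another task's, but it also
# forces concurrent senders to run one at a time.
struct LockedSocket{T<:IO}
    io::T
    writelock::ReentrantLock
end
LockedSocket(io::IO) = LockedSocket(io, ReentrantLock())

function writeframe_locked(ws::LockedSocket, header::Vector{UInt8}, payload::Vector{UInt8})
    lock(ws.writelock) do
        n = write(ws.io, header)
        n += write(ws.io, payload)
        return n
    end
end
```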

@quinnj (Member) commented Nov 27, 2024

Yeah, it makes sense that total throughput of concurrent writes would go down w/ the lock. I guess that's a shift in the paradigm of how I've traditionally used websockets where it's always a single send-and-receive. So all in all, let's go w/ the IOBuffer per writeframe approach. Is this PR ready to merge then?

(thanks for doing the ReentrantLock approach; I appreciate it!)

@fredrikekre (Member)
The buffer could possibly be a task local variable and reused between calls, but let's merge this to get the fix in first.

@fredrikekre fredrikekre merged commit 0854b3e into JuliaWeb:master Nov 27, 2024
13 of 16 checks passed
@attdona (Contributor, Author) commented Nov 28, 2024

I tried the task-local storage, but the results are not what I expected: memory allocation gets worse, which I find hard to understand.

Below is what I tried.

The modified writeframe() method:

```julia
using TaskLocalValues  # provides TaskLocalValue

const frameio = TaskLocalValue{IOBuffer}(() -> IOBuffer())

function writeframe(io::IO, x::Frame)
    n = write(frameio[], hton(UInt16(x.flags)))
    if x.extendedlen !== nothing
        n += write(frameio[], hton(x.extendedlen))
    end
    if x.mask != EMPTY_MASK
        n += write(frameio[], UInt32(x.mask))
    end
    pl = x.payload
    # manually unroll a few known type cases to help the compiler
    if pl isa Vector{UInt8}
        n += write(frameio[], pl)
    elseif pl isa Base.CodeUnits{UInt8,String}
        n += write(frameio[], pl)
    else
        n += write(frameio[], pl)
    end
    write(io.io, take!(frameio[]))
    return n
end
```

bench.jl sends 100 packets from each of 100 async tasks:

```julia
using HTTP.WebSockets

const COUNT = 100

function write_message(ws, msg)
    for i in 1:100
        send(ws, msg)
    end
end

function client_twin(ws)
    for count in 0:COUNT
        @async write_message(ws, count)
    end
end

function serve()
    server = WebSockets.listen!("127.0.0.1", 8081) do ws
        client_twin(ws)
        receive(ws)
    end
    wait(server)
end

function write()
    WebSockets.open("ws://127.0.0.1:8081") do ws
        try
            s = receive(ws)
            t = time()
            count = 0
            while count < COUNT * 100
                s = receive(ws)
                count += 1
            end
            delta = time() - t
            println("delta time: $delta")
            close(ws)
        catch e
            @error "ws client: $e"
        end
    end
end

srvtask = @async serve()
```
Run with `julia -i bench.jl`, then:

```
julia> write()
julia> @time write()
```

```
# writeframe current version with additional buffer
delta time: 0.13498997688293457
  0.139230 seconds (142.53 k allocations: 11.041 MiB)

# writeframe with TaskLocalValue
delta time: 0.14266705513000488
  0.146890 seconds (183.03 k allocations: 11.225 MiB)
```

@fredrikekre (Member)
IOBuffer is not a concrete type so every access to frameio is unstable. With

```julia
const frameio = TaskLocalValue(() -> IOBuffer())
```

and using buff = frameio[] once at the top of the function and replacing write(io.io, take!(frameio[])) with write(io.io, seekstart(buff)) you might see some differences. I get

```
julia> @time write();
delta time: 0.13005280494689941
  0.133447 seconds (202.61 k allocations: 11.956 MiB)
```

for the master branch and

```
julia> @time write();
delta time: 0.022337913513183594
  0.031560 seconds (70.95 k allocations: 18.650 MiB, 5.93% gc time)
```

with those changes to your benchmark.
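
Putting the two suggestions together (the unparameterized TaskLocalValue for type stability, plus seekstart instead of take!), the revised method might look roughly like this simplified sketch (send_frame and its arguments are stand-ins for the real writeframe/Frame):

```julia
using TaskLocalValues  # assumed: the package providing TaskLocalValue

# Leaving the type parameter off lets the wrapper infer the concrete
# buffer type, so every frameio[] access is type-stable.
const frameio = TaskLocalValue(() -> IOBuffer())

function send_frame(sock::IO, flags::UInt16, payload::Vector{UInt8})
    buf = frameio[]              # fetch the task-local buffer once
    truncate(buf, 0)             # reset it for reuse instead of allocating
    n = write(buf, hton(flags))
    n += write(buf, payload)
    write(sock, seekstart(buf))  # seekstart avoids the copy that take! makes
    return n
end
```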

@attdona (Contributor, Author) commented Nov 28, 2024

Nice! seekstart makes the difference. If adding the TaskLocalValues dependency is not a problem, I could make a new PR.

Successfully merging this pull request may close these issues.

WebSockets does not support concurrent sends
3 participants