Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Julia 1.11 compat: Use Base.BufferStream for capturing Pkg IO #2915

Merged
merged 11 commits into from
May 21, 2024
32 changes: 17 additions & 15 deletions src/packages/IOListener.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,51 @@
module ANSIEmulation include("./ANSIEmulation.jl") end


"A polling system to watch for writes to an IOBuffer. Up-to-date content will be passed as string to the `callback` function."
"A polling system to watch for writes to a `Base.BufferStream`. Up-to-date content will be passed as string to the `callback` function."
Base.@kwdef struct IOListener
callback::Function
interval::Real=1.0/60
running::Ref{Bool}=Ref(false)

buffer::IOBuffer=IOBuffer()
last_size::Ref{Int}=Ref(0)
buffer::Base.BufferStream=Base.BufferStream()
ansi_state::ANSIEmulation.ANSITerminalState=ANSIEmulation.ANSITerminalState()
end

function trigger(listener::IOListener)
old_size = listener.last_size[]
new_size = listener.buffer.size
if new_size > old_size
# @debug "making string"
s = String(@view listener.buffer.data[old_size+1:new_size])
# @debug "making ansi"
if isreadable(listener.buffer)
newdata = readavailable(listener.buffer)
isempty(newdata) && return
s = String(newdata)
ANSIEmulation.consume_safe!(
listener.ansi_state,
s
)
# @debug "building string" s listener.ansi_state
new_contents = ANSIEmulation.build_str(listener.ansi_state)

listener.last_size[] = new_size
listener.callback(new_contents)
end
end

function startlistening(listener::IOListener)
if !listener.running[]
listener.running[] = true
@async while listener.running[]
trigger(listener)
sleep(listener.interval)
@async try
while listener.running[]
trigger(listener)
sleep(listener.interval)
end
catch ex
println(stderr, "IOListener loop error")
showerror(stderr, ex, stacktrace(catch_backtrace()))
rethrow(ex)
end
end
end
function stoplistening(listener::IOListener)
if listener.running[]
listener.running[] = false
trigger(listener)
bytesavailable(listener.buffer) > 0 && trigger(listener)
close(listener.buffer)
end
end

Expand Down
20 changes: 10 additions & 10 deletions src/packages/Packages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ function sync_nbpkg_core(
old_topology::NotebookTopology,
new_topology::NotebookTopology;
on_terminal_output::Function=((args...) -> nothing),
cleanup::Ref{Function}=Ref{Function}(_default_cleanup),
cleanup_iolistener::Ref{Function}=Ref{Function}(_default_cleanup),
lag::Real=0,
compiler_options::CompilerOptions=CompilerOptions(),
)
Expand Down Expand Up @@ -115,7 +115,7 @@ function sync_nbpkg_core(
report_to = ["nbpkg_sync", busy_packages...]
IOListener(callback=(s -> on_terminal_output(report_to, freeze_loading_spinners(s))))
end
cleanup[] = () -> stoplistening(iolistener)
cleanup_iolistener[] = () -> stoplistening(iolistener)



Expand Down Expand Up @@ -321,7 +321,7 @@ In addition to the steps performed by [`sync_nbpkg_core`](@ref):
function sync_nbpkg(session, notebook, old_topology::NotebookTopology, new_topology::NotebookTopology; save::Bool=true, take_token::Bool=true)
@assert will_run_pkg(notebook)

cleanup = Ref{Function}(_default_cleanup)
cleanup_iolistener = Ref{Function}(_default_cleanup)
try
Status.report_business_started!(notebook.status_tree, :pkg)

Expand All @@ -344,7 +344,7 @@ function sync_nbpkg(session, notebook, old_topology::NotebookTopology, new_topol
old_topology,
new_topology;
on_terminal_output=iocallback,
cleanup,
cleanup_iolistener,
lag=session.options.server.simulated_pkg_lag,
compiler_options=_merge_notebook_compiler_options(notebook, session.options.compiler),
)
Expand Down Expand Up @@ -399,7 +399,7 @@ function sync_nbpkg(session, notebook, old_topology::NotebookTopology, new_topol

save && save_notebook(session, notebook)
finally
cleanup[]()
cleanup_iolistener[]()
Status.report_business_finished!(notebook.status_tree, :pkg)
end
end
Expand Down Expand Up @@ -539,7 +539,7 @@ function update_nbpkg_core(
notebook::Notebook;
level::Pkg.UpgradeLevel=Pkg.UPLEVEL_MAJOR,
on_terminal_output::Function=((args...) -> nothing),
cleanup::Ref{Function}=Ref{Function}(default_cleanup),
cleanup_iolistener::Ref{Function}=Ref{Function}(default_cleanup),
compiler_options::CompilerOptions=CompilerOptions(),
)
if notebook.nbpkg_ctx !== nothing
Expand All @@ -552,7 +552,7 @@ function update_nbpkg_core(
report_to = ["nbpkg_update", old_packages...]
IOListener(callback=(s -> on_terminal_output(report_to, freeze_loading_spinners(s))))
end
cleanup[] = () -> stoplistening(iolistener)
cleanup_iolistener[] = () -> stoplistening(iolistener)

if !isready(pkg_token)
println(iolistener.buffer, "Waiting for other notebooks to finish Pkg operations...")
Expand Down Expand Up @@ -622,7 +622,7 @@ function update_nbpkg(session, notebook::Notebook; level::Pkg.UpgradeLevel=Pkg.U
bp = if backup && save
writebackup(notebook)
end
cleanup = Ref{Function}(_default_cleanup)
cleanup_iolistener = Ref{Function}(_default_cleanup)

try
pkg_result = withtoken(notebook.executetoken) do
Expand All @@ -640,7 +640,7 @@ function update_nbpkg(session, notebook::Notebook; level::Pkg.UpgradeLevel=Pkg.U
notebook;
level,
on_terminal_output=iocallback,
cleanup,
cleanup_iolistener,
compiler_options=_merge_notebook_compiler_options(notebook, session.options.compiler),
)
end
Expand All @@ -658,7 +658,7 @@ function update_nbpkg(session, notebook::Notebook; level::Pkg.UpgradeLevel=Pkg.U
!isnothing(bp) && isfile(bp) && rm(bp)
end
finally
cleanup[]()
cleanup_iolistener[]()
notebook.nbpkg_busy_packages = String[]
update_nbpkg_cache!(notebook)
send_notebook_changes!(ClientRequest(; session, notebook))
Expand Down
2 changes: 1 addition & 1 deletion src/packages/PkgCompat.jl
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ end

# I'm a pirate harrr 🏴‍☠️
@static if isdefined(Pkg, :can_fancyprint)
Pkg.can_fancyprint(io::IOContext{IOBuffer}) = get(io, :sneaky_enable_tty, false) === true
Pkg.can_fancyprint(io::Union{IOContext{IOBuffer},IOContext{Base.BufferStream}}) = get(io, :sneaky_enable_tty, false) === true
end

###
Expand Down
4 changes: 2 additions & 2 deletions test/packages/Basic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import Malt
@test notebook.nbpkg_restart_required_msg === nothing
@test notebook.nbpkg_ctx_instantiated
@test notebook.nbpkg_install_time_ns > 0
@test notebook.nbpkg_busy_packages |> isempty
@test notebook.nbpkg_busy_packages == []
last_install_time = notebook.nbpkg_install_time_ns

terminals = notebook.nbpkg_terminal_outputs
Expand Down Expand Up @@ -79,7 +79,7 @@ import Malt
@test notebook.nbpkg_restart_required_msg === nothing
@test notebook.nbpkg_ctx_instantiated
@test notebook.nbpkg_install_time_ns > last_install_time
@test notebook.nbpkg_busy_packages |> isempty
@test notebook.nbpkg_busy_packages == []
last_install_time = notebook.nbpkg_install_time_ns

@test haskey(terminals, "PlutoPkgTestB")
Expand Down
Loading