Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions src/client/httplibs/juliaweb_http.jl
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,16 @@ function _http_streaming_request(ctx, method, url, headers, body, timeout, bytes
open_kwargs = merge(open_kwargs, (; verbose=get(ctx.client.clntoptions, :verbose, false)))
end

# Capture the streaming connection so the abort-on-close watcher below can
# unblock the read task when the consumer closes `stream_to`. (Mirrors the
# `:downloads` backend, which interrupts its download task on channel close.)
io_ref = Ref{Any}(nothing)

@sync begin
@async begin
read_task = @async begin
try
HTTP.open(method, url, headers; open_kwargs...) do io
io_ref[] = io
write(io, body)
captured_response[] = http_response = startread(io)
try
Expand All @@ -311,7 +317,15 @@ function _http_streaming_request(ctx, method, url, headers, body, timeout, bytes
end
catch ex
close(output)
rethrow(ex)
# When the consumer closes `stream_to`, the watcher task below aborts
# this read (close(io) + a scheduled InterruptException); that surfaces
# here as an IO error or InterruptException. Swallow it so the streaming
# request returns normally instead of propagating a spurious error. A
# read timeout or genuine network error arrives while `stream_to` is
# still open (and is not our InterruptException), so it still throws.
if isopen(stream_to) && !isa(ex, InterruptException)
rethrow(ex)
end
end
end

Expand All @@ -337,6 +351,45 @@ function _http_streaming_request(ctx, method, url, headers, body, timeout, bytes
close(stream_to)
end
end

@async begin
# Abort-on-close watcher: when the consumer closes `stream_to` (e.g. a
# k8s watch is stopped via `close(stream)`), abort the read task above so
# it unblocks immediately instead of hanging on the socket until the
# read-idle timeout. Mirrors the `:downloads` backend, which interrupts
# its download task on channel close.
try
while isopen(stream_to)
wait(stream_to)
yield()
end
catch ex
isa(ex, InvalidStateException) || rethrow(ex)
end
# Best-effort close of the connection. On HTTP/2 this alone does NOT wake a
# body read parked on the flow-control timer, so we also forcibly interrupt
# the read task (as `:downloads` does for non-interruptible downloads).
io = io_ref[]
# `io` may be `nothing` if the consumer closed before the connection was
# established; in practice a consumer only stops after receiving an event
# (or a timer fires seconds later), so the connection is already up. This
# is the same theoretical hole the `:downloads` watcher has.
if io !== nothing
try
close(io)
catch
# already closed / natural EOF — nothing to abort
end
end
# `stream_to` also closes on natural EOF (the chunk reader closes it after
# the read task finishes); the `istaskdone` guard skips the interrupt in
# that case. For JuliaRun's watch consumers an interrupt of an already-
# finishing read is harmless anyway — it maps to `is_request_interrupted`,
# the same signal a read-idle timeout produces, which they already retry on.
if !istaskdone(read_task)
schedule(read_task, InterruptException(); error=true)
end
end
end

return http_response, output
Expand Down
Loading