From a89bb8ac464321126fabfb7b6679c43cf3a8f0ea Mon Sep 17 00:00:00 2001 From: tan Date: Thu, 2 Jul 2026 20:22:08 +0530 Subject: [PATCH] Abort :http streaming read when the consumer closes the channel The :http backend's streaming request had no way to abort an in-flight socket read when the consumer closes `stream_to`. Closing the channel stopped the chunk-reader task but left the read task blocked in `eof(io)`/`readbytes!(io)` on the socket, so a streaming request whose consumer stops early (e.g. a Kubernetes watch stopped via `close(stream)` after the awaited event arrives, or a timer firing) would hang until the read-idle timeout instead of returning promptly. Add an abort-on-close watcher task, mirroring the :downloads backend: capture the connection `io`, and when `stream_to` closes, close `io` and schedule an `InterruptException` on the read task. `close(io)` alone does not wake an HTTP/2 body read parked on the flow-control timer (`_wait_h2_body_progress!` -> `timedwait`), so the scheduled interrupt is what reliably unblocks it (the same fallback :downloads uses for non-interruptible downloads). The read task swallows the abort (channel closed, or our InterruptException) and returns normally; a genuine network error or read-idle timeout arrives while `stream_to` is still open, so it still propagates. The interrupt surfaces to callers as `InvocationException("request was interrupted")` via `exec`, which `is_request_interrupted` already recognizes. Affects both HTTP.jl 1.x and 2.x (shared :http code path). Validated against a live Kubernetes watch on HTTP 1.11 and 2.5: consumer-initiated stop now returns in ~2-3s instead of hanging. --- src/client/httplibs/juliaweb_http.jl | 57 +++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/src/client/httplibs/juliaweb_http.jl b/src/client/httplibs/juliaweb_http.jl index 57a5759..7db730b 100644 --- a/src/client/httplibs/juliaweb_http.jl +++ b/src/client/httplibs/juliaweb_http.jl @@ -290,10 +290,16 @@ function _http_streaming_request(ctx, method, url, headers, body, timeout, bytes open_kwargs = merge(open_kwargs, (; verbose=get(ctx.client.clntoptions, :verbose, false))) end + # Capture the streaming connection so the abort-on-close watcher below can + # unblock the read task when the consumer closes `stream_to`. (Mirrors the + # `:downloads` backend, which interrupts its download task on channel close.) + io_ref = Ref{Any}(nothing) + @sync begin - @async begin + read_task = @async begin try HTTP.open(method, url, headers; open_kwargs...) do io + io_ref[] = io write(io, body) captured_response[] = http_response = startread(io) try @@ -311,7 +317,15 @@ function _http_streaming_request(ctx, method, url, headers, body, timeout, bytes end catch ex close(output) - rethrow(ex) + # When the consumer closes `stream_to`, the watcher task below aborts + # this read (close(io) + a scheduled InterruptException); that surfaces + # here as an IO error or InterruptException. Swallow it so the streaming + # request returns normally instead of propagating a spurious error. A + # read timeout or genuine network error arrives while `stream_to` is + # still open (and is not our InterruptException), so it still throws. + if isopen(stream_to) && !isa(ex, InterruptException) + rethrow(ex) + end end end @@ -337,6 +351,45 @@ function _http_streaming_request(ctx, method, url, headers, body, timeout, bytes close(stream_to) end end + + @async begin + # Abort-on-close watcher: when the consumer closes `stream_to` (e.g. a + # k8s watch is stopped via `close(stream)`), abort the read task above so + # it unblocks immediately instead of hanging on the socket until the + # read-idle timeout. Mirrors the `:downloads` backend, which interrupts + # its download task on channel close. + try + while isopen(stream_to) + wait(stream_to) + yield() + end + catch ex + isa(ex, InvalidStateException) || rethrow(ex) + end + # Best-effort close of the connection. On HTTP/2 this alone does NOT wake a + # body read parked on the flow-control timer, so we also forcibly interrupt + # the read task (as `:downloads` does for non-interruptible downloads). + io = io_ref[] + # `io` may be `nothing` if the consumer closed before the connection was + # established; in practice a consumer only stops after receiving an event + # (or a timer fires seconds later), so the connection is already up. This + # is the same theoretical hole the `:downloads` watcher has. + if io !== nothing + try + close(io) + catch + # already closed / natural EOF — nothing to abort + end + end + # `stream_to` also closes on natural EOF (the chunk reader closes it after + # the read task finishes); the `istaskdone` guard skips the interrupt in + # that case. For JuliaRun's watch consumers an interrupt of an already- + # finishing read is harmless anyway — it maps to `is_request_interrupted`, + # the same signal a read-idle timeout produces, which they already retry on. + if !istaskdone(read_task) + schedule(read_task, InterruptException(); error=true) + end + end end return http_response, output