Hi,

If a request created with http.Request.WithContext is sent via an 
http.Client over HTTP/2 and then cancelled via the context, the stream is 
not released.

Essentially, the read on the stream is never cancelled and the stream is 
not removed from the set of streams on the ClientConn.  This results not 
only in "leaked" streams and associated data (request, response, context, 
etc.), but also in a new connection and associated goroutine whenever the 
per-connection stream limit (default 250) is exhausted.

The suggested fix is in two parts:
1) Cancel the read by calling clientConnReadLoop.endStreamError 
during clientStream.awaitRequestCancel instead of just 
clientStream.bufPipe.CloseWithError
2) Remove the stream by calling something equivalent to 
ClientConn.forgetStreamID during clientStream.cancelStream

I say equivalent because it's a touch wasteful to take and release the 
mutex twice.

The following program demonstrates the issue:

--

package main

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"os"
"time"

"golang.org/x/net/http2"
)

// assert panics with err when err is non-nil and does nothing otherwise.
// It keeps error handling in this reproduction program to one line per call.
func assert(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// handler is the server side of the reproduction: it sends the response
// headers immediately and then keeps the stream open until the request's
// context is done.
type handler struct{}

// ServeHTTP writes a 200 status, flushes it so the client's Do call can
// return, and then blocks until req's context is cancelled.
//
// If res does not implement http.Flusher the headers could never be pushed
// to the client ahead of the blocking wait, so the request is answered with
// a 500 instead of panicking on a failed type assertion.
func (h *handler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
	flusher, ok := res.(http.Flusher)
	if !ok {
		http.Error(res, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	res.WriteHeader(http.StatusOK)
	flusher.Flush()

	// Hold the stream open; only the request context ending releases it.
	<-req.Context().Done()
}

// makeServer builds an HTTP/2-enabled TLS server and starts it in a
// background goroutine.
//
// The server listens on the port given in os.Args[1] and uses the
// certificate and key files named by os.Args[2] and os.Args[3]. Each
// connection entering the New or Closed state is reported on stdout as
// "connections++" or "connections--". The returned server is already
// serving; the caller shuts it down with Close.
func makeServer() *http.Server {
	httpServer := &http.Server{
		Addr: ":" + os.Args[1],
		ConnState: func(_ net.Conn, state http.ConnState) {
			switch state {
			case http.StateNew:
				fmt.Println("connections++")
			case http.StateClosed:
				fmt.Println("connections--")
			}
		},
	}

	err := http2.ConfigureServer(httpServer, &http2.Server{})
	assert(err)

	httpServer.Handler = &handler{}

	go func() {
		// ListenAndServeTLS always returns a non-nil error. After Close it
		// is http.ErrServerClosed, which signals a clean shutdown and must
		// not panic. The error is kept local to the goroutine rather than
		// written to the enclosing function's variable.
		serveErr := httpServer.ListenAndServeTLS(os.Args[2], os.Args[3])
		if serveErr != http.ErrServerClosed {
			assert(serveErr)
		}
	}()

	return httpServer
}

// makeRequest sends one GET to the local test server (port from os.Args[1])
// using a cancellable context, closes the response body as soon as the
// response arrives, and returns the context's cancel function without
// calling it. Any error along the way panics via assert.
func makeRequest(httpClient *http.Client) context.CancelFunc {
	target := "https://localhost:" + os.Args[1]
	req, err := http.NewRequest("GET", target, nil)
	assert(err)

	ctx, cancel := context.WithCancel(context.Background())
	resp, err := httpClient.Do(req.WithContext(ctx))
	assert(err)

	// The body is closed straight away; cancelling is left to the caller.
	assert(resp.Body.Close())

	return cancel
}

func makeClient() *http.Client {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}

err := http2.ConfigureTransport(transport)
assert(err)

return &http.Client{Transport: transport}
}

// main runs the reproduction: it starts the server, then issues a series of
// requests that are each cancelled right after their response body has been
// closed, printing progress every 50 requests. Connection churn is reported
// by the server's ConnState hook.
//
// Usage: <program> <port> <cert> <key>
func main() {
	if len(os.Args) != 4 {
		fmt.Println("Usage: ", os.Args[0], " <port> <cert> <key>")
		os.Exit(1)
	}

	const (
		// totalRequests is large enough to pass the default limit of 250
		// streams per connection, cited in the report, several times.
		totalRequests = 1001
		reportEvery   = 50
	)

	httpServer := makeServer()
	// Presumably gives the server goroutine time to start listening.
	time.Sleep(time.Second)
	httpClient := makeClient()

	for n := 0; n < totalRequests; n++ {
		cancelRequest := makeRequest(httpClient)
		cancelRequest()
		if n%reportEvery == 0 {
			fmt.Println("Requests: ", n)
		}
	}

	assert(httpServer.Close())
}

--

The quick fix (with wasteful mutex locking) looks like:

diff --git a/http2/transport.go b/http2/transport.go
index 850d7ae..fc97566 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -221,7 +221,7 @@ type clientStream struct {
 // request to be done (any way it might be removed from the cc.streams
 // map: peer reset, successful completion, TCP connection breakage,
 // etc)
-func (cs *clientStream) awaitRequestCancel(req *http.Request) {
+func (cs *clientStream) awaitRequestCancel(req *http.Request, rl *clientConnReadLoop) {
        ctx := reqContext(req)
        if req.Cancel == nil && ctx.Done() == nil {
                return
@@ -229,10 +229,10 @@ func (cs *clientStream) awaitRequestCancel(req *http.Request) {
        select {
        case <-req.Cancel:
                cs.cancelStream()
-               cs.bufPipe.CloseWithError(errRequestCanceled)
+               rl.endStreamError(cs, errRequestCanceled)
        case <-ctx.Done():
                cs.cancelStream()
-               cs.bufPipe.CloseWithError(ctx.Err())
+               rl.endStreamError(cs, ctx.Err())
        case <-cs.done:
        }
 }
@@ -243,6 +243,8 @@ func (cs *clientStream) cancelStream() {
        cs.didReset = true
        cs.cc.mu.Unlock()
 
+       cs.cc.forgetStreamID(cs.ID)
+
        if !didReset {
                cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
        }
@@ -1531,7 +1533,7 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
        cs.bufPipe = pipe{b: &dataBuffer{expected: res.ContentLength}}
        cs.bytesRemain = res.ContentLength
        res.Body = transportResponseBody{cs}
-       go cs.awaitRequestCancel(cs.req)
+       go cs.awaitRequestCancel(cs.req, rl)
 
        if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
                res.Header.Del("Content-Encoding")

Comments are most welcome on whether my understanding of the problem or 
the solution is lacking.

Thanks,

Ben

-- 
You received this message because you are subscribed to the Google Groups 
"golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to golang-nuts+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to