Hi,
If a request created with http.Request.WithContext is sent via an http.Client and
then cancelled via its context, the HTTP/2 stream is not released.
Essentially, the read on the stream is never cancelled and the stream is
not removed from the set of streams on the ClientConn. This results not
only in "leaked" streams and associated data (request, response, context,
etc.), but also in a new connection and associated goroutine whenever the
per-connection stream limit is exhausted (default 250).
The suggested fix is in two parts:
1) Cancel the read by calling clientConnReadLoop.endStreamError
during clientStream.awaitRequestCancel instead of just
clientStream.bufPipe.CloseWithError
2) Remove the stream by calling something equivalent to
ClientConn.forgetStreamID during clientStream.cancelStream
I say equivalent because it's a touch wasteful to take and release the
mutex twice.
The following program demonstrates the issue:
--
package main
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"os"
"time"
"golang.org/x/net/http2"
)
// assert panics with err when err is non-nil and does nothing otherwise.
// It keeps the error handling in this demo program to a single line per call.
func assert(err error) {
	if err == nil {
		return
	}
	panic(err)
}
// handler is an http.Handler that sends a 200 response header immediately
// and then keeps the request open until the request's context is done.
type handler struct{}

// ServeHTTP writes and flushes a 200 status so the client receives the
// response headers, then blocks until req's context is cancelled.
//
// It panics if res does not implement http.Flusher.
func (h *handler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
	res.WriteHeader(http.StatusOK)

	flusher := res.(http.Flusher)
	flusher.Flush()

	// Hold the stream open; only the end of the request context releases it.
	done := req.Context().Done()
	<-done
}
func makeServer() *http.Server {
httpServer := &http.Server{
Addr: ":" + os.Args[1],
}
httpServer.ConnState = func(_ net.Conn, state http.ConnState) {
if state == http.StateNew {
fmt.Println("connections++")
} else if state == http.StateClosed {
fmt.Println("connections--")
}
}
err := http2.ConfigureServer(httpServer, &http2.Server{})
assert(err)
httpServer.Handler = &handler{}
go func() {
err = httpServer.ListenAndServeTLS(os.Args[2], os.Args[3])
assert(err)
}()
return httpServer
}
// makeRequest issues a GET to https://localhost:<os.Args[1]> using
// httpClient with a cancellable context, closes the response body as soon as
// the response headers arrive, and returns the context's cancel function so
// the caller decides when the request is cancelled.
//
// Any error while building the request, performing it, or closing the body
// is passed to assert.
func makeRequest(httpClient *http.Client) context.CancelFunc {
	ctx, cancel := context.WithCancel(context.Background())

	target := "https://localhost:" + os.Args[1]
	req, err := http.NewRequest("GET", target, nil)
	assert(err)

	resp, err := httpClient.Do(req.WithContext(ctx))
	assert(err)
	assert(resp.Body.Close())

	return cancel
}
func makeClient() *http.Client {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
err := http2.ConfigureTransport(transport)
assert(err)
return &http.Client{Transport: transport}
}
// main runs the reproduction: it starts the server, then sends 1001 requests
// through a single client, cancelling each one right after its response
// headers arrive. Progress is printed every 50 requests, and the server is
// closed at the end.
//
// Usage: <program> <port> <cert> <key>
func main() {
	if len(os.Args) != 4 {
		fmt.Println("Usage: ", os.Args[0], " <port> <cert> <key>")
		os.Exit(1)
	}

	srv := makeServer()
	// Presumably gives the background listener time to start accepting
	// connections before the first request is sent.
	time.Sleep(time.Second)

	client := makeClient()
	const totalRequests = 1001
	for n := 0; n < totalRequests; n++ {
		// Issue the request and cancel it immediately afterwards.
		makeRequest(client)()
		if n%50 != 0 {
			continue
		}
		fmt.Println("Requests: ", n)
	}

	assert(srv.Close())
}
--
The quick fix (with wasteful mutex locking) looks like:
diff --git a/http2/transport.go b/http2/transport.go
index 850d7ae..fc97566 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -221,7 +221,7 @@ type clientStream struct {
// request to be done (any way it might be removed from the cc.streams
// map: peer reset, successful completion, TCP connection breakage,
// etc)
-func (cs *clientStream) awaitRequestCancel(req *http.Request) {
+func (cs *clientStream) awaitRequestCancel(req *http.Request, rl
*clientConnReadLoop) {
ctx := reqContext(req)
if req.Cancel == nil && ctx.Done() == nil {
return
@@ -229,10 +229,10 @@ func (cs *clientStream) awaitRequestCancel(req
*http.Request) {
select {
case <-req.Cancel:
cs.cancelStream()
- cs.bufPipe.CloseWithError(errRequestCanceled)
+ rl.endStreamError(cs, errRequestCanceled)
case <-ctx.Done():
cs.cancelStream()
- cs.bufPipe.CloseWithError(ctx.Err())
+ rl.endStreamError(cs, ctx.Err())
case <-cs.done:
}
}
@@ -243,6 +243,8 @@ func (cs *clientStream) cancelStream() {
cs.didReset = true
cs.cc.mu.Unlock()
+ cs.cc.forgetStreamID(cs.ID)
+
if !didReset {
cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
}
@@ -1531,7 +1533,7 @@ func (rl *clientConnReadLoop) handleResponse(cs
*clientStream, f *MetaHeadersFra
cs.bufPipe = pipe{b: &dataBuffer{expected: res.ContentLength}}
cs.bytesRemain = res.ContentLength
res.Body = transportResponseBody{cs}
- go cs.awaitRequestCancel(cs.req)
+ go cs.awaitRequestCancel(cs.req, rl)
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip"
{
res.Header.Del("Content-Encoding")
Comments are most welcome on whether my understanding of the problem or
the solution is lacking.
Thanks,
Ben
--
You received this message because you are subscribed to the Google Groups
"golang-nuts" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/d/optout.