Hi, since I didn't get an answer on SO <https://stackoverflow.com/questions/51597056/streaming-to-fasthttp-requestctx>, I thought I'd try here :)
I'd like to offer clients a streaming API, using fasthttp as the backend. I don't want to expose the `fasthttp.RequestCtx` to the client, and I don't want to get a channel from the client function.

Here's the code I have (the client function is `worker`). When I run it and then do `curl -iv http://localhost:8080`, it gets stuck. On the server side I see 5 `WRITE: ...` prints. From debugging, it seems the server hangs in `s.writer.Flush()`.

Any ideas how to implement this?

    package main

    import (
        "bufio"
        "fmt"
        "log"

        "github.com/valyala/fasthttp"
    )

    var (
        poem = []string{
            "The Road goes ever on and on",
            "Down from the door where it began.",
            "Now far ahead the Road has gone,",
            "And I must follow, if I can,",
            "Pursuing it with eager feet,",
            "Until it joins some larger way",
            "Where many paths and errands meet.",
            "And whither then? I cannot say.",
        }
    )

    // Streamer is streaming API
    type Streamer interface {
        // Write writes bytes to streamer
        Write(p []byte) (n int, err error)
        // Flush flushes data to the client
        Flush() error
        // SetStatusCode sets the status code. *Must* be called before Write and Flush
        SetStatusCode(statusCode int) error
        // SetHeader sets a response header. *Must* be called before Write and Flush
        SetHeader(key string, value interface{}) error
    }

    // HTTPStreamer implement Streamer with fasthttp
    type HTTPStreamer struct {
        ctx    *fasthttp.RequestCtx
        done   chan bool
        writer *bufio.Writer
    }

    // NewHTTPStreamer returns a new HTTPStreamer
    func NewHTTPStreamer(ctx *fasthttp.RequestCtx) *HTTPStreamer {
        return &HTTPStreamer{
            ctx: ctx,
        }
    }

    // Close closes the stream
    func (s *HTTPStreamer) Close() {
        if s.done != nil {
            close(s.done)
        }
    }

    // Write writes bytes to streamer
    func (s *HTTPStreamer) Write(p []byte) (n int, err error) {
        if s.writer == nil {
            s.done = make(chan bool)
            ready := make(chan bool)
            go func() {
                s.ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
                    s.writer = w
                    ready <- true // Signal that writer is set
                    <-s.done      // Wait for stream to be closed
                })
            }()
            <-ready // Wait until writer is set
        }

        fmt.Printf("WRITE: %s", string(p))
        return s.writer.Write(p)
    }

    // Flush flushes data to the client
    func (s *HTTPStreamer) Flush() error {
        if s.writer != nil {
            return s.writer.Flush()
        }
        return nil
    }

    // SetStatusCode sets the status code. *Must* be called before Write and Flush
    func (s *HTTPStreamer) SetStatusCode(statusCode int) error {
        if s.writer != nil {
            return fmt.Errorf("Streaming started - can't set status")
        }

        s.ctx.SetStatusCode(statusCode)
        return nil
    }

    // SetHeader sets a response header. *Must* be called before Write and Flush
    // value can be string or []byte
    func (s *HTTPStreamer) SetHeader(key string, value interface{}) error {
        if s.writer != nil {
            return fmt.Errorf("Streaming started - can't set header")
        }

        switch v := value.(type) {
        case string:
            s.ctx.Response.Header.Set(key, v)
        case []byte:
            s.ctx.Response.Header.SetBytesV(key, v)
        default:
            return fmt.Errorf("Unsupported header value type - %T", value)
        }

        return nil
    }

    func worker(s Streamer) {
        s.SetStatusCode(201)
        s.SetHeader("X-T", "VALUE")
        for _, line := range poem {
            s.Write([]byte(line + "\n"))
            s.Flush()
        }
    }

    func handler(ctx *fasthttp.RequestCtx) {
        s := NewHTTPStreamer(ctx)
        worker(s)
        s.Close()
    }

    func main() {
        srv := &fasthttp.Server{
            Handler: handler,
        }

        if err := srv.ListenAndServe(":8080"); err != nil {
            log.Fatal(err)
        }
    }
