Hi guys!

(Cross-posting this since I did not get a response in the Slick group.)

I use akka-http together with Slick to stream rows from a mysql database as 
csv via http.

Streaming works as expected unless I disconnect the http connection 
prematurely. In this case I would expect that the sql query should stop 
executing since there is no downstream demand anymore. However, I can see 
in the mysql processes that the query is still running and it only stops 
when I shut down the webserver (or when the query finishes fetching all the 
rows).

If I turn on debug logs I see repeatedly, while streaming:

22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending 
streaming action with continuation (more data available)
22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG 
slick.basic.BasicBackend.stream - Scheduling stream continuation after 
transition from demand = 0
22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting 
 streaming action, realDemand = 8

Then, once I disconnect the connection, and there is no downstream demand 
anymore, I see:

22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending 
streaming action with continuation (more data available)
22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG 
akka.io.TcpIncomingConnection - Closing connection due to IO error 
java.io.IOException: Broken pipe
22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG 
slick.basic.BasicBackend.stream - Scheduling stream continuation after 
transition from demand = 0
22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting 
 streaming action, realDemand = oo

What is going on here? Why is realDemand oo?

My code looks something like this:

object Main extends App { 
  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  implicit val csvMarshaller =
    Marshaller.withFixedContentType[CSVLine, 
ByteString](ContentTypes.`text/csv(UTF-8)`) {
      case CSVLine(line) => ByteString(line)
    }

  implicit val csvStreamingSupport = EntityStreamingSupport.csv()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  val query = ??? // a Slick query

  val publisher: DatabasePublisher[Tick] = DB.get.stream(
    query.result.withStatementParameters(statementInit = DB.enableStream))

  val routes = {
    logRequestResult("webservice") {
      encodeResponse {
        pathPrefix("csv") {
          pathSingleSlash {
            get {
              complete {
                Source.fromPublisher(publisher).map(t => CSVLine(t.toCSV()))
              }
            }
          }
        }
      }
    }
  }

  Http().bindAndHandle(routes, "127.0.0.1", 9000)
}

object DB {
  private val db = Database.forConfig("db")

  def get = db

  def enableStream(statement: java.sql.Statement): Unit = {
    statement match {
      case s if s.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl]) =>
        
s.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStreamingResults()
      case _ => // do nothing
    }
  }
}

Any thoughts on this?

Mark

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to