Hi guys!
(Cross-posting this since I did not get a response in the Slick group.)
I use akka-http together with Slick to stream rows from a mysql database as
csv via http.
Streaming works as expected unless I disconnect the http connection
prematurely. In this case I would expect that the sql query should stop
executing since there is no downstream demand anymore. However, I can see
in the mysql processes that the query is still running and it only stops
when I shut down the webserver (or when the query finishes fetching all the
rows).
If I turn on debug logs I see repeatedly, while streaming:
22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending
streaming action with continuation (more data available)
22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG
slick.basic.BasicBackend.stream - Scheduling stream continuation after
transition from demand = 0
22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting
streaming action, realDemand = 8
Then, once I disconnect the connection, and there is no downstream demand
anymore, I see:
22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending
streaming action with continuation (more data available)
22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG
akka.io.TcpIncomingConnection - Closing connection due to IO error
java.io.IOException: Broken pipe
22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG
slick.basic.BasicBackend.stream - Scheduling stream continuation after
transition from demand = 0
22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting
streaming action, realDemand = oo
What is going on here? Why is realDemand oo?
My code looks something like this:
object Main extends App {
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
implicit val csvMarshaller =
Marshaller.withFixedContentType[CSVLine,
ByteString](ContentTypes.`text/csv(UTF-8)`) {
case CSVLine(line) => ByteString(line)
}
implicit val csvStreamingSupport = EntityStreamingSupport.csv()
.withParallelMarshalling(parallelism = 8, unordered = false)
val query = ??? // a Slick query
val publisher: DatabasePublisher[Tick] = DB.get.stream(
query.result.withStatementParameters(statementInit = DB.enableStream))
val routes = {
logRequestResult("webservice") {
encodeResponse {
pathPrefix("csv") {
pathSingleSlash {
get {
complete {
Source.fromPublisher(publisher).map(t => CSVLine(t.toCSV()))
}
}
}
}
}
}
}
Http().bindAndHandle(routes, "127.0.0.1", 9000)
}
object DB {
private val db = Database.forConfig("db")
def get = db
def enableStream(statement: java.sql.Statement): Unit = {
statement match {
case s if s.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl]) =>
s.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStreamingResults()
case _ => // do nothing
}
}
}
Any thoughts on this?
Mark
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.