Skip to content

Instantly share code, notes, and snippets.

@fokot
Created January 29, 2026 10:47
Show Gist options
  • Select an option

  • Save fokot/5ef950f6e98d96339e95dfba117bc7bc to your computer and use it in GitHub Desktop.

Select an option

Save fokot/5ef950f6e98d96339e95dfba117bc7bc to your computer and use it in GitHub Desktop.
Stream usage

Stream usage

Backend calclutaion could take a long time until json is returned. So we want to return a space every minute, so connection does not die.

Here are two ways how it was done:

  • ControllerOld.scala + ResponseStreamHandler.scala is without using streams
  • ControllerNew.scala is using streams
class Controller {
def getReuslt = {
...
val responseStream = Source.future(resultFuture).merge(Source.tick(1.minute, 1.minute, " "), eagerComplete = true)
Future.successful(Ok.chunked(responseStream, Some("text/json")))
}
}
class Controller {
def getReuslt = {
...
Future.successful(Ok.chunked(
ResponseStreamHandler.handle(result.map(s => IOUtils.toInputStream(s, "UTF-8"))),
).as("text/json"))
}
}
import akka.stream.IOResult
import akka.stream.scaladsl.{ Source, StreamConverters }
import akka.util.ByteString
import java.io.InputStream
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future, TimeoutException }
object ResponseStreamHandler {
// Sends a single space ' ' every minute if result isn't ready. This doesn't break JSON so we're juuust fine.
def handle(future: Future[InputStream]): Source[ByteString, Future[IOResult]] = {
val toInputStream = () => {
new InputStream {
private val futureResultIS = future
private var resultISOpt = Option.empty[InputStream]
// This doesn't work as chunked can't simply send a single space this way.
override def read(): Int = throw new NotImplementedError
override def read(bytes: Array[Byte]): Int = {
try {
if (resultISOpt.isEmpty) {
resultISOpt = Some(Await.result(futureResultIS, Duration(1, TimeUnit.MINUTES)))
}
resultISOpt.get.read(bytes)
} catch {
case _: TimeoutException =>
bytes(0) = ' '.toByte
1
}
}
}
}
StreamConverters.fromInputStream(in = toInputStream)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment