@fancellu
Last active December 18, 2025 13:12
Example Scio/Beam usage inside ZIO and Cats Effect

ZIO Beam WordCount (now with Cats Effect too!)

A simple word-count program built with Apache Beam/Scio and wrapped in either ZIO or Cats Effect, demonstrating retry logic and error handling.

Features

  • Counts words in a text file using Apache Beam
  • Built with ZIO, or alternatively Cats Effect plus cats-retry, for functional effect management
  • Includes retry logic with fixed delays
  • Simulates random failures for testing resilience

Running the Program

From a terminal, using sbt:

sbt "runMain beam.WordCountZIO --output=wc-output"
sbt "runMain beam.WordCountCatsEffect --output=wc-output"

From IntelliJ:

Run the WordCountZIO object directly with program arguments: --output=wc-output

Run the WordCountCatsEffect object directly with program arguments: --output=wc-output

Program Arguments

  • --input=<path> - Input file path (defaults to src/main/resources/kinglear.txt)
  • --output=<path> - Output directory (defaults to wc-output)
  • --runner=DirectRunner - Beam runner (optional, DirectRunner is default)
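As a rough illustration, here is how the two application flags resolve to paths, including their defaults. This is a plain-Scala sketch that assumes simple --key=value syntax. ArgsSketch and its methods are hypothetical stand-ins for Scio's ContextAndArgs/Args, which in the gist do the real parsing and also handle Beam pipeline options such as --runner:

```scala
// Hypothetical stand-in for Scio's Args (not the real Scio API):
// only "--key=value" flags are parsed, and getOrElse supplies the README's defaults.
object ArgsSketch {
  // Collect "--key=value" flags into a map; any other argument is ignored here.
  def parseArgs(raw: Array[String]): Map[String, String] =
    raw.collect {
      case flag if flag.startsWith("--") && flag.indexOf('=') > 2 =>
        val eq = flag.indexOf('=')
        flag.substring(2, eq) -> flag.substring(eq + 1)
    }.toMap

  // Resolve input/output the way the job does, falling back to the defaults.
  def resolvePaths(raw: Array[String]): (String, String) = {
    val args = parseArgs(raw)
    val input = args.getOrElse("input", "src/main/resources/kinglear.txt")
    val output = args.getOrElse("output", "wc-output")
    (input, output)
  }
}
```

For example, ArgsSketch.resolvePaths(Array("--output=out")) keeps the default input path and uses out as the output directory.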

Behavior

The program:

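  1. Generates a random float in [0, 1) and fails whenever it is below 0.9 (about 90% of attempts) to demonstrate the retry logic
  2. Retries up to 10 times (11 attempts in total) with a fixed 100 ms delay between attempts; if every attempt fails, it reports the error and exits with a failure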
  3. Processes the input text file and counts word occurrences
  4. Outputs results to numbered part files in the specified directory
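Steps 1 and 2 can be sketched with nothing but the Scala standard library. The gist itself delegates retrying to ZIO's Schedule or to cats-retry. RetrySketch, retryFixed and flakyStep are hypothetical names used only for illustration:

```scala
import scala.util.{Failure, Random, Try}

// Plain-Scala sketch (not ZIO or cats-retry): run `action`; on failure wait
// `delayMillis` and try again, giving up after `maxRetries` retries, i.e. after
// maxRetries + 1 attempts in total. All names here are hypothetical.
object RetrySketch {
  def retryFixed[A](maxRetries: Int, delayMillis: Long)(action: () => A): Try[A] = {
    @annotation.tailrec
    def loop(retriesLeft: Int): Try[A] =
      Try(action()) match {
        case Failure(_) if retriesLeft > 0 =>
          Thread.sleep(delayMillis) // fixed delay between attempts
          loop(retriesLeft - 1)
        case result => result // success, or the final failure once retries run out
      }
    loop(maxRetries)
  }

  // Simulated flaky step: fails whenever the random float is below 0.9,
  // which for a uniform float on [0, 1) is about 90% of the time.
  def flakyStep(rng: Random): String = {
    val f = rng.nextFloat()
    if (f < 0.9f) throw new RuntimeException("Random failure occurred")
    "Should work now"
  }
}
```

For example, RetrySketch.retryFixed(10, 100)(() => RetrySketch.flakyStep(new Random())) makes at most 11 attempts. In the worst case that adds about one second of waiting (10 delays of 100 ms).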

Output

Results are written as text files in the output directory, one line per distinct word, in the format word: count.
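The counting step can be approximated with ordinary Scala collections. This sketch mirrors the pipeline's map/flatMap/countByValue chain, but it runs on an in-memory list of lines rather than a Beam SCollection. WordCountSketch is a hypothetical name. The sorting in format is added only to make the output deterministic; the real part files are not sorted:

```scala
// In-memory mirror of: textFile -> map(_.trim) -> flatMap(split) -> countByValue
// -> map to "word: count". Hypothetical helper, not part of the gist.
object WordCountSketch {
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .map(_.trim)
      .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) // same tokenizing regex as the job
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size.toLong }

  // Render each entry in the job's "word: count" format, sorted by word.
  def format(counts: Map[String, Long]): Seq[String] =
    counts.toSeq.sortBy(_._1).map { case (w, c) => s"$w: $c" }
}
```

The regex keeps apostrophes and nothing is lowercased, so "The" and "the" are counted as different words.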

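Example console output from WordCountZIO. The first run exhausts all of its retries and fails; a second run then succeeds on its third attempt:

0.32632995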
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.8723043
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.89009273
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.4717887
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.63323087
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.7908458
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.7886189
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.6163985
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.85410094
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.64915174
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.8425201
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
Task failed after all retries: Random failure occurred
timestamp=2025-12-03T16:35:28.393615Z level=ERROR thread=#zio-fiber-1804108919 message="" cause="java.lang.RuntimeException: Random failure occurred
    at beam.WordCountZIO$.$anonfun$wordCount$9(WordCountZIO.scala:20)
    at zio.ZIO$.$anonfun$fail$1(ZIO.scala:3252)
    at zio.ZIO$.$anonfun$failCause$1(ZIO.scala:3263)
    at beam.WordCountZIO.wordCount(WordCountZIO.scala:20)
    at beam.WordCountZIO.wordCount(WordCountZIO.scala:19)
    at beam.WordCountZIO.run(WordCountZIO.scala:10)
    at beam.WordCountZIO.run(WordCountZIO.scala:11)
    at beam.WordCountZIO.run(WordCountZIO.scala:12)
0.7614539
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.88121456
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.940289
Should work now
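A quick calculation shows why the first run above gave up. Assume each attempt fails independently with probability 0.9, since nextFloat is uniform on [0, 1). Then all 11 attempts fail with probability 0.9^11, roughly 0.31, so about one run in three should end with "Task failed after all retries". FailureOdds is a hypothetical helper for illustration:

```scala
// Back-of-the-envelope model, assuming independent attempts that each fail
// with probability pFail, and at most maxRetries retries.
object FailureOdds {
  // Probability that all maxRetries + 1 attempts fail.
  def probAllFail(pFail: Double, maxRetries: Int): Double =
    math.pow(pFail, (maxRetries + 1).toDouble)

  // Mean number of attempts until the first success (geometric distribution),
  // ignoring the retry cap.
  def expectedAttempts(pFail: Double): Double = 1.0 / (1.0 - pFail)
}
```

probAllFail(0.9, 10) is about 0.3138, and expectedAttempts(0.9) is 10. Without the cap, a run would need ten attempts on average, which is close to the 11 the program allows.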

package beam

import com.spotify.scio._
import cats.effect.{ExitCode, IO, IOApp}
import cats.effect.std.Random
import retry._
import retry.RetryPolicies._
import scala.concurrent.duration._

object WordCountCatsEffect extends IOApp {

  private def wordCount(rawArgs: List[String]) = {
    for {
      random <- Random.scalaUtilRandom[IO]
      randomFloat <- random.nextFloat
      _ <- IO.println(randomFloat)
      // Simulated flaky step: fails roughly 90% of the time
      shouldFailBool = randomFloat < 0.9f
      _ <- IO.whenA(shouldFailBool)(
             IO.println("Should fail!") *> IO.raiseError(new RuntimeException("Random failure occurred")))
      _ <- IO.println("Should work now")
      // Build the ScioContext and parse the application args (--input, --output)
      result <- IO(ContextAndArgs(rawArgs.toArray))
      (sc, args) = result
      exampleData = "src/main/resources/kinglear.txt"
      input = args.getOrElse("input", exampleData)
      output = args.getOrElse("output", "wc-output")
      // Define the pipeline: split lines into words, count each word, write "word: count"
      _ <- IO(
        sc.textFile(input)
          .map(_.trim)
          .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
          .countByValue
          .map(t => t._1 + ": " + t._2)
          .saveAsTextFile(output))
      // Run the pipeline and block until it finishes
      scioResult <- IO(sc.run().waitUntilFinish())
    } yield scioResult
  }

  override def run(args: List[String]): IO[ExitCode] =
    retryingOnAllErrors(
      // 100 ms between attempts, at most 10 retries (11 attempts in total)
      policy = constantDelay[IO](100.millis).join(limitRetries[IO](10)),
      onError = (err: Throwable, _: RetryDetails) => IO.println(s"Failed, retrying because $err ...")
    )(wordCount(args))
      .handleErrorWith((th: Throwable) =>
        IO.println(s"Task failed after all retries: ${th.getMessage}") *> IO.raiseError(th))
      .as(ExitCode.Success)
}
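The policy above combines two cats-retry policies with join. join gives up as soon as either policy gives up, and otherwise waits for the larger of the two delays. The following is a simplified, pure model of that rule, not the cats-retry API. Here a "policy" is just a function from retries-so-far to an optional delay, and PolicySketch with its members is hypothetical:

```scala
import scala.concurrent.duration._

// Simplified model (NOT cats-retry): a policy maps "retries so far" to
// Some(delay before the next retry) or None (give up).
object PolicySketch {
  type Policy = Int => Option[FiniteDuration]

  def constantDelay(d: FiniteDuration): Policy = _ => Some(d)

  // Allow retries while fewer than `max` have happened, with no delay of its own.
  def limitRetries(max: Int): Policy =
    retriesSoFar => if (retriesSoFar >= max) None else Some(Duration.Zero)

  // Continue only if both policies continue, using the larger of the two delays.
  def join(a: Policy, b: Policy): Policy = n =>
    for { da <- a(n); db <- b(n) } yield if (da >= db) da else db

  // Delays produced before giving up (only call this on a policy that eventually gives up).
  def schedule(p: Policy): List[FiniteDuration] =
    Iterator.from(0).map(p).takeWhile(_.isDefined).map(_.get).toList
}
```

With join(constantDelay(100.millis), limitRetries(10)), the schedule is ten delays of 100 ms, after which the policy gives up.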

package beam

import com.spotify.scio._
import zio.Console.printLine
import zio._

object WordCountZIO extends ZIOAppDefault {

  def run = for {
    rawArgs <- getArgs // had to refactor due to scio 0.15.x changes to ScioContext/ContextAndArgs
    result <- wordCount(rawArgs.toArray)
      // Inside the retried effect: logs every failed attempt, including the last one
      .tapError(th => printLine(s"Failed, retrying because $th ..."))
      // Up to 10 retries (11 attempts in total), each delayed by 100 ms
      .retry(Schedule.recurs(10).delayed(_ => 100.millis))
      // Outside the retry: logs once, after all retries are exhausted
      .tapError(th => printLine(s"Task failed after all retries: ${th.getMessage}"))
  } yield result

  private def wordCount(rawArgs: Array[String]) = {
    for {
      randomFloat <- Random.nextFloat
      _ <- printLine(randomFloat)
      // Simulated flaky step: fails roughly 90% of the time
      shouldFailBool = randomFloat < 0.9f
      _ <- ZIO.when(shouldFailBool)(
             printLine("Should fail!") *> ZIO.fail(new RuntimeException("Random failure occurred")))
      _ <- printLine("Should work now")
      // Build the ScioContext and parse the application args (--input, --output)
      result <- ZIO.attempt(ContextAndArgs(rawArgs))
      (sc, args) = result
      exampleData = "src/main/resources/kinglear.txt"
      input = args.getOrElse("input", exampleData)
      output = args.getOrElse("output", "wc-output")
      // Define the pipeline: split lines into words, count each word, write "word: count"
      _ <- ZIO.attempt(
        sc.textFile(input)
          .map(_.trim)
          .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
          .countByValue
          .map(t => t._1 + ": " + t._2)
          .saveAsTextFile(output))
      // Run the pipeline and block until it finishes (run() takes an empty parameter list)
      scioResult <- ZIO.attempt(sc.run().waitUntilFinish())
    } yield scioResult
  }
}
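The placement of the two tapError calls explains the log above. The first is inside the retried effect, so it prints "Failed, retrying ..." on every failed attempt. That includes the eleventh attempt, even though no retry follows it. The second sits outside retry and prints once. Here is a plain-Scala model of that ordering, with delays omitted. TapErrorSketch is hypothetical and is not how ZIO implements retry:

```scala
import scala.util.Try

// Plain-Scala model (not ZIO) of:
//   wordCount(...).tapError(perAttempt).retry(recurs(maxRetries)).tapError(afterAll)
object TapErrorSketch {
  def run[A](maxRetries: Int)(action: () => A)(
      perAttempt: Throwable => Unit,
      afterAll: Throwable => Unit): Try[A] = {
    // One attempt plus the inner hook, which fires on every failed attempt.
    def attempt(): Try[A] = {
      val r = Try(action())
      r.failed.foreach(perAttempt)
      r
    }

    // Retry the hooked attempt: maxRetries retries means maxRetries + 1 attempts.
    @annotation.tailrec
    def loop(retriesLeft: Int): Try[A] = attempt() match {
      case r if r.isFailure && retriesLeft > 0 => loop(retriesLeft - 1)
      case r => r
    }

    val result = loop(maxRetries)
    result.failed.foreach(afterAll) // outer hook: fires at most once
    result
  }
}
```

With an action that always fails and maxRetries = 10, perAttempt runs 11 times and afterAll runs once. That matches the eleven "Failed, retrying" lines followed by a single "Task failed after all retries" in the log.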