background-image: url(../img/fp-tower/website-background.svg) class: center, middle, white .title[Concurrent IO] --- class: medium-code # fromTwoClients
```scala def searchByClient(client: SearchFlightClient) = client .search(from, to, date) .handleErrorWith(_ => IO(Nil)) for { flights1 <- searchByClient(client1) flights2 <- searchByClient(client2) } yield SearchResult(flights1 ++ flights2) ``` ] --- class: medium-code # fromTwoClients
```scala def searchByClient(client: SearchFlightClient) = `client` `.search(from, to, date)` .handleErrorWith(_ => IO(Nil)) for { flights1 <- searchByClient(client1) flights2 <- searchByClient(client2) } yield SearchResult(flights1 ++ flights2) ``` ] --- class: medium-code # fromTwoClients
```scala def searchByClient(client: SearchFlightClient) = client .search(from, to, date) .handleErrorWith(_ => IO(Nil)) for { flights1 `<-` searchByClient(client1) flights2 `<-` searchByClient(client2) } yield SearchResult(flights1 ++ flights2) ``` ] --- class: medium-code # fromTwoClients is sequential
```scala def searchByClient(client: SearchFlightClient) = client .search(from, to, date) .handleErrorWith(_ => IO(Nil)) for { flights1 <- searchByClient(client1) flights2 <- searchByClient(client2) } yield SearchResult(flights1 ++ flights2) ``` ] .forty-seven-right[.center[
]] --- class: medium-code # For comprehensions are sequential
```scala def searchByClient(client: SearchFlightClient) = client .search(from, to, date) .handleErrorWith(_ => IO(Nil)) for { `flights1` <- searchByClient(client1) flights2 <- `searchByClient(client2)` } yield SearchResult(flights1 ++ flights2) ``` ] .forty-seven-right[.center[
]] --- class: medium-code # map and flatMap are sequential
```scala searchByClient(client1).flatMap { `flights1` => searchByClient(client2).map { `flights2` => SearchResult(flights1 ++ flights2) } } ``` ] .forty-seven-right[.center[
]] --- class: medium-code # Concurrent IOs .seventy-seven-left[ ```scala trait IO[A] { // Runs both the current IO and "other" concurrently, // then combine their results into a tuple def parZip[Other](other: IO[Other])(ec: ExecutionContext): IO[(A, Other)] = ??? } ``` ] --- class: medium-code # Concurrent IOs .seventy-seven-left[ ```scala trait IO[A] { // Runs both the current IO and "other" `concurrently`, // then combine their results into a tuple def parZip[Other](other: IO[Other])(ec: ExecutionContext): IO[(A, Other)] = ??? // Runs both the current IO and "other" `sequentially`, // then combine their results into a tuple def zip[Other](other: IO[Other]): IO[(A, Other)] = for { first <- this second <- other } yield (first, second) } ``` ] --- class: medium-code # Sequential vs Concurrent .seventy-two-left[ ```scala def stream(taskName: String, iteration: Int, duration: FiniteDuration) = List.range(0, iteration).traverse { n => IO.debug(s"Task $taskName $n") andThen IO.sleep(duration) } val streamA = stream("A", 4, 200.millis) ``` ] .seventy-two-left[ ```scala streamA.unsafeRun() // Task A 0 // Task A 1 // Task A 2 // Task A 3 ``` ] --- class: medium-code # .hl2[Sequential] vs Concurrent .seventy-two-left[ ```scala def stream(taskName: String, iteration: Int, duration: FiniteDuration) = List.range(0, iteration).traverse { n => IO.debug(s"Task $taskName $n") andThen IO.sleep(duration) } val streamA = stream("A", 4, 200.millis) val streamB = stream("B", 3, 300.millis) ``` ] .seventy-two-left[ ```scala streamA.`zip`(streamB).unsafeRun() // Task A 0 // Task A 1 // Task A 2 // Task A 3 // Task B 0 // Task B 1 // Task B 2 ``` ] --- class: medium-code # .hl2[Sequential] vs Concurrent .seventy-two-left[ ```scala def stream(taskName: String, iteration: Int, duration: FiniteDuration) = List.range(0, iteration).traverse { n => IO.debug(s"Task $taskName $n") andThen IO.sleep(duration) } val streamA = stream("A", 4, 200.millis) val streamB = stream("B", 3, 300.millis) ``` ] .seventy-two-left[ ```scala // Task` A `0 // Task` A `1 // Task` A `2 // Task` A `3 // Task` B `0 // Task` B `1 // Task` B `2 ``` ] --- class: medium-code # Sequential vs .hl2[Concurrent] .seventy-two-left[ ```scala def stream(taskName: String, iteration: Int, duration: FiniteDuration) = List.range(0, iteration).traverse { n => IO.debug(s"Task $taskName $n") andThen IO.sleep(duration) } val streamA = stream("A", 4, 200.millis) val streamB = stream("B", 3, 300.millis) ``` ] .seventy-two-left[ ```scala streamA.`parZip`(streamB)(ec).unsafeRun() // Task A 0 // Task B 0 // Task A 1 // Task B 1 // Task A 2 // Task A 3 // Task B 2 ``` ] --- class: medium-code # Sequential vs .hl2[Concurrent] .seventy-two-left[ ```scala def stream(taskName: String, iteration: Int, duration: FiniteDuration) = List.range(0, iteration).traverse { n => IO.debug(s"Task $taskName $n") andThen IO.sleep(duration) } val streamA = stream("A", 4, 200.millis) val streamB = stream("B", 3, 300.millis) ``` ] .seventy-two-left[ ```scala streamA.parZip(streamB)(ec).unsafeRun() // Task` A `0 // Task` B `0 // Task` A `1 // Task` B `1 // Task` A `2 // Task` A `3 // Task` B `2 ``` ] --- class: medium-code # How to implement parZip? .seventy-seven-left[ ```scala trait IO[A] { // Runs both the current IO and "other" concurrently, // then combine their results into a tuple def parZip[Other](other: IO[Other])(ec: ExecutionContext): IO[(A, Other)] = `???` } ``` ] --- class: medium-code # Futures .eighty-two-left[ ```scala trait SearchFlightClient { def search(from: Airport, to: Airport, date: LocalDate): `Future`[List[Flight]] } ``` ] --- class: medium-code # Concurrent Futures .eighty-two-left[ ```scala trait SearchFlightClient { def search(from: Airport, to: Airport, date: LocalDate): Future[List[Flight]] } ``` ] .forty-seven-left[
```scala val search1 =, to, date) val search2 =, to, date) ``` ] .fifty-two-right[.center[
]] --- class: medium-code # Concurrent Futures .eighty-two-left[ ```scala trait SearchFlightClient { def search(from: Airport, to: Airport, date: LocalDate): Future[List[Flight]] } ``` ] .forty-seven-left[
```scala val search1 =, to, date) val search2 =, to, date) for { flights1 <- search1 flights2 <- search2 } yield SearchResult(flights1 ++ flights2) ``` ] .fifty-two-right[.center[
]] --- class: medium-code # Concurrent Futures .eighty-two-left[ ```scala trait SearchFlightClient { def search(from: Airport, to: Airport, date: LocalDate): Future[List[Flight]] } ``` ] .forty-seven-left[
```scala val search1 =, to, date) val search2 =, to, date) for { flights1 <- search1 flights2 <- search2 } yield SearchResult(flights1 ++ flights2) ``` ] .fifty-two-right[.center[
]] --- class: medium-code # Sequential Futures .eighty-two-left[ ```scala trait SearchFlightClient { def search(from: Airport, to: Airport, date: LocalDate): Future[List[Flight]] } ``` ] .forty-seven-left[
```scala for { flights1 <-, to, date) flights2 <-, to, date) } yield SearchResult(flights1 ++ flights2) ``` ] .fifty-two-right[.center[
]] --- class: medium-code # Concurrent Futures with zip .eighty-two-left[ ```scala trait SearchFlightClient { def search(from: Airport, to: Airport, date: LocalDate): Future[List[Flight]] } ``` ] .forty-seven-left[
```scala, to, date) .zip(, to, date)) .map { case (flights1, flights2) => SearchResult(flights1 ++ flights2) } ``` ] .fifty-two-right[.center[
]] --- class: medium-code # Concurrent Futures with zip .eighty-two-left[ ```scala trait SearchFlightClient { def search(from: Airport, to: Airport, date: LocalDate): Future[List[Flight]] } ``` ] .forty-seven-left[
```scala val search1 =, to, date) val search2 =, to, date) search1 .zip(search2) .map { case (flights1, flights2) => SearchResult(flights1 ++ flights2) } ``` ] .fifty-two-right[.center[
]] --- background-image: url(../img/fp-tower/website-background-white.svg) class: middle, white ## package exercises.action.fp # IO.scala --- class: medium-code # parZip is blocking
.fifty-seven-left[ ```scala trait IO[A] { // Runs both the current IO and "other" concurrently, // then combine their results into a tuple def parZip[Other](other: IO[Other]) (ec: ExecutionContext) = IO { val future1 = Future { this.unsafeRun() }(ec) val future2 = Future { other.unsafeRun() }(ec) val combined = `Await.result`(combined, Duration.Inf) } } ``` ] -- .forty-two-right[
] --- class: medium-code # parSequence calls parZip recursively .fifty-seven-left[
```scala def parSequence[A](actions: List[IO[A]]) (ec: ExecutionContext): IO[List[A]] = actions.`foldLeft`(IO(List.empty[A]))( (state, action) => state.`parZip`(action)(ec).map { case (list, value) => value :: list } ).map(_.reverse) ``` ] .forty-two-right[
] --- class: medium-code # parSequence calls parZip recursively .fifty-seven-left[
```scala def parSequence[A](actions: List[IO[A]]) (ec: ExecutionContext): IO[List[A]] = actions.`foldLeft`(IO(List.empty[A]))( (state, action) => state.`parZip`(action)(ec).map { case (list, value) => value :: list } ).map(_.reverse) ``` ] .forty-two-right[
] --- class: medium-code # parSequence can cause a deadlock .fifty-seven-left[
```scala def parSequence[A](actions: List[IO[A]]) (ec: ExecutionContext): IO[List[A]] = actions.`foldLeft`(IO(List.empty[A]))( (state, action) => state.`parZip`(action)(ec).map { case (list, value) => value :: list } ).map(_.reverse) ``` ] .forty-two-right[
] --- # Blocking kills performance
--- # Blocking kills performance
--- class: middle, center # Can we make parZip non-blocking? --- class: medium-code # unsafeRun is blocking .fifty-two-left[ ```scala trait IO[A] { def unsafeRun(): A } ``` ```scala val ioUser = db.getUser(1234) // ioUser: IO[User] = IO$$anon$1@39ba5a14 val ioName = // ioName: IO[String] = IO$$anon$1@39ba5c89 val name = ioName.unsafeRun() // name: String = Bob ``` ] --- # Future is non-blocking .sixty-two-left[ ```scala val futureUser = db.getUser(1234) // futureUser: Future[User] = Future(
) val futureName = // futureName: Future[String] = Future(
) ``` ] -- .sixty-two-left[ ```scala trait Future[A] { def onComplete(callback: Try[A] => Unit): Unit } ``` ] -- .sixty-two-left[ ```scala futureName.onComplete { case Failure(error) => println("Something went wrong") case Success(name) => println(s"Username is name") } ``` ] --- class: medium-code # Non-blocking IO .fifty-two-left[ ```scala trait IO[A] { def onComplete(callback: Try[A] => Unit): Unit } ``` ] --- class: medium-code # Non-blocking IO .fifty-two-left[ ```scala trait IO[A] { // executes an IO `asyn`chronously def unsafeRunAsync(callback: Try[A] => Unit): Unit // executes an IO `sync`hronously def unsafeRun(): A } ``` ] --- class: medium-code
.forty-two-left[ # Direct Style ```scala def getUser(userId: Long): User ``` ```scala val user1 = getUser(1234) val user2 = getUser(5555) println(s"Users are $user1 and $user2") ``` ] .fifty-seven-right[ # Continuation Passing Style ```scala def getUser(userId: Long)(callback: User => Unit): Unit ``` ```scala getUser(1234) { user1 => getUser(555) { user2 => println(s"Users are $user1 and $user2") } } ``` ] --- class: medium-code # Independent IO in a for comprehension .fifty-two-left[ ```scala for { flights1 <- searchByClient(client1) flights2 <- searchByClient(client2) } yield SearchResult(flights1 ++ flights2) ``` ]