Skip to content

Commit 80da4ca

Browse files
kelebraktoso
authored andcommitted
Issue 24519: Created method lazilyAsync for both DSLs and adds section in docs (#24568)
* Issue 24519: Created method lazilyAsync for both DSLs and adds section in docs. * Issue 24519: Changes according to code review * Issue 24519: Added unit tests * Update LazilyAsyncSpec.scala * Issue 24519: Added copyright
1 parent 11b6065 commit 80da4ca

File tree

4 files changed

+110
-0
lines changed

4 files changed

+110
-0
lines changed

akka-docs/src/main/paradox/stream/stages-overview.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,16 @@ Defers creation and materialization of a `Source` until there is demand.
201201

202202
---------------------------------------------------------------
203203

204+
### lazilyAsync
205+
206+
Defers creation and materialization of a `CompletionStage` until there is demand.
207+
208+
**emits** the future completes
209+
210+
**completes** after the future has completed
211+
212+
---------------------------------------------------------------
213+
204214
### actorRef
205215

206216
Materialize an `ActorRef`, sending messages to it will emit them on the stream. The actor contain
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
3+
*/
4+
package akka.stream.scaladsl
5+
6+
import java.util.concurrent.atomic.AtomicBoolean
7+
8+
import akka.Done
9+
import akka.stream.ActorMaterializer
10+
import akka.stream.testkit.{ StreamSpec, TestSubscriber }
11+
import akka.stream.testkit.Utils.assertAllStagesStopped
12+
import akka.testkit.DefaultTimeout
13+
import org.scalatest.concurrent.ScalaFutures
14+
15+
import scala.concurrent.Future
16+
17+
class LazilyAsyncSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
18+
19+
private implicit val mat: ActorMaterializer = ActorMaterializer()
20+
21+
import mat.executionContext
22+
23+
"A lazy async source" should {
24+
25+
"work in happy path scenario" in assertAllStagesStopped {
26+
val stream = Source.lazilyAsync { () Future(42) }.runWith(Sink.head)
27+
28+
stream.futureValue should ===(42)
29+
}
30+
31+
"call factory method on demand only" in assertAllStagesStopped {
32+
val probe = TestSubscriber.probe[Int]()
33+
val constructed = new AtomicBoolean(false)
34+
35+
val result = Source.lazilyAsync { () constructed.set(true); Future(42) }
36+
.runWith(Sink.fromSubscriber(probe))
37+
probe.cancel()
38+
39+
constructed.get() should ===(false)
40+
}
41+
42+
"fail materialized value when downstream cancels without ever consuming any element" in assertAllStagesStopped {
43+
val materialization = Source.lazilyAsync { () Future(42) }
44+
.toMat(Sink.cancelled)(Keep.left)
45+
.run()
46+
47+
intercept[RuntimeException] {
48+
materialization.futureValue
49+
}
50+
}
51+
52+
"materialize when the source has been created" in assertAllStagesStopped {
53+
val probe = TestSubscriber.probe[Int]()
54+
55+
val materialization: Future[Done] =
56+
Source.lazilyAsync { () Future(42) }
57+
.mapMaterializedValue(_.map(_ Done))
58+
.to(Sink.fromSubscriber(probe))
59+
.run()
60+
61+
materialization.value shouldEqual None
62+
probe.request(1)
63+
probe.expectNext(42)
64+
materialization.futureValue should ===(Done)
65+
66+
probe.cancel()
67+
}
68+
69+
"propagate failed future from factory" in assertAllStagesStopped {
70+
val probe = TestSubscriber.probe[Int]()
71+
val failure = new RuntimeException("too bad")
72+
val materialization = Source.lazilyAsync { () Future.failed(failure) }
73+
.to(Sink.fromSubscriber(probe))
74+
.run()
75+
76+
probe.request(1)
77+
probe.expectError(failure)
78+
}
79+
}
80+
}

akka-stream/src/main/scala/akka/stream/javadsl/Source.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,16 @@ object Source {
251251
def lazily[T, M](create: function.Creator[Source[T, M]]): Source[T, CompletionStage[M]] =
252252
scaladsl.Source.lazily[T, M](() create.create().asScala).mapMaterializedValue(_.toJava).asJava
253253

254+
/**
255+
* Creates a `Source` from supplied future factory that is not called until downstream demand. When source gets
256+
* materialized the materialized future is completed with the value from the factory. If downstream cancels or fails
257+
* without any demand the create factory is never called and the materialized `Future` is failed.
258+
*
259+
* @see [[Source.lazily]]
260+
*/
261+
def lazilyAsync[T](create: function.Creator[CompletionStage[T]]): Source[T, Future[NotUsed]] =
262+
scaladsl.Source.lazilyAsync[T](() create.create().toScala).asJava
263+
254264
/**
255265
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
256266
*/

akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,16 @@ object Source {
421421
def lazily[T, M](create: () Source[T, M]): Source[T, Future[M]] =
422422
Source.fromGraph(new LazySource[T, M](create))
423423

424+
/**
425+
* Creates a `Source` from supplied future factory that is not called until downstream demand. When source gets
426+
* materialized the materialized future is completed with the value from the factory. If downstream cancels or fails
427+
* without any demand the create factory is never called and the materialized `Future` is failed.
428+
*
429+
* @see [[Source.lazily]]
430+
*/
431+
def lazilyAsync[T](create: () Future[T]): Source[T, Future[NotUsed]] =
432+
lazily(() fromFuture(create()))
433+
424434
/**
425435
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
426436
*/

0 commit comments

Comments
 (0)