Skip to content

Comments

feat: Long timeout on actions to detect runaway Futures/CompletionStages#2383

Merged
johanandren merged 4 commits intomainfrom
wip-long-async-action-timeout
Nov 17, 2025
Merged

feat: Long timeout on actions to detect runaway Futures/CompletionStages#2383
johanandren merged 4 commits intomainfrom
wip-long-async-action-timeout

Conversation

@johanandren
Copy link
Contributor

In case an async reply or effect is returned that never completes, for example in a consumer, this will make sure it is logged as an error and retried, so that it hopefully is unstuck or at least shows up in logs.

Sample log output:

15:44:52.699 ERROR k.javasdk.impl.action.ActionsImpl - Failure during handling of command customer.action.CustomerByName.ProcessCustomerCreated
java.util.concurrent.TimeoutException: Command to action [CustomerByName] method [ProcessCustomerCreated], subject: [vip], sequence: [1] did not complete within 20 seconds
        at kalix.javasdk.impl.action.ActionsImpl.timeoutErrorFor(ActionsImpl.scala:193)
        at kalix.javasdk.impl.action.ActionsImpl.$anonfun$effectToResponse$2(ActionsImpl.scala:150)
        at akka.pattern.FutureTimeoutSupport.liftedTree1$1(FutureTimeoutSupport.scala:50)
        at akka.pattern.FutureTimeoutSupport.$anonfun$after$1(FutureTimeoutSupport.scala:50)
        at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:473)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:48)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:60)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:511)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1450)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2019)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187)
15:44:52.716 WARN  kalix.javasdk.impl.DiscoveryImpl - Warning reported from Kalix system: KLX-00433 Eventing in service [customer.action.CustomerByName] is failing, will be retried.

Copy link
Contributor

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, with a small suggestion

.firstCompletedOf(
Seq(
futureEffect,
akka.pattern.after(actionTimeout) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, can there be a risk that we add many such timer tasks to the scheduler and they will not be done (removed) until after 1 hour (1000 per second would be 3.6 million)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point. Maybe we need something smarter here, would be enough with one such timeout future per minute or maybe even fewer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added something that should create fewer timers and share them in 0b510bd


private val actionTimeout = system.settings.config.getDuration("kalix.action.timeout").toScala

@volatile private var previousTimeout: Option[(Instant, Future[Done])] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we create one instance of ActionsImpl per projection instance and keep that instance? not creating new instances of ActionsImpl for each request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ActionsImpl is the actual gRPC service implementation, so it's even one for all actions in the same Kalix SDK service.

}
new TimeoutException(
s"Command to action [${service.actionClass.getOrElse(service.serviceName)}] method [${command.name}]${additionalDetails} did not complete within ${actionTimeout.toCoarsest}")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just thinking if there is an easier way? it's pretty cheap to add and cancel short lived scheduler tasks, so wonder if we could keep the previous Cancellable instead and when scheduling new we always cancel previous, since we know that it was handled when processing next message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even simpler: we can close over it as well, and cancel on completion of the other future. I'll do that instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rewritten like that in 009faf1

effectToResponse(service, command, withSurroundingSideEffects, messageCodec)
}
.recover { case NonFatal(ex) =>
timeoutCancellable.cancel()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: safe to cancel an already cancelled

Copy link
Contributor

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@johanandren johanandren merged commit 2dea101 into main Nov 17, 2025
56 checks passed
@johanandren johanandren deleted the wip-long-async-action-timeout branch November 17, 2025 14:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants