Skip to content

Conversation

@pongad
Copy link
Contributor

@pongad pongad commented Sep 27, 2018

CC @csainty

This should let us clear through backlogs more quickly.

@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Sep 27, 2018
if (ackHandler.totalExpiration.isBefore(now())) {
// Message expired while waiting. We don't extend these messages anymore,
// so it was probably sent to someone else. Don't work on it.
// Don't nack it either, because we'd be nacking someone else's message.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


// forget removes from pendingMessages; this is OK, concurrent maps can
// handle concurrent iterations and modifications.
entry.getValue().forget();

This comment was marked as spam.

This comment was marked as spam.

@csainty
Copy link

csainty commented Sep 27, 2018

Generally like it, nice clean approach 👍

Won't handle a 0 extension period though will it? Since totalExpiration will be set to receive time

final Subscriber subscriber = Subscriber.newBuilder(subscriptionName, (msg, ack) -> {
    // do something
  })
  .setMaxAckExtensionPeriod(Duration.ZERO)
  .build();

@pongad
Copy link
Contributor Author

pongad commented Sep 27, 2018

That's right. This won't handle the case where max ack extension is zero. I'm a little concerned by this; this essentially changes the behavior from "we won't extend these" to "we'll never work on these". I'll make a small fix for this.

@csainty
Copy link

csainty commented Sep 28, 2018

👍
I pulled this morning and ran my sample project with the build, hits exactly-once consumption. Thanks for looking in to it.

I think this will fix #3383 and #2465
Also make it easier to configure for https://github.com/GoogleCloudPlatform/google-cloud-java/issues/3152 by tweaking the max deadline so all the messages don't pull to a single subscriber and stay there, as currently reducing the max deadline to prevent this gets you in to double processing sadness.

@pongad
Copy link
Contributor Author

pongad commented Sep 28, 2018

Don't merge this yet. Someone from pubsub should take a look.

@Override
public void run() {
try {
if (ackHandler.totalExpiration.plusSeconds(messageDeadlineSeconds.get()).isBefore(now())) {

This comment was marked as spam.

This comment was marked as spam.

@pongad pongad requested a review from a team as a code owner October 18, 2018 00:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

cla: yes This human has signed the Contributor License Agreement.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants