Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -323,41 +322,28 @@ private void updateClusterId(Admin adminClient) throws InterruptedException, Exe
*/
protected Collection<NewTopic> newTopics() {
Assert.state(this.applicationContext != null, "'applicationContext' cannot be null");
Map<String, NewTopic> newTopicsMap = new HashMap<>(
this.applicationContext.getBeansOfType(NewTopic.class, false, false));
Map<String, NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false);
AtomicInteger count = new AtomicInteger();
wrappers.forEach((name, newTopics) -> {
newTopics.getNewTopics().forEach(nt -> newTopicsMap.put(name + "#" + count.getAndIncrement(), nt));
});
Map<String, NewTopic> topicsForRetry = newTopicsMap.entrySet().stream()
.filter(entry -> entry.getValue() instanceof TopicForRetryable)
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
for (Entry<String, NewTopic> entry : topicsForRetry.entrySet()) {
Iterator<Entry<String, NewTopic>> iterator = newTopicsMap.entrySet().iterator();
boolean remove = false;
while (iterator.hasNext()) {
Entry<String, NewTopic> nt = iterator.next();
// if we have a NewTopic and TopicForRetry with the same name, remove the latter
if (nt.getValue().name().equals(entry.getValue().name())
&& !(nt.getValue() instanceof TopicForRetryable)) {

remove = true;
break;
}
}
if (remove) {
newTopicsMap.remove(entry.getKey());
}
}
Iterator<Entry<String, NewTopic>> iterator = newTopicsMap.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, NewTopic> next = iterator.next();
if (!this.createOrModifyTopic.test(next.getValue())) {
iterator.remove();
}
}
return new ArrayList<>(newTopicsMap.values());

// Deal with List<NewTopic> directly instead of Map (no need for bean names)
List<NewTopic> newTopicsList = new ArrayList<>(
this.applicationContext.getBeansOfType(NewTopic.class, false, false).values());

// Add topics from NewTopics wrappers (no need for bean names either)
this.applicationContext.getBeansOfType(NewTopics.class, false, false).values()
.forEach(wrapper -> newTopicsList.addAll(wrapper.getNewTopics()));

// Collect normal topic names to check against TopicForRetryable
Set<String> normalNames = newTopicsList.stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just nit-pick: I don't think "normal" is a proper term to use.
And not only here.
It raises a question like what is the "abnormal"?
Is a TopicForRetryable abnormal?

I would suggest to go with a "regular" instead.
This way a TopicForRetryable is really "unregular" and that does not cause extra questions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much!
I hadn't thought about it that way, but your point makes a lot of sense.
I'll update the code to reflect your feedback.

.filter(nt -> !(nt instanceof TopicForRetryable))
.map(NewTopic::name)
.collect(Collectors.toSet());

// Remove TopicForRetryable if there's a regular NewTopic with same name
newTopicsList.removeIf(nt -> nt instanceof TopicForRetryable && normalNames.contains(nt.name()));

// Apply predicate filter
newTopicsList.removeIf(nt -> !this.createOrModifyTopic.test(nt));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot we combine both conditions in the same removeIf()?
This way we would perform only one loop for removal logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, I'm curious—should these kinds of cases always be combined into one? I separated them intentionally for readability. In what situations is it better to merge them, and what are your thoughts on this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I wouldn't merge then if you would not raise a performance concern 😄


return newTopicsList;
}

@Override
Expand Down