Skip to content

Commit 217fbc6

Browse files
committed
Polishing.
Introduce CollectionName abstraction. Reduce duplications between BulkWriter and ReactiveBulkWriter. Use PersistentEntity instead of a Class and Entity mix. Remove inCollection(…).update style in favor of capturing inCollection(…, it -> update) style to avoid two programming models. Remove typing to allow usage of Document and entities with the bulk API.
1 parent f99cde9 commit 217fbc6

29 files changed

Lines changed: 1205 additions & 965 deletions

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoClusterCapable.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,19 @@
1515
*/
1616
package org.springframework.data.mongodb;
1717

18+
import com.mongodb.client.MongoCluster;
19+
1820
/**
1921
* Interface that can provide access to a MongoDB cluster.
2022
*
21-
* @param <T> the MongoDB cluster/client type (e.g. {@link com.mongodb.client.MongoCluster} or
22-
* {@link com.mongodb.reactivestreams.client.MongoCluster}).
2323
* @author Christoph Strobl
2424
* @since 5.1
2525
*/
26-
public interface MongoClusterCapable<T> {
26+
public interface MongoClusterCapable {
2727

2828
/**
29-
* Returns the MongoDB cluster used by this factory.
30-
*
31-
* @return the cluster; never {@literal null}.
32-
* @throws IllegalStateException if cluster cannot be obtained.
29+
* Return the {@link com.mongodb.client.MongoCluster} associated with this component.
3330
*/
34-
T getMongoCluster();
31+
MongoCluster getMongoCluster();
32+
3533
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb;
17+
18+
import com.mongodb.reactivestreams.client.MongoCluster;
19+
20+
/**
21+
* Interface that can provide access to a MongoDB cluster.
22+
*
23+
* @author Christoph Strobl
24+
* @since 5.1
25+
*/
26+
public interface ReactiveMongoClusterCapable {
27+
28+
/**
29+
* Return the {@link MongoCluster} associated with this component.
30+
*/
31+
MongoCluster getMongoCluster();
32+
33+
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriter.java

Lines changed: 21 additions & 179 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,11 @@
1515
*/
1616
package org.springframework.data.mongodb.core;
1717

18-
import java.util.ArrayList;
19-
import java.util.List;
2018
import java.util.Set;
21-
import java.util.function.Function;
22-
import java.util.stream.Collectors;
2319

2420
import org.bson.Document;
21+
2522
import org.springframework.dao.DataAccessException;
26-
import org.springframework.data.mongodb.core.MongoTemplate.SourceAwareDocument;
2723
import org.springframework.data.mongodb.core.QueryOperations.DeleteContext;
2824
import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
2925
import org.springframework.data.mongodb.core.bulk.Bulk;
@@ -35,40 +31,36 @@
3531
import org.springframework.data.mongodb.core.bulk.BulkOperation.Update;
3632
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst;
3733
import org.springframework.data.mongodb.core.bulk.BulkOperationContext.TypedNamespace;
38-
import org.springframework.data.mongodb.core.bulk.BulkWriteResult;
3934
import org.springframework.data.mongodb.core.bulk.BulkWriteOptions;
35+
import org.springframework.data.mongodb.core.bulk.BulkWriteResult;
4036
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
4137
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
42-
import org.springframework.util.StringUtils;
4338

4439
import com.mongodb.MongoBulkWriteException;
4540
import com.mongodb.MongoNamespace;
4641
import com.mongodb.client.model.DeleteOptions;
47-
import com.mongodb.client.model.InsertOneModel;
4842
import com.mongodb.client.model.UpdateOptions;
49-
import com.mongodb.client.model.WriteModel;
5043
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
5144
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
52-
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
5345

5446
/**
5547
* Internal API wrapping a {@link MongoTemplate} to encapsulate {@link Bulk} handling.
5648
*
5749
* @author Christoph Strobl
5850
* @since 5.1
5951
*/
60-
class BulkWriter {
52+
class BulkWriter extends BulkWriterSupport {
6153

62-
MongoTemplate template;
54+
private final MongoTemplate template;
6355

6456
BulkWriter(MongoTemplate template) {
57+
super(template.getEntityOperations(), template.getQueryOperations(), template.getConverter().getMappingContext());
6558
this.template = template;
6659
}
6760

6861
public BulkWriteResult write(String defaultDatabase, Bulk bulk, BulkWriteOptions options) {
6962

70-
Set<TypedNamespace> namespaces = bulk.operations().stream().map(it -> it.context().namespace())
71-
.collect(Collectors.toSet());
63+
Set<TypedNamespace> namespaces = getTypedNamespaces(bulk);
7264
if (namespaces.size() == 1) {
7365
return writeToSingleCollection(defaultDatabase, bulk, options, namespaces.iterator().next());
7466
}
@@ -79,8 +71,7 @@ private BulkWriteResult writeToSingleCollection(String defaultDatabase, Bulk bul
7971
BulkWriteOptions options, TypedNamespace namespace) {
8072

8173
MongoNamespace mongoNamespace = new MongoNamespace(defaultDatabase,
82-
StringUtils.hasText(namespace.collection()) ? namespace.collection()
83-
: template.getCollectionName(namespace.type()));
74+
resolveCollectionName(namespace));
8475

8576
SingleCollectionCollector collector = new SingleCollectionCollector(mongoNamespace);
8677
buildWriteModels(bulk, collector);
@@ -132,202 +123,53 @@ private BulkWriteResult writeToMultipleCollections(String defaultDatabase, Bulk
132123
}
133124
}
134125

135-
private void buildWriteModels(Bulk bulk, WriteModelCollector<?> collector) {
126+
private void buildWriteModels(Bulk bulk, WriteModelCollector collector) {
136127

137128
for (BulkOperation bulkOp : bulk.operations()) {
138129

139-
MongoNamespace namespace = collector.resolveNamespace(bulkOp, it -> template.getCollectionName(it.type()));
130+
MongoNamespace namespace = collector.resolveNamespace(resolveCollectionName(bulkOp));
131+
MongoPersistentEntity<?> entity = getPersistentEntity(bulkOp.context());
140132

141133
if (bulkOp instanceof Insert insert) {
142134

143135
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(namespace.getCollectionName(),
144-
insert.value(), template.getConverter());
136+
insert.value());
145137
collector.addInsert(namespace, sourceAwareDocument.document(), sourceAwareDocument);
146138
} else if (bulkOp instanceof Update update) {
147139

148-
Class<?> domainType = update.context().namespace().type();
149140
boolean multi = !(bulkOp instanceof UpdateFirst);
150141

151-
UpdateContext updateContext = template.getQueryOperations().updateContext(update.update(), update.query(),
142+
UpdateContext updateContext = queryOperations.updateContext(update.update(), update.query(),
152143
update.upsert());
153-
MongoPersistentEntity<?> entity = template.getPersistentEntity(domainType);
154144

155145
Document mappedQuery = updateContext.getMappedQuery(entity);
156-
Object mappedUpdate = updateContext.isAggregationUpdate() ? updateContext.getUpdatePipeline(domainType)
146+
Object mappedUpdate = updateContext.isAggregationUpdate() ? updateContext.getUpdatePipeline(entity)
157147
: updateContext.getMappedUpdate(entity);
158-
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, update.query());
148+
UpdateOptions updateOptions = updateContext.getUpdateOptions(entity, update.query());
159149

160150
collector.addUpdate(namespace, multi, mappedQuery, mappedUpdate, updateOptions);
161151
} else if (bulkOp instanceof Remove remove) {
162152

163-
Class<?> domainType = remove.context().namespace().type();
164-
DeleteContext deleteContext = template.getQueryOperations().deleteQueryContext(remove.query());
165-
166-
Document mappedQuery = deleteContext.getMappedQuery(template.getPersistentEntity(domainType));
167-
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(domainType);
153+
DeleteContext deleteContext = queryOperations.deleteQueryContext(remove.query());
154+
Document mappedQuery = deleteContext.getMappedQuery(entity);
155+
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(entity);
168156

169157
collector.addRemove(namespace, remove instanceof RemoveFirst, mappedQuery, deleteOptions);
170158
} else if (bulkOp instanceof Replace replace) {
171159

172-
Class<?> domainType = replace.context().namespace().type();
173-
174160
SourceAwareDocument<Object> sourceAwareDocument = template.prepareObjectForSave(namespace.getCollectionName(),
175-
replace.replacement(), template.getConverter());
161+
replace.replacement());
176162

177-
UpdateContext updateContext = template.getQueryOperations().replaceSingleContext(replace.query(),
163+
UpdateContext updateContext = queryOperations.replaceSingleContext(replace.query(),
178164
MappedDocument.of(sourceAwareDocument.document()), replace.upsert());
179165

180-
Document mappedQuery = updateContext.getMappedQuery(template.getPersistentEntity(domainType));
181-
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, replace.query());
166+
Document mappedQuery = updateContext.getMappedQuery(entity);
167+
UpdateOptions updateOptions = updateContext.getUpdateOptions(entity, replace.query());
182168

183169
collector.addReplace(namespace, mappedQuery, sourceAwareDocument.document(), updateOptions,
184170
sourceAwareDocument);
185171
}
186172
}
187173
}
188174

189-
private interface WriteModelCollector<T> {
190-
191-
MongoNamespace resolveNamespace(String collectionName);
192-
193-
default MongoNamespace resolveNamespace(BulkOperation operation, Function<TypedNamespace, String> fallback) {
194-
195-
TypedNamespace typedNamespace = operation.context().namespace();
196-
if (StringUtils.hasText(typedNamespace.collection())) {
197-
return resolveNamespace(typedNamespace.collection());
198-
}
199-
200-
return resolveNamespace(fallback.apply(typedNamespace));
201-
}
202-
203-
void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc);
204-
205-
void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update, UpdateOptions options);
206-
207-
void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options);
208-
209-
void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
210-
SourceAwareDocument<Object> sourceDoc);
211-
212-
List<SourceAwareDocument<Object>> getAfterSaveCallables();
213-
}
214-
215-
private static class SingleCollectionCollector implements WriteModelCollector<WriteModel<Document>> {
216-
217-
private final List<WriteModel<Document>> writeModels = new ArrayList<>();
218-
private final List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
219-
private MongoNamespace namespace;
220-
221-
public SingleCollectionCollector(MongoNamespace namespace) {
222-
this.namespace = namespace;
223-
}
224-
225-
@Override
226-
public MongoNamespace resolveNamespace(String collectionName) {
227-
return namespace;
228-
}
229-
230-
@Override
231-
public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc) {
232-
writeModels.add(new InsertOneModel<>(document));
233-
afterSaveCallables.add(sourceDoc);
234-
}
235-
236-
@Override
237-
public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update,
238-
UpdateOptions options) {
239-
if (multi) {
240-
writeModels.add(BulkWriteSupport.updateMany(query, update, options));
241-
} else {
242-
writeModels.add(BulkWriteSupport.updateOne(query, update, options));
243-
}
244-
}
245-
246-
@Override
247-
public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) {
248-
if (removeFirst) {
249-
writeModels.add(BulkWriteSupport.removeOne(query, options));
250-
} else {
251-
writeModels.add(BulkWriteSupport.removeMany(query, options));
252-
}
253-
}
254-
255-
@Override
256-
public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
257-
SourceAwareDocument<Object> sourceDoc) {
258-
writeModels.add(BulkWriteSupport.replaceOne(query, replacement, options));
259-
afterSaveCallables.add(sourceDoc);
260-
}
261-
262-
@Override
263-
public List<SourceAwareDocument<Object>> getAfterSaveCallables() {
264-
return afterSaveCallables;
265-
}
266-
267-
MongoNamespace getNamespace() {
268-
return namespace;
269-
}
270-
271-
List<WriteModel<Document>> getWriteModels() {
272-
return writeModels;
273-
}
274-
}
275-
276-
private static class MultiCollectionCollector implements WriteModelCollector<ClientNamespacedWriteModel> {
277-
278-
private final List<ClientNamespacedWriteModel> writeModels = new ArrayList<>();
279-
private final List<SourceAwareDocument<Object>> afterSaveCallables = new ArrayList<>();
280-
private final String defaultDatabaseName;
281-
282-
public MultiCollectionCollector(String defaultDatabaseName) {
283-
this.defaultDatabaseName = defaultDatabaseName;
284-
}
285-
286-
@Override
287-
public MongoNamespace resolveNamespace(String collectionName) {
288-
return new MongoNamespace(defaultDatabaseName, collectionName);
289-
}
290-
291-
@Override
292-
public void addInsert(MongoNamespace namespace, Document document, SourceAwareDocument<Object> sourceDoc) {
293-
writeModels.add(ClientNamespacedWriteModel.insertOne(namespace, document));
294-
afterSaveCallables.add(sourceDoc);
295-
}
296-
297-
@Override
298-
public void addUpdate(MongoNamespace namespace, boolean multi, Document query, Object update,
299-
UpdateOptions options) {
300-
if (multi) {
301-
writeModels.add(BulkWriteSupport.updateMany(namespace, query, update, options));
302-
} else {
303-
writeModels.add(BulkWriteSupport.updateOne(namespace, query, update, options));
304-
}
305-
}
306-
307-
@Override
308-
public void addRemove(MongoNamespace namespace, boolean removeFirst, Document query, DeleteOptions options) {
309-
if (removeFirst) {
310-
writeModels.add(BulkWriteSupport.removeOne(namespace, query, options));
311-
} else {
312-
writeModels.add(BulkWriteSupport.removeMany(namespace, query, options));
313-
}
314-
}
315-
316-
@Override
317-
public void addReplace(MongoNamespace namespace, Document query, Document replacement, UpdateOptions options,
318-
SourceAwareDocument<Object> sourceDoc) {
319-
writeModels.add(BulkWriteSupport.replaceOne(namespace, query, replacement, options));
320-
afterSaveCallables.add(sourceDoc);
321-
}
322-
323-
@Override
324-
public List<SourceAwareDocument<Object>> getAfterSaveCallables() {
325-
return afterSaveCallables;
326-
}
327-
328-
List<ClientNamespacedWriteModel> getWriteModels() {
329-
return writeModels;
330-
}
331-
}
332-
333175
}

0 commit comments

Comments
 (0)