2222import org .apache .commons .logging .Log ;
2323import org .apache .commons .logging .LogFactory ;
2424
25- import org .springframework .core .Ordered ;
2625import org .springframework .dao .DataAccessResourceFailureException ;
2726import org .springframework .lang .Nullable ;
28- import org .springframework .transaction .NoTransactionException ;
29- import org .springframework .transaction .reactive .TransactionSynchronization ;
3027import org .springframework .transaction .reactive .TransactionSynchronizationManager ;
3128import org .springframework .util .Assert ;
3229
@@ -68,8 +65,7 @@ private ConnectionFactoryUtils() {}
6865 * @see #releaseConnection
6966 */
7067 public static Mono <Connection > getConnection (ConnectionFactory connectionFactory ) {
71- return doGetConnection (connectionFactory )
72- .onErrorMap (e -> new DataAccessResourceFailureException ("Failed to obtain R2DBC Connection" , e ));
68+ return org .springframework .r2dbc .connection .ConnectionFactoryUtils .getConnection (connectionFactory );
7369 }
7470
7571 /**
@@ -83,64 +79,7 @@ public static Mono<Connection> getConnection(ConnectionFactory connectionFactory
8379 * @return a R2DBC {@link io.r2dbc.spi.Connection} from the given {@link io.r2dbc.spi.ConnectionFactory}.
8480 */
8581 public static Mono <Connection > doGetConnection (ConnectionFactory connectionFactory ) {
86-
87- Assert .notNull (connectionFactory , "ConnectionFactory must not be null!" );
88-
89- return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (synchronizationManager -> {
90-
91- ConnectionHolder conHolder = (ConnectionHolder ) synchronizationManager .getResource (connectionFactory );
92- if (conHolder != null && (conHolder .hasConnection () || conHolder .isSynchronizedWithTransaction ())) {
93- conHolder .requested ();
94- if (!conHolder .hasConnection ()) {
95-
96- if (logger .isDebugEnabled ()) {
97- logger .debug ("Fetching resumed R2DBC Connection from ConnectionFactory" );
98- }
99- return fetchConnection (connectionFactory ).doOnNext (conHolder ::setConnection );
100- }
101- return Mono .just (conHolder .getConnection ());
102- }
103- // Else we either got no holder or an empty thread-bound holder here.
104-
105- if (logger .isDebugEnabled ()) {
106- logger .debug ("Fetching R2DBC Connection from ConnectionFactory" );
107- }
108-
109- Mono <Connection > con = fetchConnection (connectionFactory );
110-
111- if (synchronizationManager .isSynchronizationActive ()) {
112-
113- return con .flatMap (it -> {
114-
115- return Mono .just (it ).doOnNext (conn -> {
116-
117- // Use same Connection for further R2DBC actions within the transaction.
118- // Thread-bound object will get removed by synchronization at transaction completion.
119- ConnectionHolder holderToUse = conHolder ;
120- if (holderToUse == null ) {
121- holderToUse = new ConnectionHolder (conn );
122- } else {
123- holderToUse .setConnection (conn );
124- }
125- holderToUse .requested ();
126- synchronizationManager
127- .registerSynchronization (new ConnectionSynchronization (holderToUse , connectionFactory ));
128- holderToUse .setSynchronizedWithTransaction (true );
129- if (holderToUse != conHolder ) {
130- synchronizationManager .bindResource (connectionFactory , holderToUse );
131- }
132- }).onErrorResume (e -> {
133- // Unexpected exception from external delegation call -> close Connection and rethrow.
134- return releaseConnection (it , connectionFactory ).then (Mono .error (e ));
135- });
136- });
137- }
138-
139- return con ;
140- }) //
141- .onErrorResume (NoTransactionException .class , e -> {
142- return Mono .from (connectionFactory .create ());
143- });
82+ return org .springframework .r2dbc .connection .ConnectionFactoryUtils .doGetConnection (connectionFactory );
14483 }
14584
14685 /**
@@ -183,17 +122,8 @@ public static Mono<Void> releaseConnection(io.r2dbc.spi.Connection con, Connecti
183122 public static Mono <Void > doReleaseConnection (io .r2dbc .spi .Connection connection ,
184123 ConnectionFactory connectionFactory ) {
185124
186- return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (it -> {
187-
188- ConnectionHolder conHolder = (ConnectionHolder ) it .getResource (connectionFactory );
189- if (conHolder != null && connectionEquals (conHolder , connection )) {
190- // It's the transactional Connection: Don't close it.
191- conHolder .released ();
192- }
193- return Mono .from (connection .close ());
194- }).onErrorResume (NoTransactionException .class , e -> {
195- return doCloseConnection (connection , connectionFactory );
196- });
125+ return org .springframework .r2dbc .connection .ConnectionFactoryUtils .doReleaseConnection (connection ,
126+ connectionFactory );
197127 }
198128
199129 /**
@@ -246,37 +176,7 @@ public static Mono<Void> doCloseConnection(Connection connection, @Nullable Conn
246176 */
247177 public static Mono <ConnectionFactory > currentConnectionFactory (ConnectionFactory connectionFactory ) {
248178
249- return TransactionSynchronizationManager .forCurrentTransaction ()
250- .filter (TransactionSynchronizationManager ::isSynchronizationActive ).filter (it -> {
251-
252- ConnectionHolder conHolder = (ConnectionHolder ) it .getResource (connectionFactory );
253- if (conHolder != null && (conHolder .hasConnection () || conHolder .isSynchronizedWithTransaction ())) {
254- return true ;
255- }
256- return false ;
257- }).map (it -> connectionFactory );
258- }
259-
260- /**
261- * Determine whether the given two {@link io.r2dbc.spi.Connection}s are equal, asking the target
262- * {@link io.r2dbc.spi.Connection} in case of a proxy. Used to detect equality even if the user passed in a raw target
263- * Connection while the held one is a proxy.
264- *
265- * @param conHolder the {@link .ConnectionHolder} for the held {@link io.r2dbc.spi.Connection} (potentially a proxy).
266- * @param passedInCon the {@link io.r2dbc.spi.Connection} passed-in by the user (potentially a target
267- * {@link io.r2dbc.spi.Connection} without proxy).
268- * @return whether the given Connections are equal
269- * @see #getTargetConnection
270- */
271- private static boolean connectionEquals (ConnectionHolder conHolder , Connection passedInCon ) {
272-
273- if (!conHolder .hasConnection ()) {
274- return false ;
275- }
276- Connection heldCon = conHolder .getConnection ();
277- // Explicitly check for identity too: for Connection handles that do not implement
278- // "equals" properly).
279- return (heldCon == passedInCon || heldCon .equals (passedInCon ) || getTargetConnection (heldCon ).equals (passedInCon ));
179+ return org .springframework .r2dbc .connection .ConnectionFactoryUtils .currentConnectionFactory (connectionFactory );
280180 }
281181
282182 /**
@@ -317,112 +217,4 @@ private static int getConnectionSynchronizationOrder(ConnectionFactory connectio
317217 return order ;
318218 }
319219
320- /**
321- * Callback for resource cleanup at the end of a non-native R2DBC transaction.
322- */
323- private static class ConnectionSynchronization implements TransactionSynchronization , Ordered {
324-
325- private final ConnectionHolder connectionHolder ;
326-
327- private final ConnectionFactory connectionFactory ;
328-
329- private int order ;
330-
331- private boolean holderActive = true ;
332-
333- ConnectionSynchronization (ConnectionHolder connectionHolder , ConnectionFactory connectionFactory ) {
334- this .connectionHolder = connectionHolder ;
335- this .connectionFactory = connectionFactory ;
336- this .order = getConnectionSynchronizationOrder (connectionFactory );
337- }
338-
339- @ Override
340- public int getOrder () {
341- return this .order ;
342- }
343-
344- @ Override
345- public Mono <Void > suspend () {
346- if (this .holderActive ) {
347-
348- return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (it -> {
349-
350- it .unbindResource (this .connectionFactory );
351- if (this .connectionHolder .hasConnection () && !this .connectionHolder .isOpen ()) {
352- // Release Connection on suspend if the application doesn't keep
353- // a handle to it anymore. We will fetch a fresh Connection if the
354- // application accesses the ConnectionHolder again after resume,
355- // assuming that it will participate in the same transaction.
356- return releaseConnection (this .connectionHolder .getConnection (), this .connectionFactory )
357- .doOnTerminate (() -> this .connectionHolder .setConnection (null ));
358- }
359- return Mono .empty ();
360- });
361- }
362-
363- return Mono .empty ();
364- }
365-
366- @ Override
367- public Mono <Void > resume () {
368- if (this .holderActive ) {
369- return TransactionSynchronizationManager .forCurrentTransaction ().doOnNext (it -> {
370- it .bindResource (this .connectionFactory , this .connectionHolder );
371- }).then ();
372- }
373- return Mono .empty ();
374- }
375-
376- @ Override
377- public Mono <Void > beforeCompletion () {
378-
379- // Release Connection early if the holder is not open anymore
380- // (that is, not used by another resource
381- // that has its own cleanup via transaction synchronization),
382- // to avoid issues with strict transaction implementations that expect
383- // the close call before transaction completion.
384- if (!this .connectionHolder .isOpen ()) {
385- return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (it -> {
386-
387- it .unbindResource (this .connectionFactory );
388- this .holderActive = false ;
389- if (this .connectionHolder .hasConnection ()) {
390- return releaseConnection (this .connectionHolder .getConnection (), this .connectionFactory );
391- }
392- return Mono .empty ();
393- });
394- }
395-
396- return Mono .empty ();
397- }
398-
399- @ Override
400- public Mono <Void > afterCompletion (int status ) {
401-
402- // If we haven't closed the Connection in beforeCompletion,
403- // close it now. The holder might have been used for other
404- // cleanup in the meantime, for example by a Hibernate Session.
405- if (this .holderActive ) {
406- // The thread-bound ConnectionHolder might not be available anymore,
407- // since afterCompletion might get called from a different thread.
408- return TransactionSynchronizationManager .forCurrentTransaction ().flatMap (it -> {
409-
410- it .unbindResourceIfPossible (this .connectionFactory );
411- this .holderActive = false ;
412- if (this .connectionHolder .hasConnection ()) {
413- return releaseConnection (this .connectionHolder .getConnection (), this .connectionFactory )
414- .doOnTerminate (() -> {
415- // Reset the ConnectionHolder: It might remain bound to the context.
416- this .connectionHolder .setConnection (null );
417- });
418- }
419-
420- return Mono .empty ();
421- });
422- }
423-
424- this .connectionHolder .reset ();
425- return Mono .empty ();
426- }
427- }
428220}
0 commit comments