1515 */
1616package org .drools .persistence .session ;
1717
18+ import static org .drools .persistence .util .DroolsPersistenceUtil .DROOLS_PERSISTENCE_UNIT_NAME ;
19+ import static org .drools .persistence .util .DroolsPersistenceUtil .createEnvironment ;
20+ import static org .junit .Assert .assertEquals ;
21+ import static org .junit .Assert .fail ;
22+
23+ import java .util .ArrayList ;
24+ import java .util .List ;
25+ import java .util .Map ;
26+ import java .util .concurrent .CountDownLatch ;
27+ import java .util .concurrent .ExecutorService ;
28+ import java .util .concurrent .Executors ;
29+ import java .util .concurrent .TimeUnit ;
30+ import java .util .concurrent .atomic .AtomicInteger ;
31+
1832import org .drools .core .command .impl .CommandBasedStatefulKnowledgeSession ;
1933import org .drools .core .impl .InternalKnowledgeBase ;
2034import org .drools .core .impl .KnowledgeBaseFactory ;
2438import org .hibernate .StaleObjectStateException ;
2539import org .junit .After ;
2640import org .junit .Before ;
27- import org .junit .Ignore ;
2841import org .junit .Test ;
2942import org .kie .api .KieBase ;
43+ import org .kie .api .event .rule .DefaultRuleRuntimeEventListener ;
44+ import org .kie .api .event .rule .ObjectInsertedEvent ;
3045import org .kie .api .io .ResourceType ;
3146import org .kie .api .runtime .Environment ;
3247import org .kie .internal .builder .KnowledgeBuilder ;
3752import org .slf4j .Logger ;
3853import org .slf4j .LoggerFactory ;
3954
40- import java .util .ArrayList ;
41- import java .util .List ;
42- import java .util .Map ;
43-
44- import static org .drools .persistence .util .DroolsPersistenceUtil .DROOLS_PERSISTENCE_UNIT_NAME ;
45- import static org .drools .persistence .util .DroolsPersistenceUtil .createEnvironment ;
46- import static org .junit .Assert .assertEquals ;
47- import static org .junit .Assert .fail ;
4855
49- @ Ignore ("Failing after Hibernate 4.x -> 5.x upgrade. Needs to be investigated, see https://issues.jboss.org/browse/DROOLS-1393" )
5056public class JpaOptLockPersistentStatefulSessionTest {
5157
5258 private static Logger logger = LoggerFactory .getLogger (JpaOptLockPersistentStatefulSessionTest .class );
5359
5460 private Map <String , Object > context ;
5561 private Environment env ;
5662
63+ private static CountDownLatch ksession1latch = new CountDownLatch (1 );
64+ private static CountDownLatch ksession2latch = new CountDownLatch (1 );
65+ private static volatile boolean isKsession1finished = false ;
66+
5767 public JpaOptLockPersistentStatefulSessionTest () {
5868 }
59-
69+
6070 @ Before
6171 public void setUp () throws Exception {
6272 context = DroolsPersistenceUtil .setupWithPoolingDataSource (DROOLS_PERSISTENCE_UNIT_NAME );
6373 env = createEnvironment (context );
6474 }
65-
75+
6676 @ After
6777 public void tearDown () throws Exception {
6878 DroolsPersistenceUtil .cleanUp (context );
6979 }
7080
7181 @ Test
72- public void testOptimisticLockInterceptor () {
82+ public void testOptimisticLockInterceptor () throws InterruptedException {
7383 String str = "" ;
7484 str += "package org.kie.test\n " ;
7585 str += "global java.util.List list\n " ;
@@ -93,17 +103,100 @@ public void testOptimisticLockInterceptor() {
93103 kbase .addPackages ( kbuilder .getKnowledgePackages () );
94104
95105 StatefulKnowledgeSession ksession = JPAKnowledgeService .newStatefulKnowledgeSession ( kbase , null , env );
96- List <?> list = new ArrayList <Object >();
106+ List <?> list = new ArrayList <>();
107+ List <Thread > threads = new ArrayList <>();
97108 for (int i = 0 ; i < 2 ; i ++) {
98- new InsertAndFireThread (ksession .getIdentifier (), kbase , list ).start ();
109+ Thread thread = new InsertAndFireThread (ksession .getIdentifier (), kbase , list );
110+ threads .add (thread );
111+ thread .start ();
112+ }
113+ for (Thread thread : threads ) {
114+ thread .join ();
99115 }
116+ assertEquals ( 6 , list .size () );
117+ ksession .dispose ();
118+ }
119+
120+ @ Test
121+ public void testOptimisticLockInterceptorMaxRetry () {
122+ String str = "" ;
123+ str += "package org.kie.test\n " ;
124+ str += "rule rule1\n " ;
125+ str += "when\n " ;
126+ str += " Integer(intValue == 1)\n " ;
127+ str += "then\n " ;
128+ str += "end\n " ;
129+
130+ KnowledgeBuilder kbuilder = KnowledgeBuilderFactory .newKnowledgeBuilder ();
131+ kbuilder .add (ResourceFactory .newByteArrayResource (str .getBytes ()), ResourceType .DRL );
132+ final InternalKnowledgeBase kbase = KnowledgeBaseFactory .newKnowledgeBase ();
133+
134+ if (kbuilder .hasErrors ()) {
135+ fail (kbuilder .getErrors ().toString ());
136+ }
137+
138+ kbase .addPackages (kbuilder .getKnowledgePackages ());
139+
140+ final AtomicInteger attempts = new AtomicInteger (0 );
141+
142+ final StatefulKnowledgeSession ksession1 = JPAKnowledgeService .newStatefulKnowledgeSession (kbase , null , env );
143+ PersistableRunner sscs1 = (PersistableRunner ) ((CommandBasedStatefulKnowledgeSession ) ksession1 ).getRunner ();
144+ OptimisticLockRetryInterceptor interceptor1 = new OptimisticLockRetryInterceptor ();
145+ sscs1 .addInterceptor (interceptor1 );
146+ ksession1 .addEventListener (new DefaultRuleRuntimeEventListener () {
147+
148+ public void objectInserted (ObjectInsertedEvent event ) {
149+ attempts .incrementAndGet ();
150+ try {
151+ ksession1latch = new CountDownLatch (1 );
152+ ksession2latch .countDown ();
153+ ksession1latch .await (); // Wait for ksession2 to commit so ksesison1 will hit OptimisticLockException
154+ } catch (InterruptedException e ) {
155+ e .printStackTrace ();
156+ }
157+ }
158+ });
159+
160+ final long ksessionId = ksession1 .getIdentifier ();
161+ StatefulKnowledgeSession ksession2 = JPAKnowledgeService .loadStatefulKnowledgeSession (ksessionId , kbase , null , createEnvironment (context ));
162+ PersistableRunner sscs2 = (PersistableRunner ) ((CommandBasedStatefulKnowledgeSession ) ksession2 ).getRunner ();
163+ OptimisticLockRetryInterceptor interceptor2 = new OptimisticLockRetryInterceptor ();
164+ sscs2 .addInterceptor (interceptor2 );
165+
166+ ExecutorService executor = Executors .newFixedThreadPool (1 );
167+ executor .execute (() -> {
168+ try {
169+ ksession1 .insert (1 );
170+ ksession1 .dispose ();
171+ } catch (Exception e ) {
172+ e .printStackTrace ();
173+ } finally {
174+ isKsession1finished = true ;
175+ ksession2latch .countDown ();
176+ }
177+ });
178+
179+ while (!isKsession1finished ) {
180+ try {
181+ ksession2latch .await ();
182+ } catch (InterruptedException e ) {
183+ e .printStackTrace ();
184+ }
185+ ksession2latch = new CountDownLatch (1 );
186+
187+ ksession2 .insert (2 );
188+ ksession1latch .countDown ();
189+ }
190+ ksession2 .dispose ();
191+
192+ executor .shutdown ();
100193 try {
101- Thread . sleep ( 1000 );
194+ executor . awaitTermination ( 300 , TimeUnit . SECONDS );
102195 } catch (InterruptedException e ) {
103196 e .printStackTrace ();
104197 }
105- assertEquals ( 6 , list . size () );
106- ksession . dispose ( );
198+
199+ assertEquals ( 4 , attempts . get () );
107200 }
108201
109202 private class InsertAndFireThread extends Thread {
0 commit comments