Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@
*/
package org.drools.persistence.session;

import static org.drools.persistence.util.DroolsPersistenceUtil.DROOLS_PERSISTENCE_UNIT_NAME;
import static org.drools.persistence.util.DroolsPersistenceUtil.createEnvironment;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.drools.core.command.impl.CommandBasedStatefulKnowledgeSession;
import org.drools.core.impl.InternalKnowledgeBase;
import org.drools.core.impl.KnowledgeBaseFactory;
Expand All @@ -24,9 +38,10 @@
import org.hibernate.StaleObjectStateException;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.kie.api.KieBase;
import org.kie.api.event.rule.DefaultRuleRuntimeEventListener;
import org.kie.api.event.rule.ObjectInsertedEvent;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.Environment;
import org.kie.internal.builder.KnowledgeBuilder;
Expand All @@ -37,39 +52,33 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.drools.persistence.util.DroolsPersistenceUtil.DROOLS_PERSISTENCE_UNIT_NAME;
import static org.drools.persistence.util.DroolsPersistenceUtil.createEnvironment;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

@Ignore("Failing after Hibernate 4.x -> 5.x upgrade. Needs to be investigated, see https://issues.jboss.org/browse/DROOLS-1393")
public class JpaOptLockPersistentStatefulSessionTest {

private static Logger logger = LoggerFactory.getLogger(JpaOptLockPersistentStatefulSessionTest.class);

private Map<String, Object> context;
private Environment env;

private static CountDownLatch ksession1latch = new CountDownLatch(1);
private static CountDownLatch ksession2latch = new CountDownLatch(1);
private static volatile boolean isKsession1finished = false;

public JpaOptLockPersistentStatefulSessionTest() {
}

@Before
public void setUp() throws Exception {
context = DroolsPersistenceUtil.setupWithPoolingDataSource(DROOLS_PERSISTENCE_UNIT_NAME);
env = createEnvironment(context);
}

@After
public void tearDown() throws Exception {
DroolsPersistenceUtil.cleanUp(context);
}

@Test
public void testOptimisticLockInterceptor() {
public void testOptimisticLockInterceptor() throws InterruptedException {
String str = "";
str += "package org.kie.test\n";
str += "global java.util.List list\n";
Expand All @@ -93,17 +102,100 @@ public void testOptimisticLockInterceptor() {
kbase.addPackages( kbuilder.getKnowledgePackages() );

StatefulKnowledgeSession ksession = JPAKnowledgeService.newStatefulKnowledgeSession( kbase, null, env );
List<?> list = new ArrayList<Object>();
List<?> list = new ArrayList<>();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 2; i++) {
new InsertAndFireThread(ksession.getIdentifier(), kbase, list).start();
Thread thread = new InsertAndFireThread(ksession.getIdentifier(), kbase, list);
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
assertEquals( 6, list.size() );
ksession.dispose();
}

@Test
public void testOptimisticLockInterceptorMaxRetry() {
String str = "";
str += "package org.kie.test\n";
str += "rule rule1\n";
str += "when\n";
str += " Integer(intValue == 1)\n";
str += "then\n";
str += "end\n";

KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add(ResourceFactory.newByteArrayResource(str.getBytes()), ResourceType.DRL);
final InternalKnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();

if (kbuilder.hasErrors()) {
fail(kbuilder.getErrors().toString());
}

kbase.addPackages(kbuilder.getKnowledgePackages());

final AtomicInteger attempts = new AtomicInteger(0);

final StatefulKnowledgeSession ksession1 = JPAKnowledgeService.newStatefulKnowledgeSession(kbase, null, env);
PersistableRunner sscs1 = (PersistableRunner) ((CommandBasedStatefulKnowledgeSession) ksession1).getRunner();
OptimisticLockRetryInterceptor interceptor1 = new OptimisticLockRetryInterceptor();
sscs1.addInterceptor(interceptor1);
ksession1.addEventListener(new DefaultRuleRuntimeEventListener() {

public void objectInserted(ObjectInsertedEvent event) {
attempts.incrementAndGet();
try {
ksession1latch = new CountDownLatch(1);
ksession2latch.countDown();
ksession1latch.await(); // Wait for ksession2 to commit so ksesison1 will hit OptimisticLockException
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

final long ksessionId = ksession1.getIdentifier();
StatefulKnowledgeSession ksession2 = JPAKnowledgeService.loadStatefulKnowledgeSession(ksessionId, kbase, null, createEnvironment(context));
PersistableRunner sscs2 = (PersistableRunner) ((CommandBasedStatefulKnowledgeSession) ksession2).getRunner();
OptimisticLockRetryInterceptor interceptor2 = new OptimisticLockRetryInterceptor();
sscs2.addInterceptor(interceptor2);

ExecutorService executor = Executors.newFixedThreadPool(1);
executor.execute(() -> {
try {
ksession1.insert(1);
ksession1.dispose();
} catch (Exception e) {
e.printStackTrace();
} finally {
isKsession1finished = true;
ksession2latch.countDown();
}
});

while (!isKsession1finished) {
try {
ksession2latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
ksession2latch = new CountDownLatch(1);

ksession2.insert(2);
ksession1latch.countDown();
}
ksession2.dispose();

executor.shutdown();
try {
Thread.sleep(1000);
executor.awaitTermination(300, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
assertEquals( 6, list.size() );
ksession.dispose();

assertEquals(4, attempts.get());
}

private class InsertAndFireThread extends Thread {
Expand Down