Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.drools.core.common;

import org.drools.core.reteoo.LeftTupleNode;
import org.drools.core.reteoo.LeftTupleSinkPropagator;
import org.drools.core.reteoo.LeftTupleSource;
import org.drools.core.reteoo.PathEndNode;
import org.drools.core.reteoo.PathMemory;
Expand All @@ -36,7 +35,7 @@ public interface SegmentMemorySupport {

public SegmentMemory getQuerySegmentMemory(QueryElementNode queryNode);

public void createChildSegments(LeftTupleSinkPropagator sinkProp, SegmentMemory smem);
public void initializeChildSegmentsIfNeeded(SegmentMemory smem);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpic: I would remove the IfNeeded bit from the name of this method (unless you have a good reason to keep it).

Copy link
Contributor Author

@pibizza pibizza Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariofusco It is to clarify that the method is idempotent - you can call it twice and it won't get extra initailization.


public SegmentMemory createChildSegment(LeftTupleNode node);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1121,8 +1121,9 @@ private static TupleImpl insertPeerLeftTuple(InternalWorkingMemory wm,
memory.getSegmentMemory().getStagedLeftTuples().addInsert(peer);
} else {
// If parent is Lian, then this must be called, so that any linking or unlinking can be done.
LeftInputAdapterNode.doInsertSegmentMemoryWithFlush(wm, true, liaMem, memory.getSegmentMemory(), peer, node
.getLeftTupleSource().isStreamMode());
List<PathMemory> pathsToFlush = LeftInputAdapterNode.doInsertSegmentMemory(wm, true, liaMem, memory.getSegmentMemory(), peer, node
.getLeftTupleSource().isStreamMode() );
wm.getRuleNetworkEvaluator().forceFlushPaths(pathsToFlush);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this force flush necessary now? Did it do this also before in a different way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariofusco yes. If you see the method called originally was doInsertSegmentMemoryWithFlush. Now it is doInsertSegmentMemory. See changes in LeftInputAdapterNode.

}

return peer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,8 +1032,9 @@ private static TupleImpl insertPeerLeftTuple(InternalWorkingMemory wm,
memory.getSegmentMemory().getStagedLeftTuples().addInsert(peer);
} else {
// If parent is Lian, then this must be called, so that any linking or unlinking can be done.
LeftInputAdapterNode.doInsertSegmentMemoryWithFlush(wm, true, liaMem, memory.getSegmentMemory(), peer, node
.getLeftTupleSource().isStreamMode());
List<PathMemory> pathsToFlush = LeftInputAdapterNode.doInsertSegmentMemory(wm, true, liaMem, memory.getSegmentMemory(), peer, node
.getLeftTupleSource().isStreamMode() );
wm.getRuleNetworkEvaluator().forceFlushPaths(pathsToFlush);
}

return peer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.drools.core.phreak;

import java.util.Collection;
import java.util.List;

import org.drools.base.common.NetworkNode;
Expand Down Expand Up @@ -51,15 +52,18 @@ void forceFlushLeftTuple(PathMemory pmem,

void forceFlushWhenSubnetwork(PathMemory pmem);

public boolean flushLeftTupleIfNecessary(SegmentMemory sm, boolean streamMode);
boolean flushLeftTupleIfNecessary(SegmentMemory sm, boolean streamMode);

public boolean flushLeftTupleIfNecessary(SegmentMemory sm,
boolean flushLeftTupleIfNecessary(SegmentMemory sm,
TupleImpl leftTuple,
boolean streamMode,
short stagedType);

List<PathMemory> findPathsToFlushFromSubnetwork(PathMemory pmem);

void propagate(SegmentMemory sourceSegment, TupleSets leftTuples);

void forceFlushPaths(Collection<PathMemory> pathsToFlush);

void propagate(SegmentMemory smem, TupleSets actualResultLeftTuples);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.drools.core.phreak;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -49,12 +50,10 @@
import org.drools.core.reteoo.FromNode;
import org.drools.core.reteoo.FromNode.FromMemory;
import org.drools.core.reteoo.JoinNode;
import org.drools.core.reteoo.LeftInputAdapterNode;
import org.drools.core.reteoo.LeftTuple;
import org.drools.core.reteoo.LeftTupleNode;
import org.drools.core.reteoo.LeftTupleSink;
import org.drools.core.reteoo.LeftTupleSinkNode;
import org.drools.core.reteoo.LeftTupleSource;
import org.drools.core.reteoo.NotNode;
import org.drools.core.reteoo.ObjectSink;
import org.drools.core.reteoo.PathEndNode;
Expand All @@ -73,7 +72,6 @@
import org.drools.core.reteoo.TupleFactory;
import org.drools.core.reteoo.TupleImpl;
import org.drools.core.reteoo.TupleToObjectNode;
import org.drools.core.reteoo.TupleToObjectNode.SubnetworkPathMemory;
import org.drools.core.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -147,7 +145,7 @@ public void evaluateNetwork(ActivationsManager activationsManager,
return;
}

LeftInputAdapterNode liaNode = (LeftInputAdapterNode) smem.getRootNode();
LeftTupleNode liaNode = smem.getRootNode();

NetworkNode node;
Memory nodeMem;
Expand Down Expand Up @@ -188,7 +186,7 @@ public void evaluate(PathMemory pmem,
}

long bit = 1;
for (NetworkNode node = sm.getRootNode(); node != sink; node = ((LeftTupleSource) node).getSinkPropagator()
for (NetworkNode node = sm.getRootNode(); node != sink; node = ((LeftTupleNode) node).getSinkPropagator()
.getFirstLeftTupleSink()) {
//update the bit to the correct node position.
bit = nextNodePosMask(bit);
Expand All @@ -201,11 +199,17 @@ public void evaluate(PathMemory pmem,

@Override
public void forceFlushWhenSubnetwork(PathMemory pmem) {
for (PathMemory outPmem : findPathsToFlushFromSubnetwork(pmem)) {
forceFlushPaths(findPathsToFlushFromSubnetwork(pmem));
}

@Override
public void forceFlushPaths(Collection<PathMemory> pmems) {
for (PathMemory outPmem : pmems) {
forceFlushPath(outPmem);
}
}


@Override
public List<PathMemory> findPathsToFlushFromSubnetwork(PathMemory pmem) {
List<PathMemory> paths = null;
Expand Down Expand Up @@ -399,8 +403,8 @@ private void innerEval(ActivationsManager activationsManager,

// this is needed for subnetworks that feed into a parent network that has no right inputs,
// and may not yet be initialized
if (smem.isEmpty() && !NodeTypeEnums.isTerminalNode(smem.getTipNode())) {
segmentMemorySupport.createChildSegments(smem.getTipNode().getSinkPropagator(), smem);
if (!NodeTypeEnums.isTerminalNode(smem.getTipNode())) {
segmentMemorySupport.initializeChildSegmentsIfNeeded(smem);
}

smem = smems[i];
Expand Down Expand Up @@ -436,7 +440,7 @@ private void innerEval(ActivationsManager activationsManager,
log.trace("{} Skip Node {}", indent(offset), node);
}
bit = nextNodePosMask(bit); // shift to check the next node
node = ((LeftTupleSource) node).getSinkPropagator().getFirstLeftTupleSink();
node = ((LeftTupleNode) node).getSinkPropagator().getFirstLeftTupleSink();
nodeMem = nodeMem.getNext();
}
}
Expand All @@ -460,7 +464,7 @@ private void innerEval(ActivationsManager activationsManager,
}

stagedLeftTuples = getTargetStagedLeftTuples(smem, node);
LeftTupleSinkNode sink = ((LeftTupleSource) node).getSinkPropagator().getFirstLeftTupleSink();
LeftTupleSinkNode sink = ((LeftTupleNode) node).getSinkPropagator().getFirstLeftTupleSink();

trgTuples = evalNode(activationsManager, executor, stack, pmem, smems, smem, smemIndex, bit, nodeMem, node, sink, srcTuples, stagedLeftTuples, processSubnetwork);
if (trgTuples == null) {
Expand Down Expand Up @@ -569,9 +573,7 @@ private TupleSets getTargetStagedLeftTuples(SegmentMemory smem,
NetworkNode node) {
if (node == smem.getTipNode()) {
// we are about to process the segment tip, allow it to merge insert/update/delete clashes
if (smem.isEmpty()) {
segmentMemorySupport.createChildSegments(((LeftTupleSource) node).getSinkPropagator(), smem);
}
segmentMemorySupport.initializeChildSegmentsIfNeeded(smem);
return smem.getFirst().getStagedLeftTuples().takeAll();
} else {
return null;
Expand Down Expand Up @@ -749,7 +751,7 @@ private void doSubnetwork(ActivationsManager activationsManager,
BetaMemory bm,
BetaNode betaNode,
LeftTupleSinkNode sink) {
SubnetworkPathMemory pathMem = bm.getSubnetworkPathMemory();
PathMemory pathMem = bm.getSubnetworkPathMemory();
SegmentMemory[] subnetworkSmems = pathMem.getSegmentMemories();
SegmentMemory subSmem = null;
for (int i = 0; subSmem == null; i++) {
Expand All @@ -767,6 +769,8 @@ private void doSubnetwork(ActivationsManager activationsManager,
log.trace("{} SubnetworkQueue {} {}", indent(offset), betaNode.toString(), srcTuples.toStringSizes());
}




TupleSets subLts = subSmem.getStagedLeftTuples().takeAll();
// node is first in the segment, so bit is 1
Expand Down Expand Up @@ -892,16 +896,13 @@ private void doSubnetwork2(TupleToObjectNode tton,
srcTuples.resetAll();
}

@Override
public void propagate(SegmentMemory sourceSegment, TupleSets leftTuples) {
if (leftTuples.isEmpty()) {
return;
}

LeftTupleSource source = ( LeftTupleSource ) sourceSegment.getTipNode();

if ( sourceSegment.isEmpty() ) {
segmentMemorySupport.createChildSegments(source.getSinkPropagator(), sourceSegment);
}
segmentMemorySupport.initializeChildSegmentsIfNeeded(sourceSegment);

processPeers(sourceSegment, leftTuples);

Expand Down Expand Up @@ -1030,15 +1031,15 @@ private static String indent(int size) {
}

private static int getOffset(NetworkNode node) {
LeftTupleSource lt;
LeftTupleNode lt;
int offset = 1;
if (NodeTypeEnums.isTerminalNode(node)) {
lt = ((TerminalNode) node).getLeftTupleSource();
offset++;
} else if (node.getType() == NodeTypeEnums.TupleToObjectNode) {
lt = ((TupleToObjectNode) node).getLeftTupleSource();
} else {
lt = (LeftTupleSource) node;
lt = (LeftTupleNode) node;
}
while (!NodeTypeEnums.isLeftInputAdapterNode(lt)) {
offset++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,13 @@ public SegmentMemory createChildSegmentLazily(LeftTupleNode node) {
return memory.getSegmentMemory();
}

public void createChildSegments(LeftTupleSinkPropagator sinkProp, SegmentMemory smem) {
@Override
public void initializeChildSegmentsIfNeeded(SegmentMemory smem) {
LeftTupleSinkPropagator sinkPropagator = smem.getTipNode().getSinkPropagator();
if (!smem.isEmpty()) {
return; // this can happen when multiple threads are trying to initialize the segment
}
for (LeftTupleSinkNode sink = sinkProp.getFirstLeftTupleSink(); sink != null; sink = sink
for (LeftTupleSinkNode sink = sinkPropagator.getFirstLeftTupleSink(); sink != null; sink = sink
.getNextLeftTupleSinkNode()) {
SegmentMemory childSmem = PhreakBuilder.isEagerSegmentCreation() ? createChildSegment(sink)
: createChildSegmentLazily(sink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.drools.core.reteoo;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -173,10 +174,7 @@ public static void doInsertObject(final InternalFactHandle factHandle,
boolean useLeftMemory) {
SegmentMemory sm = lm.getOrCreateSegmentMemory( liaNode, reteEvaluator );
if ( sm.getTipNode() == liaNode) {
// liaNode in its own segment and child segments not yet created
if ( sm.isEmpty() ) {
reteEvaluator.getSegmentMemorySupport().createChildSegments(liaNode.getSinkPropagator(), sm);
}
reteEvaluator.getSegmentMemorySupport().initializeChildSegmentsIfNeeded(sm);
sm = sm.getFirst(); // repoint to the child sm
}

Expand All @@ -199,8 +197,9 @@ public static void doInsertObject(final InternalFactHandle factHandle,
TupleImpl leftTuple = TupleFactory.createLeftTuple( sink, factHandle, useLeftMemory );
leftTuple.setPropagationContext( context );

Collection<PathMemory> pathsToFlush;
if ( sm.getRootNode() == liaNode ) {
doInsertSegmentMemoryWithFlush(reteEvaluator, notifySegment, lm, sm, leftTuple, liaNode.isStreamMode());
pathsToFlush = doInsertSegmentMemory(reteEvaluator, notifySegment, lm, sm, leftTuple, liaNode.isStreamMode() );
} else {
// sm points to lia child sm, so iterate for all remaining children
// all peer tuples must be created before propagation, or eager evaluation subnetworks have problem
Expand All @@ -210,9 +209,8 @@ public static void doInsertObject(final InternalFactHandle factHandle,
sink = sm.getSinkFactory();
peer = TupleFactory.createPeer( sink, peer ); // pctx is set during peer cloning
}

sm = originaSm;
Set<PathMemory> pathsToFlush = new HashSet<>();
pathsToFlush = new HashSet<>();
pathsToFlush.addAll( doInsertSegmentMemory( reteEvaluator, notifySegment, lm, sm, leftTuple, liaNode.isStreamMode() ) );
if ( sm.getRootNode() != liaNode ) {
// sm points to lia child sm, so iterate for all remaining children
Expand All @@ -223,16 +221,8 @@ public static void doInsertObject(final InternalFactHandle factHandle,
}
}

for (PathMemory outPmem : pathsToFlush) {
reteEvaluator.getRuleNetworkEvaluator().forceFlushPath(outPmem);
}
}
}

public static void doInsertSegmentMemoryWithFlush(ReteEvaluator reteEvaluator, boolean notifySegment, LiaNodeMemory lm, SegmentMemory sm, TupleImpl leftTuple, boolean streamMode) {
for (PathMemory outPmem : doInsertSegmentMemory(reteEvaluator, notifySegment, lm, sm, leftTuple, streamMode )) {
reteEvaluator.getRuleNetworkEvaluator().forceFlushPath(outPmem);
}
reteEvaluator.getRuleNetworkEvaluator().forceFlushPaths(pathsToFlush);
}

public static List<PathMemory> doInsertSegmentMemory(ReteEvaluator reteEvaluator, boolean linkOrNotify, LiaNodeMemory lm, SegmentMemory sm, TupleImpl leftTuple, boolean streamMode) {
Expand Down Expand Up @@ -263,10 +253,7 @@ public static void doDeleteObject(TupleImpl leftTuple,
final boolean linkOrNotify,
final LiaNodeMemory lm) {
if ( sm.getTipNode() == liaNode ) {
// liaNode in it's own segment and child segments not yet created
if ( sm.isEmpty() ) {
reteEvaluator.getSegmentMemorySupport().createChildSegments(liaNode.getSinkPropagator(), sm);
}
reteEvaluator.getSegmentMemorySupport().initializeChildSegmentsIfNeeded(sm);
sm = sm.getFirst(); // repoint to the child sm
}

Expand Down Expand Up @@ -321,10 +308,7 @@ public static void doUpdateObject(TupleImpl leftTuple,
final LiaNodeMemory lm,
SegmentMemory sm) {
if ( sm.getTipNode() == liaNode) {
// liaNode in it's own segment and child segments not yet created
if ( sm.isEmpty() ) {
reteEvaluator.getSegmentMemorySupport().createChildSegments(liaNode.getSinkPropagator(), sm);
}
reteEvaluator.getSegmentMemorySupport().initializeChildSegmentsIfNeeded(sm);
sm = sm.getFirst(); // repoint to the child sm
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public LeftTupleNode getRootNode() {
return proto.getRootNode();
}

public boolean isOnlyLiaSegment() {
return getRootNode() == getTipNode();
}

public SegmentPrototype getSegmentPrototype() {
return proto;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.drools.core.common.ReteEvaluator;
import org.drools.core.common.TupleSets;
import org.drools.core.phreak.PhreakTimerNode;
import org.drools.core.reteoo.LeftTuple;
import org.drools.core.reteoo.LeftTupleSink;
import org.drools.core.reteoo.PathMemory;
import org.drools.core.reteoo.SegmentMemory;
Expand Down
Loading