Skip to content

Commit 0316a10

Browse files
committed
1 parent f5f4cf9 commit 0316a10

13 files changed

Lines changed: 6 additions & 578 deletions

src/org/jgroups/JChannel.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -549,13 +549,7 @@ public Object down(Event evt) {
549549
}
550550

551551
public Object down(Message msg) {
552-
msg.incr();
553-
try {
554-
return prot_stack.down(msg);
555-
}
556-
finally {
557-
msg.decr();
558-
}
552+
return msg != null? prot_stack.down(msg) : null;
559553
}
560554

561555
/**

src/org/jgroups/Message.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* @author Bela Ban
1717
* @since 5.0
1818
*/
19-
public interface Message extends SizeStreamable, Constructable<Message>, Refcountable<Message> {
19+
public interface Message extends SizeStreamable, Constructable<Message> {
2020

2121
// The type of the message. Cannot be an enum, as users can register additional types
2222
short BYTES_MSG = 0,

src/org/jgroups/Refcountable.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

src/org/jgroups/RefcountedBytesMessage.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

src/org/jgroups/RefcountedNioMessage.java

Lines changed: 0 additions & 55 deletions
This file was deleted.

src/org/jgroups/protocols/NAKACK4.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,12 +208,8 @@ protected void handleAck(Address sender, long ack) {
208208
return;
209209
}
210210
long old_min=rc[0], new_min=rc[1];
211-
if(new_min > old_min) {
212-
213-
buf.forEach(buf.low()+1, buf.hd(), DECR, false);
214-
211+
if(new_min > old_min)
215212
buf.purge(new_min); // unblocks senders waiting for space to become available
216-
}
217213
}
218214

219215
@Override

src/org/jgroups/protocols/ReliableMulticast.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,6 @@ public abstract class ReliableMulticast extends Protocol implements DiagnosticsH
123123
return hdr == null || hdr.getType() != NakAckHeader.MSG? -1 : hdr.getSeqno();
124124
};
125125
protected final Predicate<Message> HAS_HEADER=m -> m != null && m.getHeader(id) != null;
126-
protected static final Buffer.Visitor<Message> DECR=(seqno, msg) -> {
127-
if(msg != null)
128-
msg.decr();
129-
return true;
130-
};
131126

132127

133128
@ManagedAttribute(description="Number of retransmit requests received",type=SCALAR)
@@ -519,10 +514,8 @@ public Object down(Message msg) {
519514
if(dont_loopback_set && needToSendAck(send_entry, 1))
520515
handleAck(local_addr, win.highestDelivered()); // https://issues.redhat.com/browse/JGRP-2829
521516
}
522-
else {
523-
msg.decr();
517+
else
524518
log.trace("%s: dropped message due to closed send buffer, message: %s", local_addr, msg);
525-
}
526519
last_seqno_resender.skipNext();
527520
return null; // don't pass down the stack
528521
}
@@ -714,7 +707,6 @@ protected boolean send(Message msg, Buffer<Message> win, boolean dont_loopback_s
714707
msg.putHeader(this.id, NakAckHeader.createMessageHeader(msg_id));
715708
if(!addToSendBuffer(win, msg_id, msg, dont_loopback_set? remove_filter : null))
716709
return false; // e.g. message already present in send buffer, or buffer is closed
717-
msg.incr();
718710
down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
719711
return true;
720712
}
@@ -1234,7 +1226,6 @@ protected void stable(Digest digest) {
12341226

12351227
// delete *delivered* msgs that are stable (all messages with seqnos <= seqno)
12361228
if(hd >= 0 && win != null) {
1237-
win.forEach(win.low()+1, win.hd(), DECR, false);
12381229
log.trace("%s: deleting msgs <= %s from %s", local_addr, hd, member);
12391230
win.purge(hd);
12401231
}

src/org/jgroups/protocols/ReliableUnicast.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,6 @@ public abstract class ReliableUnicast extends Protocol implements AgeOutCache.Ha
184184

185185
protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR=MessageBatch::add;
186186

187-
protected static final Buffer.Visitor<Message> DECR=(seqno, msg) -> {
188-
if(msg != null)
189-
msg.decr();
190-
return true;
191-
};
192-
193187
protected abstract Buffer<Message> createBuffer(long initial_seqno);
194188
protected abstract boolean needToSendAck(Entry e, int num_acks);
195189

@@ -687,10 +681,8 @@ public Object down(Message msg) {
687681
boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK) && dst.equals(local_addr);
688682
if(send(msg, entry, dont_loopback_set))
689683
num_msgs_sent.increment();
690-
else {
691-
msg.decr();
684+
else
692685
log.trace("%s: dropped message due to closed send buffer, message: %s", local_addr, msg);
693-
}
694686
return null; // the message was already sent down the stack in send()
695687
}
696688

@@ -1036,7 +1028,6 @@ protected void handleAckReceived(Address sender, long seqno, short conn_id, int
10361028

10371029
Buffer<Message> win=entry != null? entry.buf : null;
10381030
if(win != null && entry.updateLastTimestamp(timestamp)) {
1039-
win.forEach(win.low()+1, seqno, DECR, false);
10401031
win.purge(seqno, true); // removes all messages <= seqno (forced purge)
10411032
num_acks_received.increment();
10421033
}
@@ -1110,9 +1101,6 @@ protected boolean send(Message msg, SenderEntry entry, boolean dont_loopback_set
11101101
msg.putHeader(this.id,UnicastHeader.createDataHeader(seqno, send_conn_id,seqno == DEFAULT_FIRST_SEQNO));
11111102
if(!addToSendBuffer(buf, seqno, msg, dont_loopback_set? remove_filter : null))
11121103
return false; // e.g. message already present in send buffer, or buffer is closed
1113-
boolean is_loopback=msg.dest() != null && msg.dest().equals(local_addr);
1114-
if(!dont_loopback_set && !is_loopback)
1115-
msg.incr();
11161104
down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
11171105
if(entry.state() == State.CLOSING)
11181106
entry.state(State.OPEN);

src/org/jgroups/protocols/UNICAST3.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,6 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
196196

197197
protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR=MessageBatch::add;
198198

199-
protected static final Table.Visitor<Message> DECR=(seqno, msg, row, col) -> {
200-
if(msg != null)
201-
msg.decr();
202-
return true;
203-
};
204-
205199
/** Used for testing only! */
206200
public Table<Message> getSendWindow(Address target) {
207201
SenderEntry entry=send_table.get(target);
@@ -713,12 +707,10 @@ public Object down(Message msg) {
713707
return up_prot.up(msg);
714708
}
715709
SenderEntry entry=getSenderEntry(dst);
716-
boolean is_loopback=dst.equals(local_addr), dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK) && is_loopback;
710+
boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK) && dst.equals(local_addr);
717711
short send_conn_id=entry.connId();
718712
long seqno=entry.sent_msgs_seqno.getAndIncrement();
719713
long sleep=10;
720-
if(!dont_loopback_set && !is_loopback)
721-
msg.incr();
722714
do {
723715
try {
724716
msg.putHeader(this.id,UnicastHeader3.createDataHeader(seqno,send_conn_id,seqno == DEFAULT_FIRST_SEQNO));
@@ -1107,7 +1099,6 @@ protected void handleAckReceived(Address sender, long seqno, short conn_id, int
11071099

11081100
Table<Message> win=entry != null? entry.msgs : null;
11091101
if(win != null && entry.updateLastTimestamp(timestamp)) {
1110-
win.forEach(win.getLow()+1, seqno, DECR);
11111102
win.purge(seqno, true); // removes all messages <= seqno (forced purge)
11121103
num_acks_received.increment();
11131104
}

src/org/jgroups/protocols/pbcast/NAKACK2.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,6 @@ public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler
162162
return hdr == null || hdr.getType() != NakAckHeader2.MSG? -1 : hdr.getSeqno();
163163
};
164164
protected final Predicate<Message> HAS_HEADER=m -> m != null && m.getHeader(id) != null;
165-
protected static final Table.Visitor<Message> DECR=(seqno, msg, row, col) -> {
166-
if(msg != null)
167-
msg.decr();
168-
return true;
169-
};
170-
171165

172166
@ManagedAttribute(description="Number of retransmit requests received",type=AttributeType.SCALAR)
173167
protected final LongAdder xmit_reqs_received=new LongAdder();
@@ -798,7 +792,6 @@ protected void send(Message msg) {
798792
Table<Message> buf=local_xmit_table;
799793
if(buf == null && (buf=local_xmit_table=xmit_table.get(local_addr)) == null) // discard message if there is no entry for local_addr
800794
return;
801-
msg.incr();
802795
if(msg.getSrc() == null)
803796
msg.setSrc(local_addr); // this needs to be done so we can check whether the message sender is the local_addr
804797

@@ -1309,7 +1302,6 @@ protected void stable(Digest digest) {
13091302

13101303
// delete *delivered* msgs that are stable (all messages with seqnos <= seqno)
13111304
if(hd >= 0 && buf != null) {
1312-
buf.forEach(buf.getLow()+1, hd, DECR);
13131305
log.trace("%s: deleting msgs <= %s from %s", local_addr, hd, member);
13141306
buf.purge(hd);
13151307
}

0 commit comments

Comments
 (0)