forked from I2P_Developers/i2p.i2p
SSU OutboundMessageState -
Fix SSU Output Queue errors due to races with PacketBuilder: - Remove all buffer caching as it can't be made thread-safe. Just allocate buffer in constructor and let GC handle it - Do fragmenting in constructor and make all fragment fields final - Don't track per-fragment retransmissions as it wasn't used - Move ack tracking from an array to a long - Sync all ack methods - Entire class now thread-safe (thx dg)
This commit is contained in:
@ -4,16 +4,16 @@ import java.util.Date;
|
||||
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.Base64;
|
||||
import net.i2p.data.ByteArray;
|
||||
import net.i2p.data.i2np.I2NPMessage;
|
||||
import net.i2p.router.OutNetMessage;
|
||||
import net.i2p.router.util.CDPQEntry;
|
||||
import net.i2p.util.ByteCache;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
* Maintain the outbound fragmentation for resending, for a single message.
|
||||
*
|
||||
* All methods are thread-safe.
|
||||
*
|
||||
*/
|
||||
class OutboundMessageState implements CDPQEntry {
|
||||
private final I2PAppContext _context;
|
||||
@ -21,53 +21,32 @@ class OutboundMessageState implements CDPQEntry {
|
||||
/** may be null if we are part of the establishment */
|
||||
private final OutNetMessage _message;
|
||||
private final I2NPMessage _i2npMessage;
|
||||
private final long _messageId;
|
||||
/** will be null, unless we are part of the establishment */
|
||||
private final PeerState _peer;
|
||||
private final long _expiration;
|
||||
private ByteArray _messageBuf;
|
||||
private final byte[] _messageBuf;
|
||||
/** fixed fragment size across the message */
|
||||
private int _fragmentSize;
|
||||
/** size of the I2NP message */
|
||||
private int _totalSize;
|
||||
/** sends[i] is how many times the fragment has been sent, or -1 if ACKed
|
||||
* TODO this may not accurately track the number of retransmissions per-fragment,
|
||||
* and we don't make any use of it anyway, so we should just make it a bitfield.
|
||||
*/
|
||||
private short _fragmentSends[];
|
||||
private final int _fragmentSize;
|
||||
/** bitmask, 0 if acked, all 0 = complete */
|
||||
private long _fragmentAcks;
|
||||
private final int _numFragments;
|
||||
private final long _startedOn;
|
||||
private long _nextSendTime;
|
||||
private int _pushCount;
|
||||
private short _maxSends;
|
||||
// private int _nextSendFragment;
|
||||
/** for tracking use-after-free bugs */
|
||||
private boolean _released;
|
||||
private Exception _releasedBy;
|
||||
private int _maxSends;
|
||||
// we can't use the ones in _message since it is null for injections
|
||||
private long _enqueueTime;
|
||||
private long _seqNum;
|
||||
|
||||
public static final int MAX_MSG_SIZE = 32 * 1024;
|
||||
private static final int CACHE4_BYTES = MAX_MSG_SIZE;
|
||||
private static final int CACHE3_BYTES = CACHE4_BYTES / 4;
|
||||
private static final int CACHE2_BYTES = CACHE3_BYTES / 4;
|
||||
private static final int CACHE1_BYTES = CACHE2_BYTES / 4;
|
||||
|
||||
private static final int CACHE1_MAX = 256;
|
||||
private static final int CACHE2_MAX = CACHE1_MAX / 4;
|
||||
private static final int CACHE3_MAX = CACHE2_MAX / 4;
|
||||
private static final int CACHE4_MAX = CACHE3_MAX / 4;
|
||||
|
||||
private static final ByteCache _cache1 = ByteCache.getInstance(CACHE1_MAX, CACHE1_BYTES);
|
||||
private static final ByteCache _cache2 = ByteCache.getInstance(CACHE2_MAX, CACHE2_BYTES);
|
||||
private static final ByteCache _cache3 = ByteCache.getInstance(CACHE3_MAX, CACHE3_BYTES);
|
||||
private static final ByteCache _cache4 = ByteCache.getInstance(CACHE4_MAX, CACHE4_BYTES);
|
||||
|
||||
private static final long EXPIRATION = 10*1000;
|
||||
|
||||
|
||||
/**
|
||||
* Called from UDPTransport
|
||||
* "injected" message from the establisher.
|
||||
*
|
||||
* Called from UDPTransport.
|
||||
* @throws IAE if too big or if msg or peer is null
|
||||
*/
|
||||
public OutboundMessageState(I2PAppContext context, I2NPMessage msg, PeerState peer) {
|
||||
@ -75,7 +54,9 @@ class OutboundMessageState implements CDPQEntry {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called from OutboundMessageFragments
|
||||
* Normal constructor.
|
||||
*
|
||||
* Called from OutboundMessageFragments.
|
||||
* @throws IAE if too big or if msg or peer is null
|
||||
*/
|
||||
public OutboundMessageState(I2PAppContext context, OutNetMessage m, PeerState peer) {
|
||||
@ -95,161 +76,86 @@ class OutboundMessageState implements CDPQEntry {
|
||||
_message = m;
|
||||
_i2npMessage = msg;
|
||||
_peer = peer;
|
||||
_messageId = msg.getUniqueId();
|
||||
_startedOn = _context.clock().now();
|
||||
_nextSendTime = _startedOn;
|
||||
_expiration = _startedOn + EXPIRATION;
|
||||
//_expiration = msg.getExpiration();
|
||||
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
|
||||
// now "fragment" it
|
||||
int totalSize = _i2npMessage.getRawMessageSize();
|
||||
if (totalSize > MAX_MSG_SIZE)
|
||||
throw new IllegalArgumentException("Size too large! " + totalSize);
|
||||
_messageBuf = new byte[totalSize];
|
||||
_i2npMessage.toRawByteArray(_messageBuf);
|
||||
_fragmentSize = _peer.fragmentSize();
|
||||
int numFragments = totalSize / _fragmentSize;
|
||||
if (numFragments * _fragmentSize < totalSize)
|
||||
numFragments++;
|
||||
// This should never happen, as 534 bytes * 64 fragments > 32KB, and we won't bid on > 32KB
|
||||
if (numFragments > InboundMessageState.MAX_FRAGMENTS)
|
||||
throw new IllegalArgumentException("Fragmenting a " + totalSize + " message into " + numFragments + " fragments - too many!");
|
||||
_numFragments = numFragments;
|
||||
// all 1's where we care
|
||||
_fragmentAcks = _numFragments < 64 ? mask(_numFragments) - 1L : -1L;
|
||||
}
|
||||
|
||||
/**
|
||||
* lazily inits the message buffer unless already inited
|
||||
* @param fragment 0-63
|
||||
*/
|
||||
private synchronized void initBuf() {
|
||||
if (_messageBuf != null)
|
||||
return;
|
||||
final int size = _i2npMessage.getRawMessageSize();
|
||||
acquireBuf(size);
|
||||
_totalSize = _i2npMessage.toRawByteArray(_messageBuf.getData());
|
||||
_messageBuf.setValid(_totalSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws IAE if too big
|
||||
* @since 0.9.3
|
||||
*/
|
||||
private void acquireBuf(int size) {
|
||||
if (_messageBuf != null)
|
||||
releaseBuf();
|
||||
if (size <= CACHE1_BYTES)
|
||||
_messageBuf = _cache1.acquire();
|
||||
else if (size <= CACHE2_BYTES)
|
||||
_messageBuf = _cache2.acquire();
|
||||
else if (size <= CACHE3_BYTES)
|
||||
_messageBuf = _cache3.acquire();
|
||||
else if (size <= CACHE4_BYTES)
|
||||
_messageBuf = _cache4.acquire();
|
||||
else
|
||||
throw new IllegalArgumentException("Size too large! " + size);
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 0.9.3
|
||||
*/
|
||||
private void releaseBuf() {
|
||||
if (_messageBuf == null)
|
||||
return;
|
||||
int size = _messageBuf.getData().length;
|
||||
if (size == CACHE1_BYTES)
|
||||
_cache1.release(_messageBuf);
|
||||
else if (size == CACHE2_BYTES)
|
||||
_cache2.release(_messageBuf);
|
||||
else if (size == CACHE3_BYTES)
|
||||
_cache3.release(_messageBuf);
|
||||
else if (size == CACHE4_BYTES)
|
||||
_cache4.release(_messageBuf);
|
||||
_messageBuf = null;
|
||||
_released = true;
|
||||
private static long mask(int fragment) {
|
||||
return 1L << fragment;
|
||||
}
|
||||
|
||||
/**
|
||||
* This is synchronized with writeFragment(),
|
||||
* so we do not release (probably due to an ack) while we are retransmitting.
|
||||
* Also prevent double-free
|
||||
*/
|
||||
public synchronized void releaseResources() {
|
||||
if (_messageBuf != null && !_released) {
|
||||
releaseBuf();
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_releasedBy = new Exception ("Released on " + new Date() + " by:");
|
||||
}
|
||||
//_messageBuf = null;
|
||||
}
|
||||
|
||||
public OutNetMessage getMessage() { return _message; }
|
||||
public long getMessageId() { return _messageId; }
|
||||
|
||||
public long getMessageId() { return _i2npMessage.getUniqueId(); }
|
||||
|
||||
public PeerState getPeer() { return _peer; }
|
||||
|
||||
public boolean isExpired() {
|
||||
return _expiration < _context.clock().now();
|
||||
}
|
||||
|
||||
public boolean isComplete() {
|
||||
short sends[] = _fragmentSends;
|
||||
if (sends == null) return false;
|
||||
for (int i = 0; i < sends.length; i++)
|
||||
if (sends[i] >= 0)
|
||||
return false;
|
||||
// nothing else pending ack
|
||||
return true;
|
||||
public synchronized boolean isComplete() {
|
||||
return _fragmentAcks == 0;
|
||||
}
|
||||
|
||||
public synchronized int getUnackedSize() {
|
||||
short fragmentSends[] = _fragmentSends;
|
||||
ByteArray messageBuf = _messageBuf;
|
||||
int rv = 0;
|
||||
if ( (messageBuf != null) && (fragmentSends != null) ) {
|
||||
int lastSize = _totalSize % _fragmentSize;
|
||||
if (lastSize == 0)
|
||||
lastSize = _fragmentSize;
|
||||
for (int i = 0; i < fragmentSends.length; i++) {
|
||||
if (fragmentSends[i] >= (short)0) {
|
||||
if (i + 1 == fragmentSends.length)
|
||||
rv += lastSize;
|
||||
else
|
||||
rv += _fragmentSize;
|
||||
}
|
||||
if (isComplete())
|
||||
return rv;
|
||||
int lastSize = _messageBuf.length % _fragmentSize;
|
||||
if (lastSize == 0)
|
||||
lastSize = _fragmentSize;
|
||||
for (int i = 0; i < _numFragments; i++) {
|
||||
if (needsSending(i)) {
|
||||
if (i + 1 == _numFragments)
|
||||
rv += lastSize;
|
||||
else
|
||||
rv += _fragmentSize;
|
||||
}
|
||||
}
|
||||
return rv;
|
||||
}
|
||||
|
||||
public boolean needsSending(int fragment) {
|
||||
short sends[] = _fragmentSends;
|
||||
if ( (sends == null) || (fragment >= sends.length) || (fragment < 0) )
|
||||
return false;
|
||||
return (sends[fragment] >= (short)0);
|
||||
public synchronized boolean needsSending(int fragment) {
|
||||
return (_fragmentAcks & mask(fragment)) != 0;
|
||||
}
|
||||
|
||||
public long getLifetime() { return _context.clock().now() - _startedOn; }
|
||||
|
||||
/**
|
||||
* Ack all the fragments in the ack list. As a side effect, if there are
|
||||
* still unacked fragments, the 'next send' time will be updated under the
|
||||
* assumption that that all of the packets within a volley would reach the
|
||||
* peer within that ack frequency (2-400ms).
|
||||
* Ack all the fragments in the ack list.
|
||||
*
|
||||
* @return true if the message was completely ACKed
|
||||
*/
|
||||
public boolean acked(ACKBitfield bitfield) {
|
||||
public synchronized boolean acked(ACKBitfield bitfield) {
|
||||
// stupid brute force, but the cardinality should be trivial
|
||||
short sends[] = _fragmentSends;
|
||||
if (sends != null) {
|
||||
for (int i = 0; i < bitfield.fragmentCount() && i < sends.length; i++) {
|
||||
if (bitfield.received(i))
|
||||
sends[i] = (short)-1;
|
||||
}
|
||||
for (int i = 0; i < bitfield.fragmentCount() && i < _numFragments; i++) {
|
||||
if (bitfield.received(i))
|
||||
_fragmentAcks &= ~mask(i);
|
||||
}
|
||||
|
||||
boolean rv = isComplete();
|
||||
/****
|
||||
if (!rv && false) { // don't do the fast retransmit... lets give it time to get ACKed
|
||||
long nextTime = _context.clock().now() + Math.max(_peer.getRTT(), ACKSender.ACK_FREQUENCY);
|
||||
//_nextSendTime = Math.max(now, _startedOn+PeerState.MIN_RTO);
|
||||
if (_nextSendTime <= 0)
|
||||
_nextSendTime = nextTime;
|
||||
else
|
||||
_nextSendTime = Math.min(_nextSendTime, nextTime);
|
||||
|
||||
//if (now + 100 > _nextSendTime)
|
||||
// _nextSendTime = now + 100;
|
||||
//_nextSendTime = now;
|
||||
}
|
||||
****/
|
||||
return rv;
|
||||
return isComplete();
|
||||
}
|
||||
|
||||
public long getNextSendTime() { return _nextSendTime; }
|
||||
@ -259,111 +165,45 @@ class OutboundMessageState implements CDPQEntry {
|
||||
* The max number of sends for any fragment, which is the
|
||||
* same as the push count, at least as it's coded now.
|
||||
*/
|
||||
public int getMaxSends() { return _maxSends; }
|
||||
public synchronized int getMaxSends() { return _maxSends; }
|
||||
|
||||
/**
|
||||
* The number of times we've pushed some fragments, which is the
|
||||
* same as the max sends, at least as it's coded now.
|
||||
*/
|
||||
public int getPushCount() { return _pushCount; }
|
||||
public synchronized int getPushCount() { return _pushCount; }
|
||||
|
||||
/**
|
||||
* Note that we have pushed the message fragments.
|
||||
* Increments push count (and max sends... why?)
|
||||
*/
|
||||
public void push() {
|
||||
public synchronized void push() {
|
||||
// these will never be different...
|
||||
_pushCount++;
|
||||
if (_pushCount > _maxSends)
|
||||
_maxSends = (short)_pushCount;
|
||||
if (_fragmentSends != null)
|
||||
for (int i = 0; i < _fragmentSends.length; i++)
|
||||
if (_fragmentSends[i] >= (short)0)
|
||||
_fragmentSends[i]++;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether fragment() has been called.
|
||||
* NOT whether it has more than one fragment.
|
||||
*
|
||||
* Caller should synchronize
|
||||
*
|
||||
* @return true iff fragment() has been called previously
|
||||
*/
|
||||
public boolean isFragmented() { return _fragmentSends != null; }
|
||||
|
||||
/**
|
||||
* Prepare the message for fragmented delivery, using no more than
|
||||
* fragmentSize bytes per fragment.
|
||||
*
|
||||
* Caller should synchronize
|
||||
*
|
||||
* @throws IllegalStateException if called more than once
|
||||
*/
|
||||
public void fragment(int fragmentSize) {
|
||||
if (_fragmentSends != null)
|
||||
throw new IllegalStateException();
|
||||
initBuf();
|
||||
int numFragments = _totalSize / fragmentSize;
|
||||
if (numFragments * fragmentSize < _totalSize)
|
||||
numFragments++;
|
||||
// This should never happen, as 534 bytes * 64 fragments > 32KB, and we won't bid on > 32KB
|
||||
if (numFragments > InboundMessageState.MAX_FRAGMENTS)
|
||||
throw new IllegalArgumentException("Fragmenting a " + _totalSize + " message into " + numFragments + " fragments - too many!");
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Fragmenting a " + _totalSize + " message into " + numFragments + " fragments");
|
||||
|
||||
//_fragmentEnd = new int[numFragments];
|
||||
_fragmentSends = new short[numFragments];
|
||||
//Arrays.fill(_fragmentEnd, -1);
|
||||
//Arrays.fill(_fragmentSends, (short)0);
|
||||
|
||||
_fragmentSize = fragmentSize;
|
||||
_maxSends = _pushCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* How many fragments in the message.
|
||||
* Only valid after fragment() has been called.
|
||||
* Returns -1 before then.
|
||||
*
|
||||
* Caller should synchronize
|
||||
*/
|
||||
public int getFragmentCount() {
|
||||
if (_fragmentSends == null)
|
||||
return -1;
|
||||
else
|
||||
return _fragmentSends.length;
|
||||
return _numFragments;
|
||||
}
|
||||
|
||||
/**
|
||||
* The size of the I2NP message. Does not include any SSU overhead.
|
||||
*
|
||||
* Caller should synchronize
|
||||
*/
|
||||
public int getMessageSize() { return _totalSize; }
|
||||
public int getMessageSize() { return _messageBuf.length; }
|
||||
|
||||
/**
|
||||
* Should we continue sending this fragment?
|
||||
* Only valid after fragment() has been called.
|
||||
* Throws NPE before then.
|
||||
* The size in bytes of the fragment
|
||||
*
|
||||
* Caller should synchronize
|
||||
*
|
||||
* @return true if fragment is not acked yet
|
||||
*/
|
||||
public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; }
|
||||
|
||||
/**
|
||||
* This assumes fragment(int size) has been called
|
||||
* @param fragmentNum the number of the fragment
|
||||
*
|
||||
* @return the size of the fragment specified by the number
|
||||
*/
|
||||
public int fragmentSize(int fragmentNum) {
|
||||
if (_messageBuf == null) return -1;
|
||||
if (fragmentNum + 1 == _fragmentSends.length) {
|
||||
int valid = _totalSize;
|
||||
if (fragmentNum + 1 == _numFragments) {
|
||||
int valid = _messageBuf.length;
|
||||
if (valid <= _fragmentSize)
|
||||
return valid;
|
||||
// bugfix 0.8.12
|
||||
@ -376,63 +216,19 @@ class OutboundMessageState implements CDPQEntry {
|
||||
|
||||
/**
|
||||
* Write a part of the the message onto the specified buffer.
|
||||
* See releaseResources() above for synchronization information.
|
||||
* This assumes fragment(int size) has been called.
|
||||
*
|
||||
* @param out target to write
|
||||
* @param outOffset into outOffset to begin writing
|
||||
* @param fragmentNum fragment to write (0 indexed)
|
||||
* @return bytesWritten
|
||||
*/
|
||||
public synchronized int writeFragment(byte out[], int outOffset, int fragmentNum) {
|
||||
if (_messageBuf == null) return -1;
|
||||
if (_released) {
|
||||
/******
|
||||
Solved by synchronization with releaseResources() and simply returning -1.
|
||||
Previous output:
|
||||
|
||||
23:50:57.013 ERROR [acket pusher] sport.udp.OutboundMessageState: SSU OMS Use after free
|
||||
java.lang.Exception: Released on Wed Dec 23 23:50:57 GMT 2009 by:
|
||||
at net.i2p.router.transport.udp.OutboundMessageState.releaseResources(OutboundMessageState.java:133)
|
||||
at net.i2p.router.transport.udp.PeerState.acked(PeerState.java:1391)
|
||||
at net.i2p.router.transport.udp.OutboundMessageFragments.acked(OutboundMessageFragments.java:404)
|
||||
at net.i2p.router.transport.udp.InboundMessageFragments.receiveACKs(InboundMessageFragments.java:191)
|
||||
at net.i2p.router.transport.udp.InboundMessageFragments.receiveData(InboundMessageFragments.java:77)
|
||||
at net.i2p.router.transport.udp.PacketHandler$Handler.handlePacket(PacketHandler.java:485)
|
||||
at net.i2p.router.transport.udp.PacketHandler$Handler.receivePacket(PacketHandler.java:282)
|
||||
at net.i2p.router.transport.udp.PacketHandler$Handler.handlePacket(PacketHandler.java:231)
|
||||
at net.i2p.router.transport.udp.PacketHandler$Handler.run(PacketHandler.java:136)
|
||||
at java.lang.Thread.run(Thread.java:619)
|
||||
at net.i2p.util.I2PThread.run(I2PThread.java:71)
|
||||
23:50:57.014 ERROR [acket pusher] ter.transport.udp.PacketPusher: SSU Output Queue Error
|
||||
java.lang.RuntimeException: SSU OMS Use after free: Message 2381821417 with 4 fragments of size 0 volleys: 2 lifetime: 1258 pending fragments: 0 1 2 3
|
||||
at net.i2p.router.transport.udp.OutboundMessageState.writeFragment(OutboundMessageState.java:298)
|
||||
at net.i2p.router.transport.udp.PacketBuilder.buildPacket(PacketBuilder.java:170)
|
||||
at net.i2p.router.transport.udp.OutboundMessageFragments.preparePackets(OutboundMessageFragments.java:332)
|
||||
at net.i2p.router.transport.udp.OutboundMessageFragments.getNextVolley(OutboundMessageFragments.java:297)
|
||||
at net.i2p.router.transport.udp.PacketPusher.run(PacketPusher.java:38)
|
||||
at java.lang.Thread.run(Thread.java:619)
|
||||
at net.i2p.util.I2PThread.run(I2PThread.java:71)
|
||||
*******/
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.log(Log.WARN, "SSU OMS Use after free: " + toString(), _releasedBy);
|
||||
return -1;
|
||||
//throw new RuntimeException("SSU OMS Use after free: " + toString());
|
||||
}
|
||||
public int writeFragment(byte out[], int outOffset, int fragmentNum) {
|
||||
int start = _fragmentSize * fragmentNum;
|
||||
int end = start + fragmentSize(fragmentNum);
|
||||
int toSend = end - start;
|
||||
byte buf[] = _messageBuf.getData();
|
||||
if ( (buf != null) && (start + toSend <= buf.length) && (outOffset + toSend <= out.length) ) {
|
||||
System.arraycopy(buf, start, out, outOffset, toSend);
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Raw fragment[" + fragmentNum + "] for " + _messageId
|
||||
+ "[" + start + "-" + (start+toSend) + "/" + _totalSize + "/" + _fragmentSize + "]: "
|
||||
+ Base64.encode(out, outOffset, toSend));
|
||||
int toSend = fragmentSize(fragmentNum);
|
||||
int end = start + toSend;
|
||||
if (end <= _messageBuf.length && outOffset + toSend <= out.length) {
|
||||
System.arraycopy(_messageBuf, start, out, outOffset, toSend);
|
||||
return toSend;
|
||||
} else if (buf == null) {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Error: null buf");
|
||||
} else {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Error: " + start + '/' + end + '/' + outOffset + '/' + out.length);
|
||||
@ -462,7 +258,6 @@ class OutboundMessageState implements CDPQEntry {
|
||||
*/
|
||||
public void drop() {
|
||||
_peer.getTransport().failed(this, false);
|
||||
releaseResources();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -492,19 +287,18 @@ class OutboundMessageState implements CDPQEntry {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
short sends[] = _fragmentSends;
|
||||
StringBuilder buf = new StringBuilder(256);
|
||||
buf.append("OB Message ").append(_messageId);
|
||||
if (sends != null)
|
||||
buf.append(" with ").append(sends.length).append(" fragments");
|
||||
buf.append(" of size ").append(_totalSize);
|
||||
buf.append("OB Message ").append(_i2npMessage.getUniqueId());
|
||||
buf.append(" with ").append(_numFragments).append(" fragments");
|
||||
buf.append(" of size ").append(_messageBuf.length);
|
||||
buf.append(" volleys: ").append(_maxSends);
|
||||
buf.append(" lifetime: ").append(getLifetime());
|
||||
if (sends != null) {
|
||||
if (!isComplete()) {
|
||||
buf.append(" pending fragments: ");
|
||||
for (int i = 0; i < sends.length; i++)
|
||||
if (sends[i] >= 0)
|
||||
for (int i = 0; i < _numFragments; i++) {
|
||||
if (needsSending(i))
|
||||
buf.append(i).append(' ');
|
||||
}
|
||||
}
|
||||
return buf.toString();
|
||||
}
|
||||
|
@ -1541,7 +1541,6 @@ class PeerState {
|
||||
for (int i = 0; succeeded != null && i < succeeded.size(); i++) {
|
||||
OutboundMessageState state = succeeded.get(i);
|
||||
_transport.succeeded(state);
|
||||
state.releaseResources();
|
||||
OutNetMessage msg = state.getMessage();
|
||||
if (msg != null)
|
||||
msg.timestamp("sending complete");
|
||||
@ -1559,7 +1558,6 @@ class PeerState {
|
||||
if (_log.shouldLog(Log.WARN))
|
||||
_log.warn("Unable to send a direct message: " + state);
|
||||
}
|
||||
state.releaseResources();
|
||||
}
|
||||
|
||||
return rv + _outboundQueue.size();
|
||||
@ -1708,7 +1706,7 @@ class PeerState {
|
||||
* how much payload data can we shove in there?
|
||||
* @return MTU - 87, i.e. 533 or 1397 (IPv4), MTU - 107 (IPv6)
|
||||
*/
|
||||
private int fragmentSize() {
|
||||
public int fragmentSize() {
|
||||
// 46 + 20 + 8 + 13 = 74 + 13 = 87 (IPv4)
|
||||
// 46 + 40 + 8 + 13 = 74 + 13 = 107 (IPv6)
|
||||
return _mtu -
|
||||
@ -1727,16 +1725,6 @@ class PeerState {
|
||||
private ShouldSend locked_shouldSend(OutboundMessageState state) {
|
||||
long now = _context.clock().now();
|
||||
if (state.getNextSendTime() <= now) {
|
||||
if (!state.isFragmented()) {
|
||||
state.fragment(fragmentSize());
|
||||
if (state.getMessage() != null)
|
||||
state.getMessage().timestamp("fragment into " + state.getFragmentCount());
|
||||
|
||||
if (_log.shouldLog(Log.INFO))
|
||||
_log.info("Fragmenting " + state);
|
||||
}
|
||||
|
||||
|
||||
OutboundMessageState retrans = _retransmitter;
|
||||
if ( (retrans != null) && ( (retrans.isExpired() || retrans.isComplete()) ) ) {
|
||||
_retransmitter = null;
|
||||
@ -1858,7 +1846,6 @@ class PeerState {
|
||||
//if (getSendWindowBytesRemaining() > 0)
|
||||
// _throttle.unchoke(peer.getRemotePeer());
|
||||
|
||||
state.releaseResources();
|
||||
} else {
|
||||
// dupack, likely
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
@ -1935,7 +1922,6 @@ class PeerState {
|
||||
//if (state.getPeer().getSendWindowBytesRemaining() > 0)
|
||||
// _throttle.unchoke(state.getPeer().getRemotePeer());
|
||||
|
||||
state.releaseResources();
|
||||
} else {
|
||||
//if (state.getMessage() != null)
|
||||
// state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString());
|
||||
|
Reference in New Issue
Block a user