Ticket #1183 - move message serialization later in the SSU pipeline

This commit is contained in:
zab2
2014-01-27 14:47:58 +00:00
parent 6b16907e40
commit 5d322245d8
3 changed files with 31 additions and 10 deletions

View File

@ -1,3 +1,8 @@
2014-01-27 zab
* Move message serialization later in the SSU sending pipeline
(Ticket #1183)
* Up version to -5
2014-01-27 zzz 2014-01-27 zzz
* i2ptunnel HTTP Proxy: Fix default enable for outproxy plugin * i2ptunnel HTTP Proxy: Fix default enable for outproxy plugin
@ -13,7 +18,7 @@
2014-01-25 zab 2014-01-25 zab
* Move OutNetMessage buffer preparation to the Writer threads * Move OutNetMessage buffer preparation to the Writer threads
(Ticket #1184) (Ticket #1183)
* Up version to -1 * Up version to -1
* 2014-01-22 0.9.10 released * 2014-01-22 0.9.10 released

View File

@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */ /** deprecated */
public final static String ID = "Monotone"; public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION; public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 4; public final static long BUILD = 5;
/** for example "-test" */ /** for example "-test" */
public final static String EXTRA = ""; public final static String EXTRA = "";

View File

@ -20,6 +20,7 @@ class OutboundMessageState implements CDPQEntry {
private final Log _log; private final Log _log;
/** may be null if we are part of the establishment */ /** may be null if we are part of the establishment */
private final OutNetMessage _message; private final OutNetMessage _message;
private final I2NPMessage _i2npMessage;
private final long _messageId; private final long _messageId;
/** will be null, unless we are part of the establishment */ /** will be null, unless we are part of the establishment */
private final PeerState _peer; private final PeerState _peer;
@ -28,7 +29,7 @@ class OutboundMessageState implements CDPQEntry {
/** fixed fragment size across the message */ /** fixed fragment size across the message */
private int _fragmentSize; private int _fragmentSize;
/** size of the I2NP message */ /** size of the I2NP message */
private final int _totalSize; private int _totalSize;
/** sends[i] is how many times the fragment has been sent, or -1 if ACKed */ /** sends[i] is how many times the fragment has been sent, or -1 if ACKed */
private short _fragmentSends[]; private short _fragmentSends[];
private final long _startedOn; private final long _startedOn;
@ -89,13 +90,9 @@ class OutboundMessageState implements CDPQEntry {
_context = context; _context = context;
_log = _context.logManager().getLog(OutboundMessageState.class); _log = _context.logManager().getLog(OutboundMessageState.class);
_message = m; _message = m;
_i2npMessage = msg;
_peer = peer; _peer = peer;
int size = msg.getRawMessageSize();
acquireBuf(size);
_totalSize = msg.toRawByteArray(_messageBuf.getData());
_messageBuf.setValid(_totalSize);
_messageId = msg.getUniqueId(); _messageId = msg.getUniqueId();
_startedOn = _context.clock().now(); _startedOn = _context.clock().now();
_nextSendTime = _startedOn; _nextSendTime = _startedOn;
_expiration = _startedOn + EXPIRATION; _expiration = _startedOn + EXPIRATION;
@ -105,6 +102,18 @@ class OutboundMessageState implements CDPQEntry {
// _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len)); // _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
} }
/**
* lazily inits the message buffer unless already inited
*/
private synchronized void initBuf() {
if (_messageBuf != null)
return;
final int size = _i2npMessage.getRawMessageSize();
acquireBuf(size);
_totalSize = _i2npMessage.toRawByteArray(_messageBuf.getData());
_messageBuf.setValid(_totalSize);
}
/** /**
* @throws IAE if too big * @throws IAE if too big
* @since 0.9.3 * @since 0.9.3
@ -175,7 +184,7 @@ class OutboundMessageState implements CDPQEntry {
return true; return true;
} }
public int getUnackedSize() { public synchronized int getUnackedSize() {
short fragmentSends[] = _fragmentSends; short fragmentSends[] = _fragmentSends;
ByteArray messageBuf = _messageBuf; ByteArray messageBuf = _messageBuf;
int rv = 0; int rv = 0;
@ -288,6 +297,7 @@ class OutboundMessageState implements CDPQEntry {
public void fragment(int fragmentSize) { public void fragment(int fragmentSize) {
if (_fragmentSends != null) if (_fragmentSends != null)
throw new IllegalStateException(); throw new IllegalStateException();
initBuf();
int numFragments = _totalSize / fragmentSize; int numFragments = _totalSize / fragmentSize;
if (numFragments * fragmentSize < _totalSize) if (numFragments * fragmentSize < _totalSize)
numFragments++; numFragments++;
@ -335,6 +345,11 @@ class OutboundMessageState implements CDPQEntry {
*/ */
public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; } public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; }
/**
* This assumes fragment(int size) has been called
* @param fragmentNum the number of the fragment
* @return the size of the fragment specified by the number
*/
public int fragmentSize(int fragmentNum) { public int fragmentSize(int fragmentNum) {
if (_messageBuf == null) return -1; if (_messageBuf == null) return -1;
if (fragmentNum + 1 == _fragmentSends.length) { if (fragmentNum + 1 == _fragmentSends.length) {
@ -351,7 +366,8 @@ class OutboundMessageState implements CDPQEntry {
/** /**
* Write a part of the the message onto the specified buffer. * Write a part of the the message onto the specified buffer.
* See releaseResources() above for synchhronization information. * See releaseResources() above for synchronization information.
* This assumes fragment(int size) has been called.
* *
* @param out target to write * @param out target to write
* @param outOffset into outOffset to begin writing * @param outOffset into outOffset to begin writing