* revamped locking to block on flush and close until all of the

packets through that point have been ACKed, throwing an
  InterruptedIOException if there was a writeTimeout or an IOException
  if the con failed
* revamped the ack/nack field settings to ack as much as possible
* handle some strange timeout/resend errors on connection
* pass 1/2rtt as the packet 'optional delay' field, and use that to
  schedule the ack time (the 'last' messages in a window set the
  optional delay to 0, asking for immediate ack of all received)
* increase the optional delay to 2 bytes (#ms to delay)
* inject random failures and delays if configured to do so in
  PacketHandler.choke
* fix up the window size adjustment (increment on ack, /= 2 on resend)
* use the highest RTT in the new RTT calculation so that we fit more
  in (via SACK)
* fix up the SACK handling (duh)
* revise the resend time calculation
This commit is contained in:
jrandom
2004-10-28 02:03:38 +00:00
committed by zzz
parent 669a8fae15
commit 48cdf17a4f
12 changed files with 454 additions and 160 deletions

View File

@ -92,9 +92,9 @@ public class Connection {
*
* @return true if the packet should be sent
*/
boolean packetSendChoke() {
boolean packetSendChoke(long timeoutMs) {
if (false) return true;
long writeExpire = _options.getWriteTimeout();
long writeExpire = timeoutMs;
while (true) {
long timeLeft = writeExpire - _context.clock().now();
synchronized (_outboundPackets) {
@ -130,7 +130,7 @@ public class Connection {
void sendAvailable() {
// this grabs the data, builds a packet, and queues it up via sendPacket
try {
_outputStream.flushAvailable(_receiver);
_outputStream.flushAvailable(_receiver, false);
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error flushing available", ioe);
@ -149,12 +149,28 @@ public class Connection {
if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) {
ackOnly = true;
if (_log.shouldLog(Log.DEBUG))
_log.debug("No resend for " + packet);
} else {
int remaining = 0;
synchronized (_outboundPackets) {
_outboundPackets.put(new Long(packet.getSequenceNum()), packet);
remaining = _options.getWindowSize() - _outboundPackets.size() ;
_outboundPackets.notifyAll();
}
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getRTT()*2);
if (remaining < 0)
remaining = 0;
if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) {
packet.setOptionalDelay(0);
} else {
int delay = _options.getRTT() / 2;
packet.setOptionalDelay(delay);
_log.debug("Requesting ack delay of " + delay + "ms for packet " + packet);
}
packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resend in " + (_options.getRTT()*2) + " for " + packet);
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), (_options.getRTT()*2 < 5000 ? 5000 : _options.getRTT()*2));
}
_lastSendTime = _context.clock().now();
@ -167,7 +183,7 @@ public class Connection {
// ACKs don't get ACKed, but pings do.
if (packet.getTagsSent().size() > 0) {
_log.warn("Sending a ping since the ACK we just sent has " + packet.getTagsSent().size() + " tags");
_connectionManager.ping(_remotePeer, _options.getRTT()*2, false, packet.getKeyUsed(), packet.getTagsSent());
_connectionManager.ping(_remotePeer, 30*1000, false, packet.getKeyUsed(), packet.getTagsSent());
}
}
}
@ -178,13 +194,17 @@ public class Connection {
for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) {
Long id = (Long)iter.next();
if (id.longValue() <= ackThrough) {
boolean nacked = false;
if (nacks != null) {
// linear search since its probably really tiny
for (int i = 0; i < nacks.length; i++)
if (nacks[i] == id.longValue())
continue; // NACKed
} else {
// ACKed
for (int i = 0; i < nacks.length; i++) {
if (nacks[i] == id.longValue()) {
nacked = true;
break; // NACKed
}
}
}
if (!nacked) { // aka ACKed
if (acked == null)
acked = new ArrayList(1);
PacketLocal ackedPacket = (PacketLocal)_outboundPackets.get(id);
@ -231,16 +251,15 @@ public class Connection {
if (cleanDisconnect) {
// send close packets and schedule stuff...
try {
_outputStream.close();
_inputStream.close();
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
_log.warn("Error on clean disconnect", ioe);
}
_outputStream.closeInternal();
_inputStream.close();
} else {
doClose();
synchronized (_outboundPackets) {
for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
PacketLocal pl = (PacketLocal)iter.next();
pl.cancelled();
}
_outboundPackets.clear();
_outboundPackets.notifyAll();
}
@ -297,10 +316,25 @@ public class Connection {
*/
public long getNextSendTime() { return _nextSendTime; }
public void setNextSendTime(long when) {
if (_nextSendTime > 0)
if (_log.shouldLog(Log.DEBUG))
_log.debug("set next send time to " + (when-_nextSendTime) + "ms after it was before ("+when+")");
_nextSendTime = when;
if (_nextSendTime >= 0) {
if (when < _nextSendTime)
_nextSendTime = when;
} else {
_nextSendTime = when;
}
if (_nextSendTime >= 0) {
long max = _context.clock().now() + _options.getSendAckDelay();
if (max < _nextSendTime)
_nextSendTime = max;
}
if (_log.shouldLog(Log.DEBUG) && false) {
if (_nextSendTime <= 0)
_log.debug("set next send time to an unknown time", new Exception(toString()));
else
_log.debug("set next send time to " + (_nextSendTime-_context.clock().now()) + "ms from now", new Exception(toString()));
}
}
public long getAckedPackets() { return _ackedPackets; }
@ -346,6 +380,12 @@ public class Connection {
buf.append("] ");
}
buf.append("unacked inbound? ").append(getUnackedPacketsReceived());
buf.append(" [high ").append(_inputStream.getHighestBlockId());
long nacks[] = _inputStream.getNacks();
if (nacks != null)
for (int i = 0; i < nacks.length; i++)
buf.append(" ").append(nacks[i]);
buf.append("]");
buf.append("]");
return buf.toString();
}
@ -360,6 +400,8 @@ public class Connection {
}
public void timeReached() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resend period reached for " + _packet);
boolean resend = false;
synchronized (_outboundPackets) {
if (_outboundPackets.containsKey(new Long(_packet.getSequenceNum())))
@ -367,18 +409,26 @@ public class Connection {
}
if ( (resend) && (_packet.getAckTime() < 0) ) {
// revamp various fields, in case we need to ack more, etc
_packet.setAckThrough(getInputStream().getHighestBlockId());
_packet.setNacks(getInputStream().getNacks());
_inputStream.updateAcks(_packet);
_packet.setOptionalDelay(getOptions().getChoke());
_packet.setOptionalMaxSize(getOptions().getMaxMessageSize());
_packet.setResendDelay(getOptions().getResendDelay());
_packet.setReceiveStreamId(_receiveStreamId);
_packet.setSendStreamId(_sendStreamId);
// shrink the window
int newWindowSize = getOptions().getWindowSize();
newWindowSize /= 2;
if (newWindowSize <= 0)
newWindowSize = 1;
getOptions().setWindowSize(newWindowSize);
int numSends = _packet.getNumSends() + 1;
if (_log.shouldLog(Log.WARN))
_log.warn("Resend packet " + _packet + " time " + numSends + " on " + Connection.this);
_log.warn("Resend packet " + _packet + " time " + numSends + " (wsize "
+ newWindowSize + " lifetime "
+ (_context.clock().now() - _packet.getCreatedOn()) + "ms)");
_outboundQueue.enqueue(_packet);
if (numSends > _options.getMaxResends()) {
@ -387,14 +437,15 @@ public class Connection {
disconnect(false);
} else {
//long timeout = _options.getResendDelay() << numSends;
long timeout = _options.getRTT() << numSends;
long timeout = _options.getRTT() << (numSends-1);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling resend in " + timeout + "ms");
_log.debug("Scheduling resend in " + timeout + "ms for " + _packet);
SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout);
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet acked before resend: " + _packet + " on " + Connection.this);
_log.debug("Packet acked before resend (resend="+ resend + "): "
+ _packet + " on " + Connection.this);
}
}
}

View File

@ -1,6 +1,7 @@
package net.i2p.client.streaming;
import java.io.InterruptedIOException;
import java.io.IOException;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
@ -11,16 +12,16 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
private I2PAppContext _context;
private Log _log;
private Connection _connection;
private MessageOutputStream.WriteStatus _dummyStatus;
public ConnectionDataReceiver(I2PAppContext ctx, Connection con) {
_context = ctx;
_log = ctx.logManager().getLog(ConnectionDataReceiver.class);
_connection = con;
_dummyStatus = new DummyStatus();
}
public void writeData(byte[] buf, int off, int size) throws InterruptedIOException {
if (!_connection.packetSendChoke())
throw new InterruptedIOException("Timeout expired waiting to write");
public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) {
boolean doSend = true;
if ( (size <= 0) && (_connection.getLastSendId() >= 0) ) {
if (_connection.getOutputStream().getClosed()) {
@ -45,15 +46,18 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
+ " con: " + _connection, new Exception("write called by"));
if (doSend) {
send(buf, off, size);
PacketLocal packet = send(buf, off, size);
return packet;
} else {
//_connection.flushPackets();
return _dummyStatus;
}
}
public void send(byte buf[], int off, int size) {
public PacketLocal send(byte buf[], int off, int size) {
PacketLocal packet = buildPacket(buf, off, size);
_connection.sendPacket(packet);
return packet;
}
private boolean isAckOnly(int size) {
@ -67,7 +71,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
private PacketLocal buildPacket(byte buf[], int off, int size) {
boolean ackOnly = isAckOnly(size);
PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer());
PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer(), _connection);
byte data[] = new byte[size];
if (size > 0)
System.arraycopy(buf, off, data, 0, size);
@ -79,8 +83,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
packet.setSendStreamId(_connection.getSendStreamId());
packet.setReceiveStreamId(_connection.getReceiveStreamId());
packet.setAckThrough(_connection.getInputStream().getHighestBlockId());
packet.setNacks(_connection.getInputStream().getNacks());
_connection.getInputStream().updateAcks(packet);
packet.setOptionalDelay(_connection.getOptions().getChoke());
packet.setOptionalMaxSize(_connection.getOptions().getMaxMessageSize());
packet.setResendDelay(_connection.getOptions().getResendDelay());
@ -103,10 +106,18 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Closed is set for a new packet on " + _connection + ": " + packet);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Closed is not set for a new packet on " + _connection + ": " + packet);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Closed is not set for a new packet on " + _connection + ": " + packet);
}
return packet;
}
private static final class DummyStatus implements MessageOutputStream.WriteStatus {
public final void waitForAccept(int maxWaitMs) { return; }
public final void waitForCompletion(int maxWaitMs) { return; }
public final boolean writeAccepted() { return true; }
public final boolean writeFailed() { return false; }
public final boolean writeSuccessful() { return true; }
}
}

View File

@ -77,7 +77,20 @@ class ConnectionHandler {
}
if (syn != null) {
return _manager.receiveConnection(syn);
// deal with forged / invalid syn packets
Connection con = _manager.receiveConnection(syn);
if (con != null) {
return con;
} else if (timeoutMs > 0) {
long remaining = expiration - _context.clock().now();
if (remaining <= 0) {
return null;
} else {
return accept(remaining);
}
} else {
return accept(timeoutMs);
}
} else {
return null;
}

View File

@ -7,6 +7,7 @@ import java.util.Map;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.data.ByteArray;
import net.i2p.data.Destination;
@ -90,7 +91,14 @@ public class ConnectionManager {
}
con.setReceiveStreamId(receiveId);
con.getPacketHandler().receivePacket(synPacket, con);
try {
con.getPacketHandler().receivePacket(synPacket, con);
} catch (I2PException ie) {
synchronized (_connectionLock) {
_connectionByInboundId.remove(new ByteArray(receiveId));
}
return null;
}
return con;
}

View File

@ -52,10 +52,10 @@ public class ConnectionOptions extends I2PSocketOptions {
setConnectDelay(2*1000);
setProfile(PROFILE_BULK);
setMaxMessageSize(Packet.MAX_PAYLOAD_SIZE);
setRTT(5*1000);
setRTT(30*1000);
setReceiveWindow(1);
setResendDelay(5*1000);
setSendAckDelay(1*1000);
setSendAckDelay(2*1000);
setWindowSize(1);
setMaxResends(10);
setWriteTimeout(-1);
@ -102,7 +102,11 @@ public class ConnectionOptions extends I2PSocketOptions {
* What to set the round trip time estimate to (in milliseconds)
*/
public int getRTT() { return _rtt; }
public void setRTT(int ms) { _rtt = ms; }
public void setRTT(int ms) {
_rtt = ms;
if (_rtt > 60*1000)
_rtt = 60*1000;
}
/** How long after sending a packet will we wait before resending? */
public int getResendDelay() { return _resendDelay; }

View File

@ -3,6 +3,7 @@ package net.i2p.client.streaming;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.Log;
@ -25,7 +26,7 @@ public class ConnectionPacketHandler {
}
/** distribute a packet to the connection specified */
void receivePacket(Packet packet, Connection con) {
void receivePacket(Packet packet, Connection con) throws I2PException {
boolean ok = verifyPacket(packet, con);
if (!ok) return;
boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
@ -36,15 +37,17 @@ public class ConnectionPacketHandler {
if (isNew) {
con.incrementUnackedPacketsReceived();
long nextTime = con.getNextSendTime();
if (nextTime <= 0) {
con.setNextSendTime(con.getOptions().getSendAckDelay() + _context.clock().now());
if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED) && (packet.getOptionalDelay() <= 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Scheduling ack in " + con.getOptions().getSendAckDelay() + "ms for received packet " + packet);
_log.debug("Scheduling immediate ack for " + packet);
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
} else {
int delay = con.getOptions().getSendAckDelay();
if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) // delayed ACK requested
delay += packet.getOptionalDelay();
con.setNextSendTime(delay + _context.clock().now());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Ack is already scheduled in " + (nextTime-_context.clock().now())
+ "ms, though we just received " + packet);
_log.debug("Scheduling ack in " + delay + "ms for received packet " + packet);
}
} else {
if (packet.getSequenceNum() > 0) {
@ -54,9 +57,15 @@ public class ConnectionPacketHandler {
if (_log.shouldLog(Log.WARN))
_log.warn("congestion.. dup " + packet);
con.incrementUnackedPacketsReceived();
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("ACK only packet received: " + packet);
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
con.incrementUnackedPacketsReceived();
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("ACK only packet received: " + packet);
}
}
}
@ -65,14 +74,14 @@ public class ConnectionPacketHandler {
if ( (acked != null) && (acked.size() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(acked.size() + " of our packets acked with " + packet);
// use the lowest RTT, since these would likely be bunched together,
// waiting for the most recent packet received before sending the ACK
int lowestRtt = -1;
// use the highest RTT, since these would likely be bunched together,
// and the highest rtt lets us set our resend delay properly
int highestRTT = -1;
for (int i = 0; i < acked.size(); i++) {
PacketLocal p = (PacketLocal)acked.get(i);
if ( (lowestRtt < 0) || (p.getAckTime() < lowestRtt) ) {
if (p.getAckTime() > highestRTT) {
//if (p.getNumSends() <= 1)
lowestRtt = p.getAckTime();
highestRTT = p.getAckTime();
}
if (p.getNumSends() > 1)
@ -88,24 +97,25 @@ public class ConnectionPacketHandler {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet acked after " + p.getAckTime() + "ms: " + p);
}
if (lowestRtt > 0) {
if (highestRTT > 0) {
int oldRTT = con.getOptions().getRTT();
int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*lowestRtt);
int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*highestRTT);
con.getOptions().setRTT(newRTT);
}
}
boolean fastAck = adjustWindow(con, isNew, packet.getSequenceNum(), numResends);
boolean fastAck = adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0));
con.eventOccurred();
if (fastAck) {
if (con.getLastSendTime() + con.getOptions().getRTT() < _context.clock().now()) {
_log.error("Fast ack for dup " + packet);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Fast ack for dup " + packet);
con.ackImmediately();
}
}
}
private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends) {
private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked) {
if ( (!isNew) && (sequenceNum > 0) ) {
// dup real packet
int oldSize = con.getOptions().getWindowSize();
@ -115,22 +125,17 @@ public class ConnectionPacketHandler {
con.getOptions().setWindowSize(oldSize);
return true;
} else if (numResends > 0) {
int newWindowSize = con.getOptions().getWindowSize();
newWindowSize /= 2; // >>>= numResends;
if (newWindowSize <= 0)
newWindowSize = 1;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Shrink the window to " + newWindowSize + " (#resends: " + numResends
+ ") for " + con);
con.getOptions().setWindowSize(newWindowSize);
// window sizes are shrunk on resend, not on ack
} else {
// new packet that ack'ed uncongested data, or an empty ack
int newWindowSize = con.getOptions().getWindowSize();
newWindowSize += 1; //acked.size();
if (_log.shouldLog(Log.DEBUG))
_log.debug("New window size " + newWindowSize + " (#resends: " + numResends
+ ") for " + con);
con.getOptions().setWindowSize(newWindowSize);
if (acked > 0) {
// new packet that ack'ed uncongested data, or an empty ack
int newWindowSize = con.getOptions().getWindowSize();
newWindowSize += 1; // acked; // 1
if (_log.shouldLog(Log.DEBUG))
_log.debug("New window size " + newWindowSize + " (#resends: " + numResends
+ ") for " + con);
con.getOptions().setWindowSize(newWindowSize);
}
}
return false;
}
@ -141,12 +146,12 @@ public class ConnectionPacketHandler {
* @return true if the packet is ok for this connection, false if we shouldn't
* continue processing.
*/
private boolean verifyPacket(Packet packet, Connection con) {
private boolean verifyPacket(Packet packet, Connection con) throws I2PException {
if (packet.isFlagSet(Packet.FLAG_RESET)) {
verifyReset(packet, con);
return false;
} else {
boolean sigOk = verifySignature(packet, con);
verifySignature(packet, con);
if (con.getSendStreamId() == null) {
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
@ -204,9 +209,9 @@ public class ConnectionPacketHandler {
/**
* Verify the signature if necessary.
*
* @return false only if the signature was required and it was invalid
* @throws I2PException if the signature was necessary and it was invalid
*/
private boolean verifySignature(Packet packet, Connection con) {
private void verifySignature(Packet packet, Connection con) throws I2PException {
// verify the signature if necessary
if (con.getOptions().getRequireFullySigned() ||
packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) ||
@ -217,11 +222,8 @@ public class ConnectionPacketHandler {
from = packet.getOptionalFrom();
boolean sigOk = packet.verifySignature(_context, from, null);
if (!sigOk) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received unsigned / forged packet: " + packet);
return false;
throw new I2PException("Received unsigned / forged packet: " + packet);
}
}
return true;
}
}

View File

@ -90,17 +90,20 @@ public class MessageInputStream extends InputStream {
*
*/
public long[] getNacks() {
List ids = null;
synchronized (_dataLock) {
for (long i = _highestReadyBlockId + 1; i < _highestBlockId; i++) {
Long l = new Long(i);
if (_notYetReadyBlocks.containsKey(l)) {
// ACK
} else {
if (ids == null)
ids = new ArrayList(4);
ids.add(l);
}
return locked_getNacks();
}
}
private long[] locked_getNacks() {
List ids = null;
for (long i = _highestReadyBlockId + 1; i < _highestBlockId; i++) {
Long l = new Long(i);
if (_notYetReadyBlocks.containsKey(l)) {
// ACK
} else {
if (ids == null)
ids = new ArrayList(4);
ids.add(l);
}
}
if (ids != null) {
@ -113,6 +116,13 @@ public class MessageInputStream extends InputStream {
}
}
public void updateAcks(PacketLocal packet) {
synchronized (_dataLock) {
packet.setAckThrough(_highestBlockId);
packet.setNacks(locked_getNacks());
}
}
/**
* Ascending list of block IDs greater than the highest
* ready block ID, or null if there aren't any.

View File

@ -1,6 +1,7 @@
package net.i2p.client.streaming;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import net.i2p.I2PAppContext;
@ -19,6 +20,7 @@ public class MessageOutputStream extends OutputStream {
private IOException _streamError;
private boolean _closed;
private long _written;
private int _writeTimeout;
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
@ -32,8 +34,12 @@ public class MessageOutputStream extends OutputStream {
_dataLock = new Object();
_written = 0;
_closed = false;
_writeTimeout = -1;
}
public void setWriteTimeout(int ms) { _writeTimeout = ms; }
public int getWriteTimeout() { return _writeTimeout; }
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}
@ -41,10 +47,15 @@ public class MessageOutputStream extends OutputStream {
public void write(byte b[], int off, int len) throws IOException {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("write(b[], " + off + ", " + len + ")");
synchronized (_dataLock) {
int cur = off;
int remaining = len;
while (remaining > 0) {
int cur = off;
int remaining = len;
while (remaining > 0) {
WriteStatus ws = null;
// we do any waiting outside the synchronized() block because we
// want to allow other threads to flushAvailable() whenever they want.
// this is the only method that *adds* to the _buf, and all
// code that reads from it is synchronized
synchronized (_dataLock) {
if (_valid + remaining < _buf.length) {
// simply buffer the data, no flush
System.arraycopy(b, cur, _buf, _valid, remaining);
@ -52,8 +63,6 @@ public class MessageOutputStream extends OutputStream {
cur += remaining;
_written += remaining;
remaining = 0;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("write(...): appending valid = " + _valid + " remaining=" + remaining);
} else {
// buffer whatever we can fit then flush,
// repeating until we've pushed all of the
@ -63,19 +72,24 @@ public class MessageOutputStream extends OutputStream {
remaining -= toWrite;
cur += toWrite;
_valid = _buf.length;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("write(...): flushing valid = " + _valid + " remaining=" + remaining);
// this blocks until the packet is ack window is open. it
// also throws InterruptedIOException if the write timeout
// expires
_dataReceiver.writeData(_buf, 0, _valid);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("write(...): flushing complete valid = " + _valid + " remaining=" + remaining);
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
throwAnyError();
}
}
if (ws != null) {
// ok, we've actually added a new packet - lets wait until
// its accepted into the queue before moving on (so that we
// dont fill our buffer instantly)
ws.waitForAccept(_writeTimeout);
if (!ws.writeAccepted()) {
if (_writeTimeout > 0)
throw new InterruptedIOException("Write not accepted within timeout");
else
throw new IOException("Write not accepted into the queue");
}
}
}
throwAnyError();
}
@ -86,19 +100,19 @@ public class MessageOutputStream extends OutputStream {
}
public void flush() throws IOException {
WriteStatus ws = null;
synchronized (_dataLock) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("flush(): valid = " + _valid);
// this blocks until the packet is ack window is open. it
// also throws InterruptedIOException if the write timeout
// expires
_dataReceiver.writeData(_buf, 0, _valid);
ws = _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("flush(): valid = " + _valid + " complete");
_valid = 0;
_dataLock.notifyAll();
}
ws.waitForCompletion(_writeTimeout);
if (ws.writeFailed() && (_writeTimeout > 0) )
throw new InterruptedIOException("Timed out during write");
else if (ws.writeFailed())
throw new IOException("Write failed");
throwAnyError();
}
@ -107,6 +121,19 @@ public class MessageOutputStream extends OutputStream {
flush();
_log.debug("Output stream closed after writing " + _written);
}
public void closeInternal() {
_closed = true;
_streamError = new IOException("Closed internally");
synchronized (_dataLock) {
// flush any data, but don't wait for it
if (_valid > 0) {
_dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
}
_dataLock.notifyAll();
}
}
public boolean getClosed() { return _closed; }
@ -126,17 +153,49 @@ public class MessageOutputStream extends OutputStream {
* called whenever the engine wants to push more data to the
* peer
*
* @return true if the data was flushed
*/
void flushAvailable(DataReceiver target) throws IOException {
flushAvailable(target, true);
}
void flushAvailable(DataReceiver target, boolean blocking) throws IOException {
WriteStatus ws = null;
synchronized (_dataLock) {
target.writeData(_buf, 0, _valid);
ws = target.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0;
_dataLock.notifyAll();
}
if (blocking) {
ws.waitForAccept(_writeTimeout);
if (ws.writeFailed())
throw new IOException("Flush available failed");
else if (!ws.writeAccepted())
throw new InterruptedIOException("Flush available timed out");
}
return;
}
public interface DataReceiver {
public void writeData(byte buf[], int off, int size) throws IOException;
/**
* Nonblocking write
*/
public WriteStatus writeData(byte buf[], int off, int size);
}
public interface WriteStatus {
/** wait until the data written either fails or succeeds */
public void waitForCompletion(int maxWaitMs);
/**
* wait until the data written is accepted into the outbound pool,
* which we throttle rather than accept arbitrary data and queue
*/
public void waitForAccept(int maxWaitMs);
/** was the write accepted? aka did the socket not close? */
public boolean writeAccepted();
/** did the write fail? */
public boolean writeFailed();
/** did the write succeed? */
public boolean writeSuccessful();
}
}

View File

@ -133,6 +133,7 @@ public class Packet {
public static final int FLAG_ECHO = (1 << 9);
public static final int DEFAULT_MAX_SIZE = 32*1024;
private static final int MAX_DELAY_REQUEST = 65535;
/** what stream is this packet a part of? */
public byte[] getSendStreamId() {
@ -238,7 +239,12 @@ public class Packet {
public int getOptionalDelay() { return _optionDelay; }
public void setOptionalDelay(int delayMs) {
setFlag(FLAG_DELAY_REQUESTED, delayMs > 0);
_optionDelay = delayMs;
if (delayMs > MAX_DELAY_REQUEST)
_optionDelay = MAX_DELAY_REQUEST;
else if (delayMs < 0)
_optionDelay = 0;
else
_optionDelay = delayMs;
}
/**
@ -298,7 +304,7 @@ public class Packet {
int optionSize = 0;
if (isFlagSet(FLAG_DELAY_REQUESTED))
optionSize += 1;
optionSize += 2;
if (isFlagSet(FLAG_FROM_INCLUDED))
optionSize += _optionFrom.size();
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED))
@ -310,8 +316,8 @@ public class Packet {
cur += 2;
if (isFlagSet(FLAG_DELAY_REQUESTED)) {
DataHelper.toLong(buffer, cur, 1, _optionDelay > 0 ? _optionDelay : 0);
cur++;
DataHelper.toLong(buffer, cur, 2, _optionDelay > 0 ? _optionDelay : 0);
cur += 2;
}
if (isFlagSet(FLAG_FROM_INCLUDED)) {
cur += _optionFrom.writeBytes(buffer, cur);
@ -361,7 +367,7 @@ public class Packet {
size += 2; // flags
if (isFlagSet(FLAG_DELAY_REQUESTED))
size += 1;
size += 2;
if (isFlagSet(FLAG_FROM_INCLUDED))
size += _optionFrom.size();
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED))
@ -428,8 +434,8 @@ public class Packet {
// ok now lets go back and deal with the options
if (isFlagSet(FLAG_DELAY_REQUESTED)) {
_optionDelay = (int)DataHelper.fromLong(buffer, cur, 1);
cur++;
_optionDelay = (int)DataHelper.fromLong(buffer, cur, 2);
cur += 2;
}
if (isFlagSet(FLAG_FROM_INCLUDED)) {
_optionFrom = new Destination();
@ -458,10 +464,20 @@ public class Packet {
if (!isFlagSet(FLAG_SIGNATURE_INCLUDED)) return false;
if (_optionSignature == null) return false;
int size = writtenSize();
if (buffer == null)
buffer = new byte[writtenSize()];
int size = writePacket(buffer, 0, false);
return ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey());
buffer = new byte[size];
int written = writePacket(buffer, 0, false);
if (written != size) {
ctx.logManager().getLog(Packet.class).error("Written " + written + " size " + size + " for " + toString(), new Exception("moo"));
return false;
}
boolean ok = ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey());
if (!ok) {
ctx.logManager().getLog(Packet.class).error("Signature failed with sig " + Base64.encode(_optionSignature.getData()), new Exception("moo"));
}
return ok;
}
/**
@ -485,7 +501,7 @@ public class Packet {
+ 1 // resendDelay
+ 2 // flags
+ 2 // optionSize
+ (isFlagSet(FLAG_DELAY_REQUESTED) ? 1 : 0)
+ (isFlagSet(FLAG_DELAY_REQUESTED) ? 2 : 0)
+ (isFlagSet(FLAG_FROM_INCLUDED) ? _optionFrom.size() : 0)
+ (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED) ? 2 : 0);
System.arraycopy(_optionSignature.getData(), 0, buffer, signatureOffset, Signature.SIGNATURE_BYTES);
@ -497,7 +513,11 @@ public class Packet {
buf.append(toId(_sendStreamId));
//buf.append("<-->");
buf.append(toId(_receiveStreamId)).append(": #").append(_sequenceNum);
buf.append(" ").append(toFlagString());
if (_sequenceNum < 10)
buf.append(" \t"); // so the tab lines up right
else
buf.append('\t');
buf.append(toFlagString());
buf.append(" ACK ").append(_ackThrough);
if (_nacks != null) {
buf.append(" NACK");
@ -520,7 +540,7 @@ public class Packet {
private final String toFlagString() {
StringBuffer buf = new StringBuffer(32);
if (isFlagSet(FLAG_CLOSE)) buf.append(" CLOSE");
if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY");
if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY ").append(_optionDelay);
if (isFlagSet(FLAG_ECHO)) buf.append(" ECHO");
if (isFlagSet(FLAG_FROM_INCLUDED)) buf.append(" FROM");
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MS");

View File

@ -6,9 +6,11 @@ import java.util.Set;
import java.text.SimpleDateFormat;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/**
* receive a packet and dispatch it correctly to the connection specified,
@ -19,35 +21,71 @@ public class PacketHandler {
private ConnectionManager _manager;
private I2PAppContext _context;
private Log _log;
private int _lastDelay;
public PacketHandler(I2PAppContext ctx, ConnectionManager mgr) {
_manager = mgr;
_context = ctx;
_log = ctx.logManager().getLog(PacketHandler.class);
_lastDelay = _context.random().nextInt(30*1000);
}
private boolean choke(Packet packet) {
if (false) {
// artificial choke: 2% random drop and a 1s
// random delay
// artificial choke: 2% random drop and a 0-30s
// random tiered delay from 0-30s
if (_context.random().nextInt(100) >= 98) {
_log.error("DROP: " + packet);
displayPacket(packet, "DROP");
return false;
} else {
int delay = _context.random().nextInt(1000);
try { Thread.sleep(delay); } catch (InterruptedException ie) {}
_log.debug("OK : " + packet + " delay = " + delay);
return true;
/*
int delay = _context.random().nextInt(5*1000);
*/
int delay = _context.random().nextInt(6*1000);
int delayFactor = _context.random().nextInt(100);
if (delayFactor > 80) {
if (delayFactor > 98)
delay *= 5;
else if (delayFactor > 95)
delay *= 4;
else if (delayFactor > 90)
delay *= 3;
else
delay *= 2;
}
if (_context.random().nextInt(100) >= 20)
delay = _lastDelay;
_lastDelay = delay;
SimpleTimer.getInstance().addEvent(new Reinject(packet, delay), delay);
return false;
}
} else {
return true;
}
}
private class Reinject implements SimpleTimer.TimedEvent {
private Packet _packet;
private int _delay;
public Reinject(Packet packet, int delay) {
_packet = packet;
_delay = delay;
}
public void timeReached() {
_log.debug("Reinjecting after " + _delay + ": " + _packet);
receivePacketDirect(_packet);
}
}
void receivePacket(Packet packet) {
boolean ok = choke(packet);
if (!ok) return;
if (ok)
receivePacketDirect(packet);
}
private void receivePacketDirect(Packet packet) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("packet received: " + packet);
@ -58,21 +96,20 @@ public class PacketHandler {
Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null);
if (con != null) {
receiveKnownCon(con, packet);
displayPacket(packet, con);
displayPacket(packet, "RECV");
} else {
receiveUnknownCon(packet, sendId);
displayPacket(packet, null);
displayPacket(packet, "UNKN");
}
}
private void displayPacket(Packet packet, Connection con) {
if (_log.shouldLog(Log.DEBUG)) {
SimpleDateFormat fmt = new SimpleDateFormat("hh:mm:ss.SSS");
String now = fmt.format(new Date());
String msg = packet + (con != null ? " on " + con : " on unknown con");
//_log.debug(msg);
System.out.println(now + ": " + msg);
private static final SimpleDateFormat _fmt = new SimpleDateFormat("hh:mm:ss.SSS");
static void displayPacket(Packet packet, String prefix) {
String msg = null;
synchronized (_fmt) {
msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString();
}
System.out.println(msg);
}
private void receiveKnownCon(Connection con, Packet packet) {
@ -81,19 +118,36 @@ public class PacketHandler {
// the packet's receive stream ID also matches what we expect
if (_log.shouldLog(Log.DEBUG))
_log.debug("receive valid: " + packet);
con.getPacketHandler().receivePacket(packet, con);
try {
con.getPacketHandler().receivePacket(packet, con);
} catch (I2PException ie) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received forged packet for " + con, ie);
}
} else {
if (packet.isFlagSet(Packet.FLAG_RESET)) {
// refused
if (_log.shouldLog(Log.DEBUG))
_log.debug("receive reset: " + packet);
con.getPacketHandler().receivePacket(packet, con);
try {
con.getPacketHandler().receivePacket(packet, con);
} catch (I2PException ie) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received forged reset for " + con, ie);
}
} else if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
if ( (con.getSendStreamId() == null) ||
(DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) ) {
byte oldId[] =con.getSendStreamId();
// con fully established, w00t
con.setSendStreamId(packet.getReceiveStreamId());
con.getPacketHandler().receivePacket(packet, con);
try {
con.getPacketHandler().receivePacket(packet, con);
} catch (I2PException ie) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received forged syn for " + con, ie);
con.setSendStreamId(oldId);
}
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive a syn packet with the wrong IDs: " + packet);
@ -146,7 +200,7 @@ public class PacketHandler {
Set cons = _manager.listConnections();
for (Iterator iter = cons.iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
buf.append(Base64.encode(con.getReceiveStreamId())).append(" ");
buf.append(con.toString()).append(" ");
}
_log.warn("Packet belongs to no other cons: " + packet + " connections: "
+ buf.toString() + " sendId: "

View File

@ -10,21 +10,29 @@ import net.i2p.data.SessionKey;
* coordinate local attributes about a packet - send time, ack time, number of
* retries, etc.
*/
public class PacketLocal extends Packet {
public class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
private I2PAppContext _context;
private Connection _connection;
private Destination _to;
private SessionKey _keyUsed;
private Set _tagsSent;
private long _createdOn;
private int _numSends;
private long _lastSend;
private long _acceptedOn;
private long _ackOn;
private long _cancelledOn;
public PacketLocal(I2PAppContext ctx, Destination to) {
this(ctx, to, null);
}
public PacketLocal(I2PAppContext ctx, Destination to, Connection con) {
_context = ctx;
_createdOn = ctx.clock().now();
_to = to;
_connection = con;
_lastSend = -1;
_cancelledOn = -1;
}
public Destination getTo() { return _to; }
@ -50,15 +58,31 @@ public class PacketLocal extends Packet {
isFlagSet(FLAG_CLOSE);
}
/** last minute update of ack fields, just before write/sign */
public void prepare() {
if (_connection != null)
_connection.getInputStream().updateAcks(this);
}
public long getCreatedOn() { return _createdOn; }
public void incrementSends() {
_numSends++;
_lastSend = _context.clock().now();
}
public void ackReceived() {
if (_ackOn <= 0)
_ackOn = _context.clock().now();
synchronized (this) {
if (_ackOn <= 0)
_ackOn = _context.clock().now();
notifyAll();
}
}
public void cancelled() {
synchronized (this) {
_cancelledOn = _context.clock().now();
notifyAll();
}
}
/** how long after packet creation was it acked? */
public int getAckTime() {
if (_ackOn <= 0)
@ -68,6 +92,7 @@ public class PacketLocal extends Packet {
}
public int getNumSends() { return _numSends; }
public long getLastSend() { return _lastSend; }
public Connection getConnection() { return _connection; }
public String toString() {
String str = super.toString();
@ -76,4 +101,32 @@ public class PacketLocal extends Packet {
else
return str;
}
public void waitForAccept(int maxWaitMs) {
if (_connection == null)
throw new IllegalStateException("Cannot wait for accept with no connection");
long expiration = _context.clock().now()+maxWaitMs;
boolean accepted = _connection.packetSendChoke(maxWaitMs);
if (accepted)
_acceptedOn = _context.clock().now();
else
_acceptedOn = -1;
}
public void waitForCompletion(int maxWaitMs) {
long expiration = _context.clock().now()+maxWaitMs;
while ((maxWaitMs <= 0) || (expiration < _context.clock().now())) {
synchronized (this) {
if (_ackOn > 0)
return;
if (_cancelledOn > 0)
return;
try { wait(); } catch (InterruptedException ie) {}
}
}
}
public boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; }
public boolean writeFailed() { return _cancelledOn > 0; }
public boolean writeSuccessful() { return _ackOn > 0 && _cancelledOn <= 0; }
}

View File

@ -29,6 +29,7 @@ class PacketQueue {
* Add a new packet to be sent out ASAP
*/
public void enqueue(PacketLocal packet) {
packet.prepare();
int size = 0;
if (packet.shouldSign())
size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey());
@ -42,8 +43,12 @@ class PacketQueue {
if (tagsSent == null)
tagsSent = new HashSet();
try {
// cache this from before sendMessage
String conStr = packet.getConnection() + "";
// this should not block!
long begin = _context.clock().now();
boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent);
long end = _context.clock().now();
if (!sent) {
if (_log.shouldLog(Log.WARN))
_log.warn("Send failed for " + packet);
@ -55,13 +60,17 @@ class PacketQueue {
String msg = "SEND " + packet + (tagsSent.size() > 0
? " with " + tagsSent.size() + " tags"
: "")
+ " send # " + packet.getNumSends();
+ " send # " + packet.getNumSends()
+ " sendTime: " + (end-begin)
+ " con: " + conStr;
_log.debug(msg);
}
PacketHandler.displayPacket(packet, "SEND");
}
} catch (I2PSessionException ise) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to send the packet " + packet, ise);
}
}
}