* mark the input stream as closed after receiving the packet's data

* properly close the source file in StreamSinkSend
* always adjust the rtt on ack, not just for packets with 1 send
* handle dup SYN gracefully
* revamp the default connection options
* logging
This commit is contained in:
jrandom
2004-10-25 20:04:07 +00:00
committed by zzz
parent 87898dd2f1
commit d592936873
8 changed files with 80 additions and 33 deletions

View File

@ -59,7 +59,7 @@ public class StreamSinkSend {
_log.error("Peer destination is not valid in " + _peerDestFile, dfe); _log.error("Peer destination is not valid in " + _peerDestFile, dfe);
return; return;
} finally { } finally {
if (fis == null) try { fis.close(); } catch (IOException ioe) {} if (fis != null) try { fis.close(); } catch (IOException ioe) {}
} }

View File

@ -152,6 +152,7 @@ public class Connection {
} else { } else {
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
_outboundPackets.put(new Long(packet.getSequenceNum()), packet); _outboundPackets.put(new Long(packet.getSequenceNum()), packet);
_outboundPackets.notifyAll();
} }
SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getRTT()*2); SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getRTT()*2);
} }
@ -241,6 +242,7 @@ public class Connection {
doClose(); doClose();
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
_outboundPackets.clear(); _outboundPackets.clear();
_outboundPackets.notifyAll();
} }
if (removeFromConMgr) if (removeFromConMgr)
_connectionManager.removeConnection(this); _connectionManager.removeConnection(this);

View File

@ -51,13 +51,14 @@ public class ConnectionOptions extends I2PSocketOptions {
} else { } else {
setConnectDelay(2*1000); setConnectDelay(2*1000);
setProfile(PROFILE_BULK); setProfile(PROFILE_BULK);
setMaxMessageSize(32*1024); setMaxMessageSize(Packet.MAX_PAYLOAD_SIZE);
setRTT(5*1000); setRTT(5*1000);
setReceiveWindow(1); setReceiveWindow(1);
setResendDelay(5*1000); setResendDelay(5*1000);
setSendAckDelay(2*1000); setSendAckDelay(1*1000);
setWindowSize(1); setWindowSize(1);
setMaxResends(10); setMaxResends(10);
setWriteTimeout(-1);
} }
} }

View File

@ -29,6 +29,11 @@ public class ConnectionPacketHandler {
boolean ok = verifyPacket(packet, con); boolean ok = verifyPacket(packet, con);
if (!ok) return; if (!ok) return;
boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
// close *after* receiving the data, as well as after verifying the signatures / etc
if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED))
con.closeReceived();
if (isNew) { if (isNew) {
con.incrementUnackedPacketsReceived(); con.incrementUnackedPacketsReceived();
long nextTime = con.getNextSendTime(); long nextTime = con.getNextSendTime();
@ -66,8 +71,8 @@ public class ConnectionPacketHandler {
for (int i = 0; i < acked.size(); i++) { for (int i = 0; i < acked.size(); i++) {
PacketLocal p = (PacketLocal)acked.get(i); PacketLocal p = (PacketLocal)acked.get(i);
if ( (lowestRtt < 0) || (p.getAckTime() < lowestRtt) ) { if ( (lowestRtt < 0) || (p.getAckTime() < lowestRtt) ) {
if (p.getNumSends() <= 1) //if (p.getNumSends() <= 1)
lowestRtt = p.getAckTime(); lowestRtt = p.getAckTime();
} }
if (p.getNumSends() > 1) if (p.getNumSends() > 1)
@ -81,7 +86,7 @@ public class ConnectionPacketHandler {
p.getTagsSent()); p.getTagsSent());
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet acked: " + p); _log.debug("Packet acked after " + p.getAckTime() + "ms: " + p);
} }
if (lowestRtt > 0) { if (lowestRtt > 0) {
int oldRTT = con.getOptions().getRTT(); int oldRTT = con.getOptions().getRTT();
@ -216,9 +221,6 @@ public class ConnectionPacketHandler {
_log.warn("Received unsigned / forged packet: " + packet); _log.warn("Received unsigned / forged packet: " + packet);
return false; return false;
} }
if (packet.isFlagSet(Packet.FLAG_CLOSE)) {
con.closeReceived();
}
} }
return true; return true;
} }

View File

@ -51,6 +51,7 @@ public class MessageInputStream extends InputStream {
private boolean _locallyClosed; private boolean _locallyClosed;
private int _readTimeout; private int _readTimeout;
private IOException _streamError; private IOException _streamError;
private long _readTotal;
private Object _dataLock; private Object _dataLock;
@ -62,6 +63,7 @@ public class MessageInputStream extends InputStream {
_highestReadyBlockId = -1; _highestReadyBlockId = -1;
_highestBlockId = -1; _highestBlockId = -1;
_readTimeout = -1; _readTimeout = -1;
_readTotal = 0;
_notYetReadyBlocks = new HashMap(4); _notYetReadyBlocks = new HashMap(4);
_dataLock = new Object(); _dataLock = new Object();
_closeReceived = false; _closeReceived = false;
@ -149,9 +151,32 @@ public class MessageInputStream extends InputStream {
public void closeReceived() { public void closeReceived() {
synchronized (_dataLock) { synchronized (_dataLock) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Close received, ready size: " + _readyDataBlocks.size() StringBuffer buf = new StringBuffer(128);
+ " not ready: " + _notYetReadyBlocks.size(), new Exception("closed")); buf.append("Close received, ready bytes: ");
long available = 0;
for (int i = 0; i < _readyDataBlocks.size(); i++)
available += ((ByteArray)_readyDataBlocks.get(i)).getData().length;
available -= _readyDataBlockIndex;
buf.append(available);
buf.append(" blocks: ").append(_readyDataBlocks.size());
buf.append(" not ready blocks: ");
long notAvailable = 0;
for (Iterator iter = _notYetReadyBlocks.keySet().iterator(); iter.hasNext(); ) {
Long id = (Long)iter.next();
ByteArray ba = (ByteArray)_notYetReadyBlocks.get(id);
buf.append(id).append(" ");
if (ba.getData() != null)
notAvailable += ba.getData().length;
}
buf.append("not ready bytes: ").append(notAvailable);
buf.append(" highest ready block: ").append(_highestReadyBlockId);
_log.debug(buf.toString(), new Exception("closed"));
}
_closeReceived = true; _closeReceived = true;
_dataLock.notifyAll(); _dataLock.notifyAll();
} }
@ -182,15 +207,17 @@ public class MessageInputStream extends InputStream {
_readyDataBlocks.add(new ByteArray(payload)); _readyDataBlocks.add(new ByteArray(payload));
} }
_highestReadyBlockId = messageId; _highestReadyBlockId = messageId;
long cur = _highestReadyBlockId + 1;
// now pull in any previously pending blocks // now pull in any previously pending blocks
while (_notYetReadyBlocks.containsKey(new Long(_highestReadyBlockId + 1))) { while (_notYetReadyBlocks.containsKey(new Long(cur))) {
ByteArray ba = (ByteArray)_notYetReadyBlocks.get(new Long(_highestReadyBlockId + 1)); ByteArray ba = (ByteArray)_notYetReadyBlocks.remove(new Long(cur));
if ( (ba != null) && (ba.getData() != null) && (ba.getData().length > 0) ) { if ( (ba != null) && (ba.getData() != null) && (ba.getData().length > 0) ) {
_readyDataBlocks.add(ba); _readyDataBlocks.add(ba);
} }
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("making ready the block " + _highestReadyBlockId); _log.debug("making ready the block " + cur);
cur++;
_highestReadyBlockId++; _highestReadyBlockId++;
} }
_dataLock.notifyAll(); _dataLock.notifyAll();
@ -219,7 +246,7 @@ public class MessageInputStream extends InputStream {
if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) { if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read() got EOF: " + toString()); _log.debug("read() got EOF after " + _readTotal + " " + toString());
return -1; return -1;
} else { } else {
if (_readTimeout < 0) { if (_readTimeout < 0) {
@ -259,6 +286,7 @@ public class MessageInputStream extends InputStream {
_readyDataBlockIndex = 0; _readyDataBlockIndex = 0;
_readyDataBlocks.remove(0); _readyDataBlocks.remove(0);
} }
_readTotal++;
return (rv < 0 ? rv + 256 : rv); return (rv < 0 ? rv + 256 : rv);
} }
} }

View File

@ -18,6 +18,7 @@ public class MessageOutputStream extends OutputStream {
private DataReceiver _dataReceiver; private DataReceiver _dataReceiver;
private IOException _streamError; private IOException _streamError;
private boolean _closed; private boolean _closed;
private long _written;
public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) {
this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE); this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE);
@ -29,6 +30,7 @@ public class MessageOutputStream extends OutputStream {
_buf = new byte[bufSize]; _buf = new byte[bufSize];
_dataReceiver = receiver; _dataReceiver = receiver;
_dataLock = new Object(); _dataLock = new Object();
_written = 0;
_closed = false; _closed = false;
} }
@ -48,6 +50,7 @@ public class MessageOutputStream extends OutputStream {
System.arraycopy(b, cur, _buf, _valid, remaining); System.arraycopy(b, cur, _buf, _valid, remaining);
_valid += remaining; _valid += remaining;
cur += remaining; cur += remaining;
_written += remaining;
remaining = 0; remaining = 0;
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug("write(...): appending valid = " + _valid + " remaining=" + remaining); // _log.debug("write(...): appending valid = " + _valid + " remaining=" + remaining);
@ -68,6 +71,7 @@ public class MessageOutputStream extends OutputStream {
_dataReceiver.writeData(_buf, 0, _valid); _dataReceiver.writeData(_buf, 0, _valid);
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug("write(...): flushing complete valid = " + _valid + " remaining=" + remaining); // _log.debug("write(...): flushing complete valid = " + _valid + " remaining=" + remaining);
_written += _valid;
_valid = 0; _valid = 0;
throwAnyError(); throwAnyError();
} }
@ -89,9 +93,11 @@ public class MessageOutputStream extends OutputStream {
// also throws InterruptedIOException if the write timeout // also throws InterruptedIOException if the write timeout
// expires // expires
_dataReceiver.writeData(_buf, 0, _valid); _dataReceiver.writeData(_buf, 0, _valid);
_written += _valid;
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug("flush(): valid = " + _valid + " complete"); // _log.debug("flush(): valid = " + _valid + " complete");
_valid = 0; _valid = 0;
_dataLock.notifyAll();
} }
throwAnyError(); throwAnyError();
} }
@ -99,6 +105,7 @@ public class MessageOutputStream extends OutputStream {
public void close() throws IOException { public void close() throws IOException {
_closed = true; _closed = true;
flush(); flush();
_log.debug("Output stream closed after writing " + _written);
} }
public boolean getClosed() { return _closed; } public boolean getClosed() { return _closed; }
@ -123,7 +130,9 @@ public class MessageOutputStream extends OutputStream {
void flushAvailable(DataReceiver target) throws IOException { void flushAvailable(DataReceiver target) throws IOException {
synchronized (_dataLock) { synchronized (_dataLock) {
target.writeData(_buf, 0, _valid); target.writeData(_buf, 0, _valid);
_written += _valid;
_valid = 0; _valid = 0;
_dataLock.notifyAll();
} }
} }

View File

@ -116,14 +116,6 @@ public class PacketHandler {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Echo packet received with no stream IDs: " + packet); _log.warn("Echo packet received with no stream IDs: " + packet);
} }
} else if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
if (sendId == null) {
// this is the initial SYN to establish a connection
_manager.getConnectionHandler().receiveNewSyn(packet);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Syn packet reply on a stream we don't know about: " + packet);
}
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received on an unknown stream (and not a SYN): " + packet); _log.debug("Packet received on an unknown stream (and not a SYN): " + packet);
@ -145,16 +137,21 @@ public class PacketHandler {
} }
} }
} }
if (_log.shouldLog(Log.WARN)) {
StringBuffer buf = new StringBuffer(128); if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
Set cons = _manager.listConnections(); _manager.getConnectionHandler().receiveNewSyn(packet);
for (Iterator iter = cons.iterator(); iter.hasNext(); ) { } else {
Connection con = (Connection)iter.next(); if (_log.shouldLog(Log.WARN)) {
buf.append(Base64.encode(con.getReceiveStreamId())).append(" "); StringBuffer buf = new StringBuffer(128);
Set cons = _manager.listConnections();
for (Iterator iter = cons.iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
buf.append(Base64.encode(con.getReceiveStreamId())).append(" ");
}
_log.warn("Packet belongs to no other cons: " + packet + " connections: "
+ buf.toString() + " sendId: "
+ (sendId != null ? Base64.encode(sendId) : " unknown"));
} }
_log.warn("Packet belongs to no other cons: " + packet + " connections: "
+ buf.toString() + " sendId: "
+ (sendId != null ? Base64.encode(sendId) : " unknown"));
} }
} }
} }

View File

@ -68,4 +68,12 @@ public class PacketLocal extends Packet {
} }
public int getNumSends() { return _numSends; } public int getNumSends() { return _numSends; }
public long getLastSend() { return _lastSend; } public long getLastSend() { return _lastSend; }
public String toString() {
String str = super.toString();
if (_ackOn > 0)
return str + " ack after " + getAckTime();
else
return str;
}
} }