diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkSend.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkSend.java index f56c4c1b2..e7398e5b6 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkSend.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkSend.java @@ -59,7 +59,7 @@ public class StreamSinkSend { _log.error("Peer destination is not valid in " + _peerDestFile, dfe); return; } finally { - if (fis == null) try { fis.close(); } catch (IOException ioe) {} + if (fis != null) try { fis.close(); } catch (IOException ioe) {} } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index b5280ecf7..ffa840f42 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -152,6 +152,7 @@ public class Connection { } else { synchronized (_outboundPackets) { _outboundPackets.put(new Long(packet.getSequenceNum()), packet); + _outboundPackets.notifyAll(); } SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getRTT()*2); } @@ -241,6 +242,7 @@ public class Connection { doClose(); synchronized (_outboundPackets) { _outboundPackets.clear(); + _outboundPackets.notifyAll(); } if (removeFromConMgr) _connectionManager.removeConnection(this); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 190d280e8..e72926d6f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -51,13 +51,14 @@ public class ConnectionOptions extends I2PSocketOptions { } else { setConnectDelay(2*1000); setProfile(PROFILE_BULK); - setMaxMessageSize(32*1024); + setMaxMessageSize(Packet.MAX_PAYLOAD_SIZE); setRTT(5*1000); setReceiveWindow(1); setResendDelay(5*1000); - setSendAckDelay(2*1000); + setSendAckDelay(1*1000); setWindowSize(1); setMaxResends(10); + setWriteTimeout(-1); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index b5178b465..a788ffa18 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -29,6 +29,11 @@ public class ConnectionPacketHandler { boolean ok = verifyPacket(packet, con); if (!ok) return; boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); + + // close *after* receiving the data, as well as after verifying the signatures / etc + if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED)) + con.closeReceived(); + if (isNew) { con.incrementUnackedPacketsReceived(); long nextTime = con.getNextSendTime(); @@ -66,8 +71,8 @@ public class ConnectionPacketHandler { for (int i = 0; i < acked.size(); i++) { PacketLocal p = (PacketLocal)acked.get(i); if ( (lowestRtt < 0) || (p.getAckTime() < lowestRtt) ) { - if (p.getNumSends() <= 1) - lowestRtt = p.getAckTime(); + //if (p.getNumSends() <= 1) + lowestRtt = p.getAckTime(); } if (p.getNumSends() > 1) @@ -81,7 +86,7 @@ public class ConnectionPacketHandler { p.getTagsSent()); } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet acked: " + p); + _log.debug("Packet acked after " + p.getAckTime() + "ms: " + p); } if (lowestRtt > 0) { int oldRTT = con.getOptions().getRTT(); @@ -216,9 +221,6 @@ public class ConnectionPacketHandler { _log.warn("Received unsigned / forged packet: " + packet); return false; } - if (packet.isFlagSet(Packet.FLAG_CLOSE)) { - con.closeReceived(); - } } return true; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 9a3880737..3069ca03e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -51,6 +51,7 @@ public class MessageInputStream extends InputStream { private boolean _locallyClosed; private int _readTimeout; private IOException _streamError; + private long _readTotal; private Object _dataLock; @@ -62,6 +63,7 @@ public class MessageInputStream extends InputStream { _highestReadyBlockId = -1; _highestBlockId = -1; _readTimeout = -1; + _readTotal = 0; _notYetReadyBlocks = new HashMap(4); _dataLock = new Object(); _closeReceived = false; @@ -149,9 +151,32 @@ public class MessageInputStream extends InputStream { public void closeReceived() { synchronized (_dataLock) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Close received, ready size: " + _readyDataBlocks.size() - + " not ready: " + _notYetReadyBlocks.size(), new Exception("closed")); + if (_log.shouldLog(Log.DEBUG)) { + StringBuffer buf = new StringBuffer(128); + buf.append("Close received, ready bytes: "); + long available = 0; + for (int i = 0; i < _readyDataBlocks.size(); i++) + available += ((ByteArray)_readyDataBlocks.get(i)).getData().length; + available -= _readyDataBlockIndex; + buf.append(available); + buf.append(" blocks: ").append(_readyDataBlocks.size()); + + buf.append(" not ready blocks: "); + long notAvailable = 0; + for (Iterator iter = _notYetReadyBlocks.keySet().iterator(); iter.hasNext(); ) { + Long id = (Long)iter.next(); + ByteArray ba = (ByteArray)_notYetReadyBlocks.get(id); + buf.append(id).append(" "); + + if (ba.getData() != null) + notAvailable += ba.getData().length; + } + + buf.append("not ready bytes: ").append(notAvailable); + buf.append(" highest ready block: ").append(_highestReadyBlockId); + + _log.debug(buf.toString(), new Exception("closed")); + } _closeReceived = true; _dataLock.notifyAll(); } @@ -182,15 +207,17 @@ public class MessageInputStream extends InputStream { _readyDataBlocks.add(new ByteArray(payload)); } _highestReadyBlockId = messageId; + long cur = _highestReadyBlockId + 1; // now pull in any previously pending blocks - while (_notYetReadyBlocks.containsKey(new Long(_highestReadyBlockId + 1))) { - ByteArray ba = (ByteArray)_notYetReadyBlocks.get(new Long(_highestReadyBlockId + 1)); + while (_notYetReadyBlocks.containsKey(new Long(cur))) { + ByteArray ba = (ByteArray)_notYetReadyBlocks.remove(new Long(cur)); if ( (ba != null) && (ba.getData() != null) && (ba.getData().length > 0) ) { _readyDataBlocks.add(ba); } if (_log.shouldLog(Log.DEBUG)) - _log.debug("making ready the block " + _highestReadyBlockId); + _log.debug("making ready the block " + cur); + cur++; _highestReadyBlockId++; } _dataLock.notifyAll(); @@ -219,7 +246,7 @@ public class MessageInputStream extends InputStream { if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("read() got EOF: " + toString()); + _log.debug("read() got EOF after " + _readTotal + " " + toString()); return -1; } else { if (_readTimeout < 0) { @@ -259,6 +286,7 @@ public class MessageInputStream extends InputStream { _readyDataBlockIndex = 0; _readyDataBlocks.remove(0); } + _readTotal++; return (rv < 0 ? rv + 256 : rv); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 79d72a40f..6b62083e3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -18,6 +18,7 @@ public class MessageOutputStream extends OutputStream { private DataReceiver _dataReceiver; private IOException _streamError; private boolean _closed; + private long _written; public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE); @@ -29,6 +30,7 @@ public class MessageOutputStream extends OutputStream { _buf = new byte[bufSize]; _dataReceiver = receiver; _dataLock = new Object(); + _written = 0; _closed = false; } @@ -48,6 +50,7 @@ public class MessageOutputStream extends OutputStream { System.arraycopy(b, cur, _buf, _valid, remaining); _valid += remaining; cur += remaining; + _written += remaining; remaining = 0; //if (_log.shouldLog(Log.DEBUG)) // _log.debug("write(...): appending valid = " + _valid + " remaining=" + remaining); @@ -68,6 +71,7 @@ public class MessageOutputStream extends OutputStream { _dataReceiver.writeData(_buf, 0, _valid); //if (_log.shouldLog(Log.DEBUG)) // _log.debug("write(...): flushing complete valid = " + _valid + " remaining=" + remaining); + _written += _valid; _valid = 0; throwAnyError(); } @@ -89,9 +93,11 @@ public class MessageOutputStream extends OutputStream { // also throws InterruptedIOException if the write timeout // expires _dataReceiver.writeData(_buf, 0, _valid); + _written += _valid; //if (_log.shouldLog(Log.DEBUG)) // _log.debug("flush(): valid = " + _valid + " complete"); _valid = 0; + _dataLock.notifyAll(); } throwAnyError(); } @@ -99,6 +105,7 @@ public class MessageOutputStream extends OutputStream { public void close() throws IOException { _closed = true; flush(); + _log.debug("Output stream closed after writing " + _written); } public boolean getClosed() { return _closed; } @@ -123,7 +130,9 @@ public class MessageOutputStream extends OutputStream { void flushAvailable(DataReceiver target) throws IOException { synchronized (_dataLock) { target.writeData(_buf, 0, _valid); + _written += _valid; _valid = 0; + _dataLock.notifyAll(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 9b5053990..3ae1a2430 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -116,14 +116,6 @@ public class PacketHandler { if (_log.shouldLog(Log.WARN)) _log.warn("Echo packet received with no stream IDs: " + packet); } - } else if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { - if (sendId == null) { - // this is the initial SYN to establish a connection - _manager.getConnectionHandler().receiveNewSyn(packet); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Syn packet reply on a stream we don't know about: " + packet); - } } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet received on an unknown stream (and not a SYN): " + packet); @@ -145,16 +137,21 @@ public class PacketHandler { } } } - if (_log.shouldLog(Log.WARN)) { - StringBuffer buf = new StringBuffer(128); - Set cons = _manager.listConnections(); - for (Iterator iter = cons.iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); - buf.append(Base64.encode(con.getReceiveStreamId())).append(" "); + + if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { + _manager.getConnectionHandler().receiveNewSyn(packet); + } else { + if (_log.shouldLog(Log.WARN)) { + StringBuffer buf = new StringBuffer(128); + Set cons = _manager.listConnections(); + for (Iterator iter = cons.iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + buf.append(Base64.encode(con.getReceiveStreamId())).append(" "); + } + _log.warn("Packet belongs to no other cons: " + packet + " connections: " + + buf.toString() + " sendId: " + + (sendId != null ? Base64.encode(sendId) : " unknown")); } - _log.warn("Packet belongs to no other cons: " + packet + " connections: " - + buf.toString() + " sendId: " - + (sendId != null ? Base64.encode(sendId) : " unknown")); } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index 50fab40aa..8af6eb713 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -68,4 +68,12 @@ public class PacketLocal extends Packet { } public int getNumSends() { return _numSends; } public long getLastSend() { return _lastSend; } + + public String toString() { + String str = super.toString(); + if (_ackOn > 0) + return str + " ack after " + getAckTime(); + else + return str; + } }