diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index f64ecce99f..d427d4c239 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -331,17 +331,33 @@ public class Connection { _outboundPackets.clear(); _outboundPackets.notifyAll(); } - if (removeFromConMgr) { - if (!_disconnectScheduled) { - _disconnectScheduled = true; - SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT); - } + } + if (removeFromConMgr) { + if (!_disconnectScheduled) { + _disconnectScheduled = true; + SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT); } } } void disconnectComplete() { _connected = false; + if (_socket != null) + _socket.destroy(); + _socket = null; + _inputStream = null; + if (_outputStream != null) + _outputStream.destroy(); + _outputStream = null; + _outboundQueue = null; + _handler = null; + if (_receiver != null) + _receiver.destroy(); + _receiver = null; + if (_activityTimer != null) + SimpleTimer.getInstance().addEvent(_activityTimer, 1); + _activityTimer = null; + if (!_disconnectScheduled) { _disconnectScheduled = true; @@ -539,6 +555,7 @@ public class Connection { private void resetActivityTimer() { if (_options.getInactivityTimeout() <= 0) return; + if (_activityTimer == null) return; long howLong = _activityTimer.getTimeLeft(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Resetting the inactivity timer to " + howLong); @@ -617,12 +634,15 @@ public class Connection { buf.append("] "); } buf.append("unacked inbound? ").append(getUnackedPacketsReceived()); - buf.append(" [high ").append(_inputStream.getHighestBlockId()); - long nacks[] = _inputStream.getNacks(); - if (nacks != null) - for (int i = 0; i < nacks.length; i++) - buf.append(" ").append(nacks[i]); - buf.append("]"); + if (_inputStream != null) { + buf.append(" [high "); + buf.append(_inputStream.getHighestBlockId()); + long nacks[] = _inputStream.getNacks(); + if (nacks != null) + for (int i = 0; i < nacks.length; i++) + buf.append(" ").append(nacks[i]); + buf.append("]"); + } buf.append("]"); return buf.toString(); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index c855991e30..4d4177ff48 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -127,4 +127,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { public final boolean writeFailed() { return false; } public final boolean writeSuccessful() { return true; } } + + void destroy() { + _connection = null; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 2fc679e2ad..92b302337b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -53,7 +53,7 @@ public class ConnectionManager { _conPacketHandler = new ConnectionPacketHandler(context); _session = session; session.setSessionListener(_messageHandler); - _outboundQueue = new PacketQueue(context, session); + _outboundQueue = new PacketQueue(context, session, this); _allowIncoming = false; _maxConcurrentStreams = maxConcurrent; _numWaiting = 0; @@ -102,11 +102,12 @@ public class ConnectionManager { reject = true; } else { while (true) { - Connection oldCon = (Connection)_connectionByInboundId.put(new ByteArray(receiveId), con); + ByteArray ba = new ByteArray(receiveId); + Connection oldCon = (Connection)_connectionByInboundId.put(ba, con); if (oldCon == null) { break; } else { - _connectionByInboundId.put(new ByteArray(receiveId), oldCon); + _connectionByInboundId.put(ba, oldCon); // receiveId already taken, try another _context.random().nextBytes(receiveId); } @@ -210,15 +211,19 @@ public class ConnectionManager { } private boolean locked_tooManyStreams() { - if (_maxConcurrentStreams <= 0) return false; - if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; - int active = 0; for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); if (con.getIsConnected()) active++; } + + if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) ) + _log.info("More than 100 connections! " + active + + " total: " + _connectionByInboundId.size()); + + if (_maxConcurrentStreams <= 0) return false; + if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; return (active >= _maxConcurrentStreams); } @@ -245,8 +250,15 @@ public class ConnectionManager { } public void removeConnection(Connection con) { + boolean removed = false; synchronized (_connectionLock) { - _connectionByInboundId.remove(new ByteArray(con.getReceiveStreamId())); + Object o = _connectionByInboundId.remove(new ByteArray(con.getReceiveStreamId())); + removed = (o == con); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Connection removed? " + removed + " remaining: " + + _connectionByInboundId.size() + ": " + con); + if (!removed && _log.shouldLog(Log.DEBUG)) + _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values()); _connectionLock.notifyAll(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index b135822468..1417e11584 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -36,6 +36,12 @@ public class ConnectionPacketHandler { } con.getOptions().setChoke(0); boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); + + if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("seq=0 && size=" + packet.getPayloadSize() + ": isNew? " + isNew + + " packet: " + packet + " con: " + con); + } // close *after* receiving the data, as well as after verifying the signatures / etc if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED)) @@ -289,6 +295,8 @@ public class ConnectionPacketHandler { } public void timeReached() { if (_con.getLastSendTime() <= _created) { + if (!_con.getIsConnected()) return; + if (_log.shouldLog(Log.DEBUG)) _log.debug("Last sent was a while ago, and we want to ack a dup"); // we haven't done anything since receiving the dup, send an diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index 089733c0f1..1818ccec0a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -19,12 +19,14 @@ public class I2PSocketFull implements I2PSocket { } public void close() throws IOException { + if (_connection == null) return; if (_connection.getIsConnected()) { _connection.getOutputStream().close(); _connection.disconnect(true); } else { //throw new IOException("Not connected"); } + destroy(); } public InputStream getInputStream() { @@ -65,4 +67,9 @@ public class I2PSocketFull implements I2PSocket { public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) { _listener = lsnr; } + + void destroy() { + _connection = null; + _listener = null; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index ef2330b8e8..37c23654bb 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -148,6 +148,8 @@ public class I2PSocketManagerFull implements I2PSocketManager { throws I2PException, NoRouteToHostException { if (_connectionManager.getSession().isClosed()) throw new I2PException("Session is closed"); + if (options == null) + options = _defaultOptions; ConnectionOptions opts = null; if (options instanceof ConnectionOptions) opts = (ConnectionOptions)options; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index a1f37ab045..5771258129 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -198,6 +198,10 @@ public class MessageOutputStream extends OutputStream { return; } + void destroy() { + _dataReceiver = null; + } + public interface DataReceiver { /** * Nonblocking write diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 9db51eb9cf..08ad87da4e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -105,12 +105,13 @@ public class PacketHandler { } private static final SimpleDateFormat _fmt = new SimpleDateFormat("hh:mm:ss.SSS"); - static void displayPacket(Packet packet, String prefix) { + void displayPacket(Packet packet, String prefix) { String msg = null; synchronized (_fmt) { msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString(); } - System.out.println(msg); + if (_log.shouldLog(Log.DEBUG)) + System.out.println(msg); } private void receiveKnownCon(Connection con, Packet packet) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index 3b291f78dd..b04f6f1e18 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -77,12 +77,14 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat _ackOn = _context.clock().now(); notifyAll(); } + _connection = null; } public void cancelled() { synchronized (this) { _cancelledOn = _context.clock().now(); notifyAll(); } + _connection = null; } /** how long after packet creation was it acked? */ @@ -113,9 +115,11 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat _acceptedOn = _context.clock().now(); else _acceptedOn = -1; + _connection = null; } public void waitForCompletion(int maxWaitMs) { + _connection = null; long expiration = _context.clock().now()+maxWaitMs; while (true) { long timeRemaining = expiration - _context.clock().now(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 6597865d7f..1841f797cc 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -19,12 +19,14 @@ class PacketQueue { private I2PAppContext _context; private Log _log; private I2PSession _session; + private ConnectionManager _connectionManager; private byte _buf[]; private ByteCache _cache = ByteCache.getInstance(64, 36*1024); - public PacketQueue(I2PAppContext context, I2PSession session) { + public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) { _context = context; _session = session; + _connectionManager = mgr; _buf = _cache.acquire().getData(); // new byte[36*1024]; _log = context.logManager().getLog(PacketQueue.class); } @@ -95,7 +97,7 @@ class PacketQueue { + " con: " + conStr; _log.debug(msg); } - PacketHandler.displayPacket(packet, "SEND"); + _connectionManager.getPacketHandler().displayPacket(packet, "SEND"); } }