various tweaks to make sure we release appropriate references ASAP

This commit is contained in:
jrandom
2004-11-15 14:37:56 +00:00
committed by zzz
parent 3780d290fa
commit 306f6b0037
10 changed files with 86 additions and 22 deletions

View File

@ -331,17 +331,33 @@ public class Connection {
_outboundPackets.clear();
_outboundPackets.notifyAll();
}
if (removeFromConMgr) {
if (!_disconnectScheduled) {
_disconnectScheduled = true;
SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
}
}
if (removeFromConMgr) {
if (!_disconnectScheduled) {
_disconnectScheduled = true;
SimpleTimer.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
}
}
}
void disconnectComplete() {
_connected = false;
if (_socket != null)
_socket.destroy();
_socket = null;
_inputStream = null;
if (_outputStream != null)
_outputStream.destroy();
_outputStream = null;
_outboundQueue = null;
_handler = null;
if (_receiver != null)
_receiver.destroy();
_receiver = null;
if (_activityTimer != null)
SimpleTimer.getInstance().addEvent(_activityTimer, 1);
_activityTimer = null;
if (!_disconnectScheduled) {
_disconnectScheduled = true;
@ -539,6 +555,7 @@ public class Connection {
private void resetActivityTimer() {
if (_options.getInactivityTimeout() <= 0) return;
if (_activityTimer == null) return;
long howLong = _activityTimer.getTimeLeft();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Resetting the inactivity timer to " + howLong);
@ -617,12 +634,15 @@ public class Connection {
buf.append("] ");
}
buf.append("unacked inbound? ").append(getUnackedPacketsReceived());
buf.append(" [high ").append(_inputStream.getHighestBlockId());
long nacks[] = _inputStream.getNacks();
if (nacks != null)
for (int i = 0; i < nacks.length; i++)
buf.append(" ").append(nacks[i]);
buf.append("]");
if (_inputStream != null) {
buf.append(" [high ");
buf.append(_inputStream.getHighestBlockId());
long nacks[] = _inputStream.getNacks();
if (nacks != null)
for (int i = 0; i < nacks.length; i++)
buf.append(" ").append(nacks[i]);
buf.append("]");
}
buf.append("]");
return buf.toString();
}

View File

@ -127,4 +127,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
public final boolean writeFailed() { return false; }
public final boolean writeSuccessful() { return true; }
}
void destroy() {
_connection = null;
}
}

View File

@ -53,7 +53,7 @@ public class ConnectionManager {
_conPacketHandler = new ConnectionPacketHandler(context);
_session = session;
session.setSessionListener(_messageHandler);
_outboundQueue = new PacketQueue(context, session);
_outboundQueue = new PacketQueue(context, session, this);
_allowIncoming = false;
_maxConcurrentStreams = maxConcurrent;
_numWaiting = 0;
@ -102,11 +102,12 @@ public class ConnectionManager {
reject = true;
} else {
while (true) {
Connection oldCon = (Connection)_connectionByInboundId.put(new ByteArray(receiveId), con);
ByteArray ba = new ByteArray(receiveId);
Connection oldCon = (Connection)_connectionByInboundId.put(ba, con);
if (oldCon == null) {
break;
} else {
_connectionByInboundId.put(new ByteArray(receiveId), oldCon);
_connectionByInboundId.put(ba, oldCon);
// receiveId already taken, try another
_context.random().nextBytes(receiveId);
}
@ -210,15 +211,19 @@ public class ConnectionManager {
}
private boolean locked_tooManyStreams() {
if (_maxConcurrentStreams <= 0) return false;
if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
int active = 0;
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
if (con.getIsConnected())
active++;
}
if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) )
_log.info("More than 100 connections! " + active
+ " total: " + _connectionByInboundId.size());
if (_maxConcurrentStreams <= 0) return false;
if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
return (active >= _maxConcurrentStreams);
}
@ -245,8 +250,15 @@ public class ConnectionManager {
}
public void removeConnection(Connection con) {
boolean removed = false;
synchronized (_connectionLock) {
_connectionByInboundId.remove(new ByteArray(con.getReceiveStreamId()));
Object o = _connectionByInboundId.remove(new ByteArray(con.getReceiveStreamId()));
removed = (o == con);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection removed? " + removed + " remaining: "
+ _connectionByInboundId.size() + ": " + con);
if (!removed && _log.shouldLog(Log.DEBUG))
_log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
_connectionLock.notifyAll();
}
}

View File

@ -36,6 +36,12 @@ public class ConnectionPacketHandler {
}
con.getOptions().setChoke(0);
boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("seq=0 && size=" + packet.getPayloadSize() + ": isNew? " + isNew
+ " packet: " + packet + " con: " + con);
}
// close *after* receiving the data, as well as after verifying the signatures / etc
if (packet.isFlagSet(Packet.FLAG_CLOSE) && packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED))
@ -289,6 +295,8 @@ public class ConnectionPacketHandler {
}
public void timeReached() {
if (_con.getLastSendTime() <= _created) {
if (!_con.getIsConnected()) return;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Last sent was a while ago, and we want to ack a dup");
// we haven't done anything since receiving the dup, send an

View File

@ -19,12 +19,14 @@ public class I2PSocketFull implements I2PSocket {
}
public void close() throws IOException {
if (_connection == null) return;
if (_connection.getIsConnected()) {
_connection.getOutputStream().close();
_connection.disconnect(true);
} else {
//throw new IOException("Not connected");
}
destroy();
}
public InputStream getInputStream() {
@ -65,4 +67,9 @@ public class I2PSocketFull implements I2PSocket {
public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
_listener = lsnr;
}
void destroy() {
_connection = null;
_listener = null;
}
}

View File

@ -148,6 +148,8 @@ public class I2PSocketManagerFull implements I2PSocketManager {
throws I2PException, NoRouteToHostException {
if (_connectionManager.getSession().isClosed())
throw new I2PException("Session is closed");
if (options == null)
options = _defaultOptions;
ConnectionOptions opts = null;
if (options instanceof ConnectionOptions)
opts = (ConnectionOptions)options;

View File

@ -198,6 +198,10 @@ public class MessageOutputStream extends OutputStream {
return;
}
void destroy() {
_dataReceiver = null;
}
public interface DataReceiver {
/**
* Nonblocking write

View File

@ -105,12 +105,13 @@ public class PacketHandler {
}
private static final SimpleDateFormat _fmt = new SimpleDateFormat("hh:mm:ss.SSS");
static void displayPacket(Packet packet, String prefix) {
void displayPacket(Packet packet, String prefix) {
String msg = null;
synchronized (_fmt) {
msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString();
}
System.out.println(msg);
if (_log.shouldLog(Log.DEBUG))
System.out.println(msg);
}
private void receiveKnownCon(Connection con, Packet packet) {

View File

@ -77,12 +77,14 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
_ackOn = _context.clock().now();
notifyAll();
}
_connection = null;
}
public void cancelled() {
synchronized (this) {
_cancelledOn = _context.clock().now();
notifyAll();
}
_connection = null;
}
/** how long after packet creation was it acked? */
@ -113,9 +115,11 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat
_acceptedOn = _context.clock().now();
else
_acceptedOn = -1;
_connection = null;
}
public void waitForCompletion(int maxWaitMs) {
_connection = null;
long expiration = _context.clock().now()+maxWaitMs;
while (true) {
long timeRemaining = expiration - _context.clock().now();

View File

@ -19,12 +19,14 @@ class PacketQueue {
private I2PAppContext _context;
private Log _log;
private I2PSession _session;
private ConnectionManager _connectionManager;
private byte _buf[];
private ByteCache _cache = ByteCache.getInstance(64, 36*1024);
public PacketQueue(I2PAppContext context, I2PSession session) {
public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) {
_context = context;
_session = session;
_connectionManager = mgr;
_buf = _cache.acquire().getData(); // new byte[36*1024];
_log = context.logManager().getLog(PacketQueue.class);
}
@ -95,7 +97,7 @@ class PacketQueue {
+ " con: " + conStr;
_log.debug(msg);
}
PacketHandler.displayPacket(packet, "SEND");
_connectionManager.getPacketHandler().displayPacket(packet, "SEND");
}
}