deal with a race on close

more zealous bc synchronization
make sure we always close the streams explicitly
logging
This commit is contained in:
jrandom
2004-08-15 20:48:35 +00:00
committed by zzz
parent d2fc24e792
commit e5d66f46c6

View File

@ -346,7 +346,7 @@ class I2PSocketImpl implements I2PSocket {
} }
boolean timedOut = false; boolean timedOut = false;
while (read.length == 0) { while ( (read.length == 0) && (!inStreamClosed) ) {
synchronized (flagLock) { synchronized (flagLock) {
if (closed) { if (closed) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -378,6 +378,9 @@ class I2PSocketImpl implements I2PSocket {
} }
} }
if (read.length > len) throw new RuntimeException("BUG"); if (read.length > len) throw new RuntimeException("BUG");
if ( (inStreamClosed) && ( (read == null) || (read.length <= 0) ) )
return -1;
System.arraycopy(read, 0, b, off, read.length); System.arraycopy(read, 0, b, off, read.length);
if (_log.shouldLog(Log.DEBUG)) { if (_log.shouldLog(Log.DEBUG)) {
@ -456,6 +459,8 @@ class I2PSocketImpl implements I2PSocket {
synchronized (I2PInputStream.this) { synchronized (I2PInputStream.this) {
I2PInputStream.this.notifyAll(); I2PInputStream.this.notifyAll();
} }
if (_log.shouldLog(Log.DEBUG))
_log.debug(getStreamPrefix() + "After insert " + len + " bytes into queue: " + hashCode());
} }
public void notifyClosed() { public void notifyClosed() {
@ -471,6 +476,8 @@ class I2PSocketImpl implements I2PSocket {
inStreamClosed = true; inStreamClosed = true;
bc.notifyAll(); bc.notifyAll();
} }
if (_log.shouldLog(Log.DEBUG))
_log.debug(getStreamPrefix() + "After close");
} }
} }
@ -518,10 +525,21 @@ class I2PSocketImpl implements I2PSocket {
*/ */
private boolean handleNextPacket(ByteCollector bc, byte buffer[]) private boolean handleNextPacket(ByteCollector bc, byte buffer[])
throws IOException, I2PSessionException { throws IOException, I2PSessionException {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket");
int len = in.read(buffer); int len = in.read(buffer);
int bcsize = bc.getCurrentSize(); int bcsize = 0;
synchronized (bc) {
bcsize = bc.getCurrentSize();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket len=" + len + " bcsize=" + bcsize);
if (len != -1) { if (len != -1) {
bc.append(buffer, len); synchronized (bc) {
bc.append(buffer, len);
}
} else if (bcsize == 0) { } else if (bcsize == 0) {
// nothing left in the buffer, and read(..) got EOF (-1). // nothing left in the buffer, and read(..) got EOF (-1).
// the bart the // the bart the
@ -529,7 +547,7 @@ class I2PSocketImpl implements I2PSocket {
} }
if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) { if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Runner Point d: " + hashCode()); _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Runner Point d: " + hashCode());
try { try {
Thread.sleep(PACKET_DELAY); Thread.sleep(PACKET_DELAY);
@ -538,19 +556,22 @@ class I2PSocketImpl implements I2PSocket {
} }
} }
if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) { if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
byte[] data = bc.startToByteArray(MAX_PACKET_SIZE); byte data[] = null;
synchronized (bc) {
data = bc.startToByteArray(MAX_PACKET_SIZE);
}
if (data.length > 0) { if (data.length > 0) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Message size is: " + data.length); _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message size is: " + data.length);
boolean sent = sendBlock(data); boolean sent = sendBlock(data);
if (!sent) { if (!sent) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "Error sending message to peer. Killing socket runner"); _log.warn(getPrefix() + ":" + Thread.currentThread().getName() + "Error sending message to peer. Killing socket runner");
errorOccurred(); errorOccurred();
return false; return false;
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Message sent to peer"); _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message sent to peer");
} }
} }
} }
@ -567,7 +588,14 @@ class I2PSocketImpl implements I2PSocket {
while (keepHandling) { while (keepHandling) {
keepHandling = handleNextPacket(bc, buffer); keepHandling = handleNextPacket(bc, buffer);
packetsHandled++; packetsHandled++;
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + ":" + Thread.currentThread().getName()
+ "Packets handled: " + packetsHandled);
} }
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + ":" + Thread.currentThread().getName()
+ "After handling packets, we're done. Packets handled: " + packetsHandled);
if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) { if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "We lost some data queued up due to a network send error (input stream: " _log.warn(getPrefix() + "We lost some data queued up due to a network send error (input stream: "
@ -583,16 +611,20 @@ class I2PSocketImpl implements I2PSocket {
} // FIXME: Race here? } // FIXME: Race here?
if (sc) { if (sc) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Sending close packet: (we started? " + outgoing + ") after reading " + _bytesRead + " and writing " + _bytesWritten); _log.info(getPrefix() + ":" + Thread.currentThread().getName()
+ "Sending close packet: (we started? " + outgoing
+ ") after reading " + _bytesRead + " and writing " + _bytesWritten);
byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]); byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]);
boolean sent = manager.getSession().sendMessage(remote, packet); boolean sent = manager.getSession().sendMessage(remote, packet);
if (!sent) { if (!sent) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "Error sending close packet to peer"); _log.warn(getPrefix() + ":" + Thread.currentThread().getName()
+ "Error sending close packet to peer");
errorOccurred(); errorOccurred();
} }
} }
manager.removeSocket(I2PSocketImpl.this); manager.removeSocket(I2PSocketImpl.this);
internalClose();
} catch (InterruptedIOException ex) { } catch (InterruptedIOException ex) {
_log.error(getPrefix() + "BUG! read() operations should not timeout!", ex); _log.error(getPrefix() + "BUG! read() operations should not timeout!", ex);
} catch (IOException ex) { } catch (IOException ex) {