Streaming:

- Make I2PSocketFull.close() nonblocking; it will now cause any user-side
     writes blocked in I/O (Connection.packetSendChoke()) to throw
     an exception (tickets #629, #1041)
   - Don't ignore InterruptedExceptions; throw InterruptedIOException
   - Back out static disconnect exception
   - MessageInputStream locking fixes
   - Cleanups
 I2PSnark:
   - Close socket before closing output stream to avoid blocking in
     Peer.disconnect(), and prevent Peer.disconnect() loop
This commit is contained in:
zzz
2013-10-12 17:39:49 +00:00
parent 380783c1ba
commit 74a57abfb4
10 changed files with 176 additions and 82 deletions

View File

@ -28,6 +28,8 @@ import java.io.OutputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocket;
@ -68,8 +70,10 @@ public class Peer implements Comparable
private I2PSocket sock; private I2PSocket sock;
private boolean deregister = true; private boolean deregister = true;
private static long __id; private static final AtomicLong __id = new AtomicLong();
private long _id; private final long _id;
private final AtomicBoolean _disconnected = new AtomicBoolean();
final static long CHECK_PERIOD = PeerCoordinator.CHECK_PERIOD; // 40 seconds final static long CHECK_PERIOD = PeerCoordinator.CHECK_PERIOD; // 40 seconds
final static int RATE_DEPTH = PeerCoordinator.RATE_DEPTH; // make following arrays RATE_DEPTH long final static int RATE_DEPTH = PeerCoordinator.RATE_DEPTH; // make following arrays RATE_DEPTH long
private long uploaded_old[] = {-1,-1,-1}; private long uploaded_old[] = {-1,-1,-1};
@ -98,7 +102,7 @@ public class Peer implements Comparable
this.my_id = my_id; this.my_id = my_id;
this.infohash = infohash; this.infohash = infohash;
this.metainfo = metainfo; this.metainfo = metainfo;
_id = ++__id; _id = __id.incrementAndGet();
//_log.debug("Creating a new peer with " + peerID.toString(), new Exception("creating")); //_log.debug("Creating a new peer with " + peerID.toString(), new Exception("creating"));
} }
@ -123,7 +127,7 @@ public class Peer implements Comparable
byte[] id = handshake(in, out); byte[] id = handshake(in, out);
this.peerID = new PeerID(id, sock.getPeerDestination()); this.peerID = new PeerID(id, sock.getPeerDestination());
_id = ++__id; _id = __id.incrementAndGet();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Creating a new peer " + peerID.toString(), new Exception("creating " + _id)); _log.debug("Creating a new peer " + peerID.toString(), new Exception("creating " + _id));
} }
@ -457,6 +461,8 @@ public class Peer implements Comparable
void disconnect() void disconnect()
{ {
if (!_disconnected.compareAndSet(false, true))
return;
PeerState s = state; PeerState s = state;
if (s != null) if (s != null)
{ {
@ -476,9 +482,11 @@ public class Peer implements Comparable
PeerConnectionIn in = s.in; PeerConnectionIn in = s.in;
if (in != null) if (in != null)
in.disconnect(); in.disconnect();
PeerConnectionOut out = s.out; // this is blocking in streaming, so do this after closing the socket
if (out != null) // so it won't really block
out.disconnect(); //PeerConnectionOut out = s.out;
//if (out != null)
// out.disconnect();
PeerListener pl = s.listener; PeerListener pl = s.listener;
if (pl != null) if (pl != null)
pl.disconnected(this); pl.disconnected(this);
@ -492,6 +500,13 @@ public class Peer implements Comparable
_log.warn("Error disconnecting " + toString(), ioe); _log.warn("Error disconnecting " + toString(), ioe);
} }
} }
if (s != null) {
// this is blocking in streaming, so do this after closing the socket
// so it won't really block
PeerConnectionOut out = s.out;
if (out != null)
out.disconnect();
}
} }
/** /**

View File

@ -65,7 +65,7 @@ class PeerConnectionIn implements Runnable
try { try {
din.close(); din.close();
} catch (IOException ioe) { } catch (IOException ioe) {
_log.warn("Error closing the stream from " + peer, ioe); //_log.warn("Error closing the stream from " + peer, ioe);
} }
} }
} }

View File

@ -56,7 +56,6 @@ class PeerConnectionOut implements Runnable
_id = ++__id; _id = ++__id;
lastSent = System.currentTimeMillis(); lastSent = System.currentTimeMillis();
quit = false;
} }
public void startup() { public void startup() {
@ -66,7 +65,7 @@ class PeerConnectionOut implements Runnable
/** /**
* Continuesly monitors for more outgoing messages that have to be send. * Continuesly monitors for more outgoing messages that have to be send.
* Stops if quit is true of an IOException occurs. * Stops if quit is true or an IOException occurs.
*/ */
public void run() public void run()
{ {
@ -215,13 +214,13 @@ class PeerConnectionOut implements Runnable
thread.interrupt(); thread.interrupt();
sendQueue.clear(); sendQueue.clear();
sendQueue.notify(); sendQueue.notifyAll();
} }
if (dout != null) { if (dout != null) {
try { try {
dout.close(); dout.close();
} catch (IOException ioe) { } catch (IOException ioe) {
_log.warn("Error closing the stream to " + peer, ioe); //_log.warn("Error closing the stream to " + peer, ioe);
} }
} }
} }

View File

@ -168,7 +168,7 @@ class Connection {
* @return true if the packet should be sent, false for a fatal error * @return true if the packet should be sent, false for a fatal error
* will return false after 5 minutes even if timeoutMs is <= 0. * will return false after 5 minutes even if timeoutMs is <= 0.
*/ */
boolean packetSendChoke(long timeoutMs) { public boolean packetSendChoke(long timeoutMs) throws IOException, InterruptedException {
long start = _context.clock().now(); long start = _context.clock().now();
long writeExpire = start + timeoutMs; // only used if timeoutMs > 0 long writeExpire = start + timeoutMs; // only used if timeoutMs > 0
boolean started = false; boolean started = false;
@ -183,8 +183,10 @@ class Connection {
// no need to wait until the other side has ACKed us before sending the first few wsize // no need to wait until the other side has ACKed us before sending the first few wsize
// packets through // packets through
// Incorrect assumption, the constructor defaults _connected to true --Sponge // Incorrect assumption, the constructor defaults _connected to true --Sponge
if (!_connected.get()) if (!_connected.get())
return false; throw new IOException("disconnected");
if (_outputStream.getClosed())
throw new IOException("output stream closed");
started = true; started = true;
// Try to keep things moving even during NACKs and retransmissions... // Try to keep things moving even during NACKs and retransmissions...
// Limit unacked packets to the window // Limit unacked packets to the window
@ -207,12 +209,24 @@ class Connection {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Outbound window is full (" + unacked + "/" + wsz + "/" _log.debug("Outbound window is full (" + unacked + "/" + wsz + "/"
+ _activeResends + "), waiting " + timeLeft); + _activeResends + "), waiting " + timeLeft);
try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;} try {
_outboundPackets.wait(Math.min(timeLeft,250l));
} catch (InterruptedException ie) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")");
throw ie;
}
} else { } else {
//if (_log.shouldLog(Log.DEBUG)) //if (_log.shouldLog(Log.DEBUG))
// _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends // _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends
// + "), waiting indefinitely"); // + "), waiting indefinitely");
try { _outboundPackets.wait(250); } catch (InterruptedException ie) {if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + ")"); return false;} //10*1000 try {
_outboundPackets.wait(250);
} catch (InterruptedException ie) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + ")");
throw ie;
} //10*1000
} }
} else { } else {
_context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start); _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start);
@ -222,6 +236,9 @@ class Connection {
} }
} }
/**
* Notify all threads waiting in packetSendChoke()
*/
void windowAdjusted() { void windowAdjusted() {
synchronized (_outboundPackets) { synchronized (_outboundPackets) {
_outboundPackets.notifyAll(); _outboundPackets.notifyAll();
@ -710,8 +727,6 @@ class Connection {
} }
} }
private static final IOException DISCON_IOE = new IOException("disconnected!");
/** /**
* Must be called when we are done with this connection. * Must be called when we are done with this connection.
* Final disconnect. Remove from conn manager. * Final disconnect. Remove from conn manager.
@ -729,7 +744,7 @@ class Connection {
_outputStream.destroy(); _outputStream.destroy();
_receiver.destroy(); _receiver.destroy();
_activityTimer.cancel(); _activityTimer.cancel();
_inputStream.streamErrorOccurred(DISCON_IOE); _inputStream.streamErrorOccurred(new IOException("disconnected"));
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info("Connection disconnect complete: " _log.info("Connection disconnect complete: "

View File

@ -4,6 +4,7 @@ import java.util.List;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.I2PException; import net.i2p.I2PException;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleScheduler;
@ -200,8 +201,8 @@ class ConnectionPacketHandler {
final long lastSendTime = con.getLastSendTime(); final long lastSendTime = con.getLastSendTime();
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn(String.format("%s congestion.. dup packet %s now %d ackDelay %d lastSend %d", _log.warn(String.format("%s congestion.. dup packet %s ackDelay %d lastSend %d ago",
con, packet, now, ackDelay, lastSendTime)); con, packet, now, ackDelay, DataHelper.formatDuration(now - lastSendTime)));
final long nextSendTime = lastSendTime + ackDelay; final long nextSendTime = lastSendTime + ackDelay;
if (nextSendTime <= now) { if (nextSendTime <= now) {

View File

@ -33,6 +33,15 @@ class I2PSocketFull implements I2PSocket {
_remotePeer = _localPeer = null; _remotePeer = _localPeer = null;
} }
/**
* Closes this socket.
*
* Nonblocking as of 0.9.9:
* Any thread currently blocked in an I/O operation upon this socket will throw an IOException.
* Once a socket has been closed, it is not available for further networking use
* (i.e. can't be reconnected or rebound). A new socket needs to be created.
* Closing this socket will also close the socket's InputStream and OutputStream.
*/
public void close() throws IOException { public void close() throws IOException {
if (!_closed.compareAndSet(false,true)) { if (!_closed.compareAndSet(false,true)) {
// log a trace to find out why // log a trace to find out why
@ -42,15 +51,13 @@ class I2PSocketFull implements I2PSocket {
Connection c = _connection; Connection c = _connection;
if (c == null) return; if (c == null) return;
if (c.getIsConnected()) { if (c.getIsConnected()) {
OutputStream out = c.getOutputStream();
try {
out.close();
} catch (IOException ioe) {
// ignore any write error, as we want to keep on and kill the
// con (thanks Complication!)
}
MessageInputStream in = c.getInputStream(); MessageInputStream in = c.getInputStream();
in.close(); in.close();
MessageOutputStream out = c.getOutputStream();
out.closeInternal();
// this will cause any thread waiting in Connection.packetSendChoke()
// to throw an IOE
c.windowAdjusted();
} else { } else {
//throw new IOException("Not connected"); //throw new IOException("Not connected");
} }

View File

@ -2,6 +2,7 @@ package net.i2p.client.streaming;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -287,12 +288,12 @@ class MessageInputStream extends InputStream {
@Override @Override
public int read(byte target[], int offset, int length) throws IOException { public int read(byte target[], int offset, int length) throws IOException {
if (_locallyClosed) throw new IOException("Already locally closed");
throwAnyError();
long expiration = -1; long expiration = -1;
if (_readTimeout > 0) if (_readTimeout > 0)
expiration = _readTimeout + System.currentTimeMillis(); expiration = _readTimeout + System.currentTimeMillis();
synchronized (_dataLock) { synchronized (_dataLock) {
if (_locallyClosed) throw new IOException("Already locally closed");
throwAnyError();
for (int i = 0; i < length; i++) { for (int i = 0; i < length; i++) {
if ( (_readyDataBlocks.isEmpty()) && (i == 0) ) { if ( (_readyDataBlocks.isEmpty()) && (i == 0) ) {
// ok, we havent found anything, so lets block until we get // ok, we havent found anything, so lets block until we get
@ -312,7 +313,13 @@ class MessageInputStream extends InputStream {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with no timeout: " + toString()); + ") with no timeout: " + toString());
try { _dataLock.wait(); } catch (InterruptedException ie) { } try {
_dataLock.wait();
} catch (InterruptedException ie) {
IOException ioe2 = new InterruptedIOException("Interrupted read");
ioe2.initCause(ie);
throw ioe2;
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with no timeout complete: " + toString()); + ") with no timeout complete: " + toString());
@ -321,7 +328,13 @@ class MessageInputStream extends InputStream {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with timeout: " + _readTimeout + ": " + toString()); + ") with timeout: " + _readTimeout + ": " + toString());
try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { } try {
_dataLock.wait(_readTimeout);
} catch (InterruptedException ie) {
IOException ioe2 = new InterruptedIOException("Interrupted read");
ioe2.initCause(ie);
throw ioe2;
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("read(...," + offset+", " + length+ ")[" + i _log.debug("read(...," + offset+", " + length+ ")[" + i
+ ") with timeout complete: " + _readTimeout + ": " + toString()); + ") with timeout complete: " + _readTimeout + ": " + toString());
@ -382,10 +395,10 @@ class MessageInputStream extends InputStream {
@Override @Override
public int available() throws IOException { public int available() throws IOException {
if (_locallyClosed) throw new IOException("Already closed");
throwAnyError();
int numBytes = 0; int numBytes = 0;
synchronized (_dataLock) { synchronized (_dataLock) {
if (_locallyClosed) throw new IOException("Already closed");
throwAnyError();
for (int i = 0; i < _readyDataBlocks.size(); i++) { for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = _readyDataBlocks.get(i); ByteArray cur = _readyDataBlocks.get(i);
if (i == 0) if (i == 0)
@ -467,14 +480,15 @@ class MessageInputStream extends InputStream {
* *
*/ */
void streamErrorOccurred(IOException ioe) { void streamErrorOccurred(IOException ioe) {
if (_streamError == null)
_streamError = ioe;
_locallyClosed = true;
synchronized (_dataLock) { synchronized (_dataLock) {
if (_streamError == null)
_streamError = ioe;
_locallyClosed = true;
_dataLock.notifyAll(); _dataLock.notifyAll();
} }
} }
/** Caller must lock _dataLock */
private void throwAnyError() throws IOException { private void throwAnyError() throws IOException {
IOException ioe = _streamError; IOException ioe = _streamError;
if (ioe != null) { if (ioe != null) {

View File

@ -89,15 +89,17 @@ class MessageOutputStream extends OutputStream {
_writeTimeout = ms; _writeTimeout = ms;
} }
public int getWriteTimeout() { return _writeTimeout; } public int getWriteTimeout() { return _writeTimeout; }
public void setBufferSize(int size) { _nextBufferSize = size; } public void setBufferSize(int size) { _nextBufferSize = size; }
@Override @Override
public void write(byte b[]) throws IOException { public void write(byte b[]) throws IOException {
write(b, 0, b.length); write(b, 0, b.length);
} }
@Override @Override
public void write(byte b[], int off, int len) throws IOException { public void write(byte b[], int off, int len) throws IOException {
if (_closed.get()) throw new IOException("Already closed"); if (_closed.get()) throw new IOException("Already closed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -150,7 +152,13 @@ class MessageOutputStream extends OutputStream {
// ok, we've actually added a new packet - lets wait until // ok, we've actually added a new packet - lets wait until
// its accepted into the queue before moving on (so that we // its accepted into the queue before moving on (so that we
// dont fill our buffer instantly) // dont fill our buffer instantly)
ws.waitForAccept(_writeTimeout); try {
ws.waitForAccept(_writeTimeout);
} catch (InterruptedException ie) {
IOException ioe2 = new InterruptedIOException("Interrupted write");
ioe2.initCause(ie);
throw ioe2;
}
if (!ws.writeAccepted()) { if (!ws.writeAccepted()) {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("Write not accepted of " + ws); _log.warn("Write not accepted of " + ws);
@ -296,7 +304,7 @@ class MessageOutputStream extends OutputStream {
* *
* @throws IOException if the write fails * @throws IOException if the write fails
*/ */
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
/* @throws InterruptedIOException if the write times out /* @throws InterruptedIOException if the write times out
* Documented here, but doesn't belong in the javadoc. * Documented here, but doesn't belong in the javadoc.
@ -343,14 +351,20 @@ class MessageOutputStream extends OutputStream {
// Wait a loooooong time, until we have the ACK // Wait a loooooong time, until we have the ACK
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws); _log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws);
if (_closed.get() && try {
( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) || if (_closed.get() &&
(_writeTimeout <= 0) ) ) ( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ||
ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT); (_writeTimeout <= 0) ) )
else if ( (_writeTimeout <= 0) || (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ) ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT); else if ( (_writeTimeout <= 0) || (_writeTimeout > Connection.DISCONNECT_TIMEOUT) )
else ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT);
ws.waitForCompletion(_writeTimeout); else
ws.waitForCompletion(_writeTimeout);
} catch (InterruptedException ie) {
IOException ioe2 = new InterruptedIOException("Interrupted flush");
ioe2.initCause(ie);
throw ioe2;
}
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("after waiting " + _writeTimeout + "ms for completion of " + ws); _log.debug("after waiting " + _writeTimeout + "ms for completion of " + ws);
if (ws.writeFailed() && (_writeTimeout > 0) ) if (ws.writeFailed() && (_writeTimeout > 0) )
@ -466,6 +480,7 @@ class MessageOutputStream extends OutputStream {
void flushAvailable(DataReceiver target) throws IOException { void flushAvailable(DataReceiver target) throws IOException {
flushAvailable(target, true); flushAvailable(target, true);
} }
void flushAvailable(DataReceiver target, boolean blocking) throws IOException { void flushAvailable(DataReceiver target, boolean blocking) throws IOException {
WriteStatus ws = null; WriteStatus ws = null;
long before = System.currentTimeMillis(); long before = System.currentTimeMillis();
@ -487,7 +502,13 @@ class MessageOutputStream extends OutputStream {
_log.debug("Took " + (afterBuild-before) + "ms to build a packet? " + ws); _log.debug("Took " + (afterBuild-before) + "ms to build a packet? " + ws);
if (blocking && ws != null) { if (blocking && ws != null) {
ws.waitForAccept(_writeTimeout); try {
ws.waitForAccept(_writeTimeout);
} catch (InterruptedException ie) {
IOException ioe2 = new InterruptedIOException("Interrupted flush");
ioe2.initCause(ie);
throw ioe2;
}
if (ws.writeFailed()) if (ws.writeFailed())
throw new IOException("Flush available failed"); throw new IOException("Flush available failed");
else if (!ws.writeAccepted()) else if (!ws.writeAccepted())
@ -526,7 +547,7 @@ class MessageOutputStream extends OutputStream {
* Success means an ACK FROM THE FAR END. * Success means an ACK FROM THE FAR END.
* @param maxWaitMs -1 = forever * @param maxWaitMs -1 = forever
*/ */
public void waitForCompletion(int maxWaitMs); public void waitForCompletion(int maxWaitMs) throws IOException, InterruptedException;
/** /**
* Wait until the data written is accepted into the outbound pool, * Wait until the data written is accepted into the outbound pool,
@ -534,9 +555,9 @@ class MessageOutputStream extends OutputStream {
* which we throttle rather than accept arbitrary data and queue * which we throttle rather than accept arbitrary data and queue
* @param maxWaitMs -1 = forever * @param maxWaitMs -1 = forever
*/ */
public void waitForAccept(int maxWaitMs); public void waitForAccept(int maxWaitMs) throws IOException, InterruptedException;
/** the write was accepted. aka did the socket not close? */ /** Was the write was accepted. aka did the socket not close? */
public boolean writeAccepted(); public boolean writeAccepted();
/** did the write fail? */ /** did the write fail? */
public boolean writeFailed(); public boolean writeFailed();

View File

@ -254,6 +254,7 @@ class Packet {
* @return Delay before resending a packet in seconds. * @return Delay before resending a packet in seconds.
*/ */
public int getResendDelay() { return _resendDelay; } public int getResendDelay() { return _resendDelay; }
/** /**
* Unused. * Unused.
* Broken before release 0.7.8 * Broken before release 0.7.8
@ -267,17 +268,22 @@ class Packet {
* @return the payload of the message, null if none. * @return the payload of the message, null if none.
*/ */
public ByteArray getPayload() { return _payload; } public ByteArray getPayload() { return _payload; }
public void setPayload(ByteArray payload) { public void setPayload(ByteArray payload) {
_payload = payload; _payload = payload;
if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) ) if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) )
throw new IllegalArgumentException("Too large payload: " + payload.getValid()); throw new IllegalArgumentException("Too large payload: " + payload.getValid());
} }
public int getPayloadSize() { public int getPayloadSize() {
return (_payload == null ? 0 : _payload.getValid()); return (_payload == null ? 0 : _payload.getValid());
} }
/** does nothing right now */
public void releasePayload() { public void releasePayload() {
//_payload = null; //_payload = null;
} }
public ByteArray acquirePayload() { public ByteArray acquirePayload() {
_payload = new ByteArray(new byte[Packet.MAX_PAYLOAD_SIZE]); _payload = new ByteArray(new byte[Packet.MAX_PAYLOAD_SIZE]);
return _payload; return _payload;
@ -288,13 +294,16 @@ class Packet {
* @return true if set, false if not. * @return true if set, false if not.
*/ */
public boolean isFlagSet(int flag) { return 0 != (_flags & flag); } public boolean isFlagSet(int flag) { return 0 != (_flags & flag); }
public void setFlag(int flag) { _flags |= flag; } public void setFlag(int flag) { _flags |= flag; }
public void setFlag(int flag, boolean set) { public void setFlag(int flag, boolean set) {
if (set) if (set)
_flags |= flag; _flags |= flag;
else else
_flags &= ~flag; _flags &= ~flag;
} }
public void setFlags(int flags) { _flags = flags; } public void setFlags(int flags) { _flags = flags; }
/** the signature on the packet (only included if the flag for it is set) /** the signature on the packet (only included if the flag for it is set)

View File

@ -140,6 +140,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Cancelled! " + toString(), new Exception("cancelled")); _log.debug("Cancelled! " + toString(), new Exception("cancelled"));
} }
public SimpleTimer2.TimedEvent getResendEvent() { return _resendEvent; } public SimpleTimer2.TimedEvent getResendEvent() { return _resendEvent; }
/** how long after packet creation was it acked? /** how long after packet creation was it acked?
@ -230,59 +231,71 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
return buf; return buf;
} }
////// begin WriteStatus methods
/** /**
* Blocks until outbound window is not full. See Connection.packetSendChoke(). * Blocks until outbound window is not full. See Connection.packetSendChoke().
* @param maxWaitMs MessageOutputStream is the only caller, generally with -1 * @param maxWaitMs MessageOutputStream is the only caller, generally with -1
*/ */
public void waitForAccept(int maxWaitMs) { public void waitForAccept(int maxWaitMs) throws IOException, InterruptedException {
long before = _context.clock().now(); long before = _context.clock().now();
int queued = _connection.getUnackedPacketsSent(); boolean accepted = false;
int window = _connection.getOptions().getWindowSize(); try {
boolean accepted = _connection.packetSendChoke(maxWaitMs); // throws IOE or IE
long after = _context.clock().now(); accepted = _connection.packetSendChoke(maxWaitMs);
if (accepted) { } finally {
_acceptedOn = after; if (accepted) {
} else { _acceptedOn = _context.clock().now();
_acceptedOn = -1; } else {
releasePayload(); _acceptedOn = -1;
releasePayload();
}
if ( (_acceptedOn - before > 1000) && (_log.shouldLog(Log.DEBUG)) ) {
int queued = _connection.getUnackedPacketsSent();
int window = _connection.getOptions().getWindowSize();
int afterQueued = _connection.getUnackedPacketsSent();
_log.debug("Took " + (_acceptedOn - before) + "ms to get "
+ (accepted ? "accepted" : "rejected")
+ (_cancelledOn > 0 ? " and CANCELLED" : "")
+ ", queued behind " + queued +" with a window size of " + window
+ ", finally accepted with " + afterQueued + " queued: "
+ toString());
}
} }
int afterQueued = _connection.getUnackedPacketsSent();
if ( (after - before > 1000) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("Took " + (after-before) + "ms to get "
+ (accepted ? "accepted" : "rejected")
+ (_cancelledOn > 0 ? " and CANCELLED" : "")
+ ", queued behind " + queued +" with a window size of " + window
+ ", finally accepted with " + afterQueued + " queued: "
+ toString());
} }
/** block until the packet is acked from the far end */ /** block until the packet is acked from the far end */
public void waitForCompletion(int maxWaitMs) { public void waitForCompletion(int maxWaitMs) throws IOException, InterruptedException {
long expiration = _context.clock().now()+maxWaitMs; long expiration = _context.clock().now()+maxWaitMs;
while (true) { try {
long timeRemaining = expiration - _context.clock().now(); while (true) {
if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) break; long timeRemaining = expiration - _context.clock().now();
try { if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) break;
synchronized (this) { synchronized (this) {
if (_ackOn > 0) break; if (_ackOn > 0) break;
if (_cancelledOn > 0) break; if (!_connection.getIsConnected())
if (!_connection.getIsConnected()) break; throw new IOException("disconnected");
if (_cancelledOn > 0)
throw new IOException("cancelled");
if (timeRemaining > 60*1000) if (timeRemaining > 60*1000)
timeRemaining = 60*1000; timeRemaining = 60*1000;
else if (timeRemaining <= 0) else if (timeRemaining <= 0)
timeRemaining = 10*1000; timeRemaining = 10*1000;
wait(timeRemaining); wait(timeRemaining);
} }
} catch (InterruptedException ie) { }//{ break; } }
} finally {
if (!writeSuccessful())
releasePayload();
} }
if (!writeSuccessful())
releasePayload();
} }
public synchronized boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; } public synchronized boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; }
public synchronized boolean writeFailed() { return _cancelledOn > 0; } public synchronized boolean writeFailed() { return _cancelledOn > 0; }
public synchronized boolean writeSuccessful() { return _ackOn > 0 && _cancelledOn <= 0; } public synchronized boolean writeSuccessful() { return _ackOn > 0 && _cancelledOn <= 0; }
////// end WriteStatus methods
/** Generate a pcap/tcpdump-compatible format, /** Generate a pcap/tcpdump-compatible format,
* so we can use standard debugging tools. * so we can use standard debugging tools.
*/ */