Fixed Bug #99: Data pending to be sent is still sent even if STREAM CLOSE is issued.

This commit is contained in:
connelly
2005-03-24 01:49:00 +00:00
committed by zzz
parent 904f755c8c
commit 28978e3680

View File

@ -341,7 +341,7 @@ public class SAMStreamSession {
} }
/** /**
* Remove and close a SAM STREAM session socket handler. * Remove and gracefully close a SAM STREAM session socket handler.
* *
* @param id Handler id to be removed * @param id Handler id to be removed
*/ */
@ -357,12 +357,12 @@ public class SAMStreamSession {
if (reader != null) if (reader != null)
reader.stopRunning(); reader.stopRunning();
if (sender != null) if (sender != null)
sender.stopRunning(); sender.shutDownGracefully();
_log.debug("Removed SAM STREAM session socket handler " + id); _log.debug("Removed SAM STREAM session socket handler (gracefully) " + id);
} }
/** /**
* Remove and close all the socket handlers managed by this SAM * Remove and hard close all the socket handlers managed by this SAM
* STREAM session. * STREAM session.
* *
*/ */
@ -378,7 +378,7 @@ public class SAMStreamSession {
while (iter.hasNext()) { while (iter.hasNext()) {
id = (Integer)iter.next(); id = (Integer)iter.next();
((SAMStreamSessionSocketReader)handlersMap.get(id)).stopRunning(); ((SAMStreamSessionSocketReader)handlersMap.get(id)).stopRunning();
((StreamSender)sendersMap.get(id)).stopRunning(); ((StreamSender)sendersMap.get(id)).shutDownGracefully();
} }
handlersMap.clear(); handlersMap.clear();
sendersMap.clear(); sendersMap.clear();
@ -498,25 +498,20 @@ public class SAMStreamSession {
} }
/** /**
* Stop a SAM STREAM session socket reader * Stop a SAM STREAM session socket reader thead immediately.
* *
*/ */
public void stopRunning() { public void stopRunning() {
_log.debug("stopRunning() invoked on socket handler " + id); _log.debug("stopRunning() invoked on socket reader " + id);
synchronized (runningLock) { synchronized (runningLock) {
if (stillRunning) { if (stillRunning) {
stillRunning = false; stillRunning = false;
try {
i2pSocket.close();
} catch (IOException e) {
_log.debug("Caught IOException", e);
}
} }
} }
} }
public void run() { public void run() {
_log.debug("SAM STREAM session socket handler running"); _log.debug("run() called for socket reader " + id);
int read = -1; int read = -1;
byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE]; byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
@ -568,7 +563,9 @@ public class SAMStreamSession {
private int _id; private int _id;
private ByteCache _cache; private ByteCache _cache;
private OutputStream _out = null; private OutputStream _out = null;
private boolean _stillRunning; private boolean _stillRunning, _shuttingDownGracefully;
private Object runningLock = new Object();
private I2PSocket i2pSocket = null;
public StreamSender(I2PSocket s, int id) throws IOException { public StreamSender(I2PSocket s, int id) throws IOException {
_data = new ArrayList(1); _data = new ArrayList(1);
@ -576,6 +573,8 @@ public class SAMStreamSession {
_cache = ByteCache.getInstance(4, 32*1024); _cache = ByteCache.getInstance(4, 32*1024);
_out = s.getOutputStream(); _out = s.getOutputStream();
_stillRunning = true; _stillRunning = true;
_shuttingDownGracefully = false;
i2pSocket = s;
} }
/** /**
@ -602,29 +601,55 @@ public class SAMStreamSession {
} }
/** /**
* Stop a SAM STREAM session socket sender * Stop a SAM STREAM session socket sender thread immediately
* *
*/ */
public void stopRunning() { public void stopRunning() {
_log.debug("stopRunning() invoked on socket sender " + _id); _log.debug("stopRunning() invoked on socket sender " + _id);
synchronized (runningLock) {
if (_stillRunning) {
_stillRunning = false; _stillRunning = false;
try {
i2pSocket.close();
} catch (IOException e) {
_log.debug("Caught IOException", e);
}
synchronized (_data) { synchronized (_data) {
_data.clear(); _data.clear();
_data.notifyAll(); _data.notifyAll();
} }
} }
}
}
/**
* Stop a SAM STREAM session socket sender gracefully: stop the
* sender thread once all pending data has been sent.
*/
public void shutDownGracefully() {
_log.debug("shutDownGracefully() invoked on socket sender " + _id);
_shuttingDownGracefully = true;
}
public void run() { public void run() {
_log.debug("run() called for socket sender " + _id);
ByteArray data = null; ByteArray data = null;
while (_stillRunning) { while (_stillRunning) {
data = null; data = null;
try { try {
synchronized (_data) { synchronized (_data) {
if (_data.size() > 0) if (_data.size() > 0) {
data = (ByteArray)_data.remove(0); data = (ByteArray)_data.remove(0);
else } else if (_shuttingDownGracefully) {
/* No data left and shutting down gracefully?
If so, stop the sender. */
stopRunning();
break;
} else {
/* Wait for data. */
_data.wait(5000); _data.wait(5000);
} }
}
if (data != null) { if (data != null) {
try { try {