SAM: Add start() to session interface,

don't start threads in constructors.
Start master acceptor thread.
Javadocs, SAMv2StreamSession cleanup
This commit is contained in:
zzz
2016-02-05 18:44:35 +00:00
parent 9b004bc61f
commit 270bc24b62
12 changed files with 110 additions and 66 deletions

View File

@ -22,6 +22,7 @@ import net.i2p.client.streaming.I2PSocket;
import net.i2p.data.DataFormatException; import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log; import net.i2p.util.Log;
/** /**
@ -39,10 +40,13 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
private final SAMv3Handler handler; private final SAMv3Handler handler;
private final SAMv3DatagramServer dgs; private final SAMv3DatagramServer dgs;
private final Map<String, SAMMessageSess> sessions; private final Map<String, SAMMessageSess> sessions;
private final StreamAcceptor streamAcceptor;
/** /**
* Build a Session according to information * Build a Session according to information
* registered with the given nickname * registered with the given nickname.
*
* Caller MUST call start().
* *
* @param nick nickname of the session * @param nick nickname of the session
* @throws IOException * @throws IOException
@ -63,6 +67,16 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
// if we get a RAW session added with 0/0, it will replace this, // if we get a RAW session added with 0/0, it will replace this,
// and we won't add this back if removed. // and we won't add this back if removed.
isess.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY); isess.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
streamAcceptor = new StreamAcceptor();
}
/**
* Overridden to start the acceptor.
*/
@Override
public void start() {
Thread t = new I2PAppThread(streamAcceptor, "SAMMasterAcceptor");
t.start();
} }
/** /**
@ -147,6 +161,7 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
} }
// listeners etc // listeners etc
sess.start();
// all ok // all ok
return null; return null;
} }
@ -236,10 +251,12 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
/** /**
* Close the master session * Close the master session
* Overridden to stop the acceptor.
*/ */
@Override @Override
public void close() { public void close() {
// close sessions? // close sessions?
streamAcceptor.stopRunning();
super.close(); super.close();
} }
@ -290,11 +307,17 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
private class StreamAcceptor implements Runnable { private class StreamAcceptor implements Runnable {
private volatile boolean stop;
public StreamAcceptor() { public StreamAcceptor() {
} }
public void stopRunning() {
stop = true;
}
public void run() { public void run() {
while (getSocketServer()!=null) { while (!stop && getSocketServer() != null) {
// wait and accept a connection from I2P side // wait and accept a connection from I2P side
I2PSocket i2ps; I2PSocket i2ps;
@ -353,6 +376,8 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
_log.warn("No subsession found for incoming streaming connection on port " + port); _log.warn("No subsession found for incoming streaming connection on port " + port);
} }
} }
if (_log.shouldWarn())
_log.warn("Stream acceptor stopped");
} }
} }
} }

View File

@ -35,6 +35,7 @@ class SAMDatagramSession extends SAMMessageSession {
private final I2PDatagramMaker dgramMaker; private final I2PDatagramMaker dgramMaker;
private final I2PDatagramDissector dgramDissector = new I2PDatagramDissector(); private final I2PDatagramDissector dgramDissector = new I2PDatagramDissector();
/** /**
* Create a new SAM DATAGRAM session. * Create a new SAM DATAGRAM session.
* *
@ -57,6 +58,8 @@ class SAMDatagramSession extends SAMMessageSession {
/** /**
* Create a new SAM DATAGRAM session. * Create a new SAM DATAGRAM session.
* *
* Caller MUST call start().
*
* @param destStream Input stream containing the destination keys * @param destStream Input stream containing the destination keys
* @param props Properties to setup the I2P session * @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data * @param recv Object that will receive incoming data

View File

@ -15,6 +15,12 @@ import net.i2p.data.Destination;
*/ */
interface SAMMessageSess extends Closeable { interface SAMMessageSess extends Closeable {
/**
* Start a SAM message-based session.
* MUST be called after constructor.
*/
public void start();
/** /**
* Close a SAM message-based session. * Close a SAM message-based session.
*/ */

View File

@ -72,9 +72,6 @@ abstract class SAMMessageSession implements SAMMessageSess {
session = handler.getSession(); session = handler.getSession();
listenProtocol = I2PSession.PROTO_ANY; listenProtocol = I2PSession.PROTO_ANY;
listenPort = I2PSession.PORT_ANY; listenPort = I2PSession.PORT_ANY;
// FIXME don't start threads in constructors
Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler");
t.start();
} }
/** /**
@ -97,7 +94,12 @@ abstract class SAMMessageSession implements SAMMessageSess {
handler = new SAMMessageSessionHandler(session); handler = new SAMMessageSessionHandler(session);
this.listenProtocol = listenProtocol; this.listenProtocol = listenProtocol;
this.listenPort = listenPort; this.listenPort = listenPort;
// FIXME don't start threads in constructors }
/*
* @since 0.9.25
*/
public void start() {
Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler"); Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler");
t.start(); t.start();
} }

View File

@ -49,6 +49,8 @@ class SAMRawSession extends SAMMessageSession {
/** /**
* Create a new SAM RAW session. * Create a new SAM RAW session.
* *
* Caller MUST call start().
*
* @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile) * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile)
* @param props Properties to setup the I2P session * @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data * @param recv Object that will receive incoming data

View File

@ -84,6 +84,8 @@ class SAMStreamSession implements SAMMessageSess {
/** /**
* Create a new SAM STREAM session. * Create a new SAM STREAM session.
* *
* Caller MUST call start().
*
* @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile) * @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile)
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") or "__v3__" if extended by SAMv3StreamSession * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") or "__v3__" if extended by SAMv3StreamSession
* @param props Properties to setup the I2P session * @param props Properties to setup the I2P session
@ -181,11 +183,7 @@ class SAMStreamSession implements SAMMessageSess {
if (startAcceptor) { if (startAcceptor) {
// FIXME don't start threads in constructors
server = new SAMStreamSessionServer(); server = new SAMStreamSessionServer();
Thread t = new I2PAppThread(server, "SAMStreamSessionServer");
t.start();
} else { } else {
server = null; server = null;
} }
@ -219,6 +217,16 @@ class SAMStreamSession implements SAMMessageSess {
server = null; server = null;
} }
/*
* @since 0.9.25
*/
public void start() {
if (server != null) {
Thread t = new I2PAppThread(server, "SAMStreamSessionServer");
t.start();
}
}
/* /*
* @since 0.9.25 * @since 0.9.25
*/ */

View File

@ -274,8 +274,10 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (style.equals("RAW")) { if (style.equals("RAW")) {
rawSession = new SAMRawSession(destKeystream, props, this); rawSession = new SAMRawSession(destKeystream, props, this);
rawSession.start();
} else if (style.equals("DATAGRAM")) { } else if (style.equals("DATAGRAM")) {
datagramSession = new SAMDatagramSession(destKeystream, props,this); datagramSession = new SAMDatagramSession(destKeystream, props,this);
datagramSession.start();
} else if (style.equals("STREAM")) { } else if (style.equals("STREAM")) {
String dir = (String) props.remove("DIRECTION"); String dir = (String) props.remove("DIRECTION");
if (dir == null) { if (dir == null) {
@ -290,6 +292,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
} }
streamSession = newSAMStreamSession(destKeystream, dir,props); streamSession = newSAMStreamSession(destKeystream, dir,props);
streamSession.start();
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION STYLE: \"" + style +"\""); _log.debug("Unrecognized SESSION STYLE: \"" + style +"\"");

View File

@ -36,13 +36,13 @@ import net.i2p.util.Log;
* *
* @author mkvore * @author mkvore
*/ */
class SAMv2StreamSession extends SAMStreamSession class SAMv2StreamSession extends SAMStreamSession
{ {
/** /**
* Create a new SAM STREAM session. * Create a new SAM STREAM session.
* *
* Caller MUST call start().
*
* @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile) * @param dest Base64-encoded destination and private keys (same format as PrivateKeyFile)
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH")
* @param props Properties to setup the I2P session * @param props Properties to setup the I2P session
@ -60,6 +60,8 @@ class SAMv2StreamSession extends SAMStreamSession
/** /**
* Create a new SAM STREAM session. * Create a new SAM STREAM session.
* *
* Caller MUST call start().
*
* @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile) * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile)
* @param dir Session direction ("RECEIVE", "CREATE" or "BOTH") * @param dir Session direction ("RECEIVE", "CREATE" or "BOTH")
* @param props Properties to setup the I2P session * @param props Properties to setup the I2P session
@ -86,7 +88,6 @@ class SAMv2StreamSession extends SAMStreamSession
* receive-only session * receive-only session
* @return true if the communication with the SAM client is ok * @return true if the communication with the SAM client is ok
*/ */
@Override @Override
public boolean connect ( int id, String dest, Properties props ) public boolean connect ( int id, String dest, Properties props )
throws DataFormatException, SAMInvalidDirectionException throws DataFormatException, SAMInvalidDirectionException
@ -120,18 +121,13 @@ class SAMv2StreamSession extends SAMStreamSession
return true ; return true ;
} }
/** /**
* SAM STREAM socket connecter, running in its own thread. * SAM STREAM socket connecter, running in its own thread.
* *
* @author mkvore * @author mkvore
*/ */
private class StreamConnector implements Runnable private class StreamConnector implements Runnable
{ {
private final int id; private final int id;
private final Destination dest ; private final Destination dest ;
private final I2PSocketOptions opts ; private final I2PSocketOptions opts ;
@ -144,7 +140,6 @@ class SAMv2StreamSession extends SAMStreamSession
* @param opts Socket options (I2PSocketOptions) * @param opts Socket options (I2PSocketOptions)
*/ */
public StreamConnector ( int id, Destination dest, I2PSocketOptions opts )// throws IOException public StreamConnector ( int id, Destination dest, I2PSocketOptions opts )// throws IOException
{ {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -155,7 +150,6 @@ class SAMv2StreamSession extends SAMStreamSession
this.dest = dest ; this.dest = dest ;
} }
public void run() public void run()
{ {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
@ -215,8 +209,6 @@ class SAMv2StreamSession extends SAMStreamSession
} }
} }
/** /**
* Lets us push data through the stream without blocking, (even after exceeding * Lets us push data through the stream without blocking, (even after exceeding
* the I2PSocket's buffer) * the I2PSocket's buffer)
@ -226,7 +218,6 @@ class SAMv2StreamSession extends SAMStreamSession
* @return v2StreamSender * @return v2StreamSender
* @throws IOException * @throws IOException
*/ */
@Override @Override
protected StreamSender newStreamSender ( I2PSocket s, int id ) throws IOException protected StreamSender newStreamSender ( I2PSocket s, int id ) throws IOException
{ {
@ -241,7 +232,6 @@ class SAMv2StreamSession extends SAMStreamSession
} }
private class V2StreamSender extends StreamSender private class V2StreamSender extends StreamSender
{ {
private final List<ByteArray> _data; private final List<ByteArray> _data;
private int _dataSize; private int _dataSize;
@ -431,8 +421,6 @@ class SAMv2StreamSession extends SAMStreamSession
} }
} }
/** /**
* Send bytes through a SAM STREAM session. * Send bytes through a SAM STREAM session.
* *
@ -459,24 +447,18 @@ class SAMv2StreamSession extends SAMStreamSession
return true; return true;
} }
/** /**
* SAM STREAM socket reader, running in its own thread. It forwards * SAM STREAM socket reader, running in its own thread. It forwards
* forward data to/from an I2P socket. * forward data to/from an I2P socket.
* *
* @author human * @author human
*/ */
public class SAMv2StreamSessionSocketReader extends SAMv1StreamSessionSocketReader public class SAMv2StreamSessionSocketReader extends SAMv1StreamSessionSocketReader
{ {
protected boolean nolimit ; protected boolean nolimit ;
protected long limit ; protected long limit ;
protected long totalReceived ; protected long totalReceived ;
/** /**
* Create a new SAM STREAM session socket reader * Create a new SAM STREAM session socket reader
* *
@ -581,7 +563,4 @@ class SAMv2StreamSession extends SAMStreamSession
_log.debug ( "Shutting down SAM STREAM session socket handler " + id ); _log.debug ( "Shutting down SAM STREAM session socket handler " + id );
} }
} }
} }

View File

@ -32,6 +32,8 @@ class SAMv3DatagramSession extends SAMDatagramSession implements Session, SAMDat
* build a DatagramSession according to informations registered * build a DatagramSession according to informations registered
* with the given nickname * with the given nickname
* *
* Caller MUST call start().
*
* @param nick nickname of the session * @param nick nickname of the session
* @throws IOException * @throws IOException
* @throws DataFormatException * @throws DataFormatException
@ -61,6 +63,8 @@ class SAMv3DatagramSession extends SAMDatagramSession implements Session, SAMDat
* Build a Datagram Session on an existing i2p session * Build a Datagram Session on an existing i2p session
* registered with the given nickname * registered with the given nickname
* *
* Caller MUST call start().
*
* @param nick nickname of the session * @param nick nickname of the session
* @throws IOException * @throws IOException
* @throws DataFormatException * @throws DataFormatException

View File

@ -455,15 +455,18 @@ class SAMv3Handler extends SAMv1Handler
SAMv3RawSession v3 = new SAMv3RawSession(nick, dgs); SAMv3RawSession v3 = new SAMv3RawSession(nick, dgs);
rawSession = v3; rawSession = v3;
this.session = v3; this.session = v3;
v3.start();
} else if (style.equals("DATAGRAM")) { } else if (style.equals("DATAGRAM")) {
SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props); SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props);
SAMv3DatagramSession v3 = new SAMv3DatagramSession(nick, dgs); SAMv3DatagramSession v3 = new SAMv3DatagramSession(nick, dgs);
datagramSession = v3; datagramSession = v3;
this.session = v3; this.session = v3;
v3.start();
} else if (style.equals("STREAM")) { } else if (style.equals("STREAM")) {
SAMv3StreamSession v3 = newSAMStreamSession(nick); SAMv3StreamSession v3 = newSAMStreamSession(nick);
streamSession = v3; streamSession = v3;
this.session = v3; this.session = v3;
v3.start();
} else if (style.equals("MASTER")) { } else if (style.equals("MASTER")) {
SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props); SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props);
MasterSession v3 = new MasterSession(nick, dgs, this, allProps); MasterSession v3 = new MasterSession(nick, dgs, this, allProps);
@ -471,6 +474,7 @@ class SAMv3Handler extends SAMv1Handler
datagramSession = v3; datagramSession = v3;
rawSession = v3; rawSession = v3;
this.session = v3; this.session = v3;
v3.start();
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION STYLE: \"" + style +"\""); _log.debug("Unrecognized SESSION STYLE: \"" + style +"\"");

View File

@ -34,6 +34,8 @@ class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver {
* Build a Raw Datagram Session according to information * Build a Raw Datagram Session according to information
* registered with the given nickname * registered with the given nickname
* *
* Caller MUST call start().
*
* @param nick nickname of the session * @param nick nickname of the session
* @throws IOException * @throws IOException
* @throws DataFormatException * @throws DataFormatException
@ -62,6 +64,8 @@ class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver {
* Build a Raw Session on an existing i2p session * Build a Raw Session on an existing i2p session
* registered with the given nickname * registered with the given nickname
* *
* Caller MUST call start().
*
* @param nick nickname of the session * @param nick nickname of the session
* @throws IOException * @throws IOException
* @throws DataFormatException * @throws DataFormatException

View File

@ -71,6 +71,8 @@ class SAMv3StreamSession extends SAMStreamSession implements Session
* Create a new SAM STREAM session, according to information * Create a new SAM STREAM session, according to information
* registered with the given nickname * registered with the given nickname
* *
* Caller MUST call start().
*
* @param login The nickname * @param login The nickname
* @throws IOException * @throws IOException
* @throws DataFormatException * @throws DataFormatException
@ -88,9 +90,11 @@ class SAMv3StreamSession extends SAMStreamSession implements Session
} }
/** /**
* Build a Datagram Session on an existing I2P session * Build a Stream Session on an existing I2P session
* registered with the given nickname * registered with the given nickname
* *
* Caller MUST call start().
*
* @param nick nickname of the session * @param nick nickname of the session
* @throws IOException * @throws IOException
* @throws DataFormatException * @throws DataFormatException