SAM v3.3: Fixes after testing

- More error checking
- Better error responses
- Fix listen port and protocol for DATAGRAM and RAW
- Fix adding sessions with duplicate dests to DB
- Add more sessions in SAMStreamSink
This commit is contained in:
zzz
2016-02-06 16:56:37 +00:00
parent ceb7791541
commit edde533e1b
7 changed files with 55 additions and 30 deletions

View File

@ -42,6 +42,7 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
private final SAMv3DatagramServer dgs; private final SAMv3DatagramServer dgs;
private final Map<String, SAMMessageSess> sessions; private final Map<String, SAMMessageSess> sessions;
private final StreamAcceptor streamAcceptor; private final StreamAcceptor streamAcceptor;
private static final String[] INVALID_OPTS = { "PORT", "HOST", "FROM_PORT", "TO_PORT", "PROTOCOL" };
/** /**
* Build a Session according to information * Build a Session according to information
@ -57,6 +58,11 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
public MasterSession(String nick, SAMv3DatagramServer dgServer, SAMv3Handler handler, Properties props) public MasterSession(String nick, SAMv3DatagramServer dgServer, SAMv3Handler handler, Properties props)
throws IOException, DataFormatException, SAMException { throws IOException, DataFormatException, SAMException {
super(nick); super(nick);
for (int i = 0; i < INVALID_OPTS.length; i++) {
String p = INVALID_OPTS[i];
if (props.containsKey(p))
throw new SAMException("MASTER session options may not contain " + p);
}
dgs = dgServer; dgs = dgServer;
sessions = new ConcurrentHashMap<String, SAMMessageSess>(4); sessions = new ConcurrentHashMap<String, SAMMessageSess>(4);
this.handler = handler; this.handler = handler;
@ -162,15 +168,13 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
rec = new SessionRecord(getDestination().toBase64(), props, subhandler); rec = new SessionRecord(getDestination().toBase64(), props, subhandler);
try { try {
if (!SAMv3Handler.sSessionsHash.put(nick, rec)) SAMv3Handler.sSessionsHash.putDupDestOK(nick, rec);
return "Duplicate ID " + nick;
sessions.put(nick, sess); sessions.put(nick, sess);
} catch (SessionsDB.ExistingIdException e) { } catch (SessionsDB.ExistingIdException e) {
return e.toString(); return "Duplicate ID " + nick;
} catch (SessionsDB.ExistingDestException e) {
// fixme need new db method for dup dests
} }
// listeners etc if (_log.shouldWarn())
_log.warn("added " + style + " proto " + listenProtocol + " port " + listenPort);
sess.start(); sess.start();
// all ok // all ok

View File

@ -67,11 +67,10 @@ abstract class SAMMessageSession implements SAMMessageSess {
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Initializing SAM message-based session"); _log.debug("Initializing SAM message-based session");
handler = new SAMMessageSessionHandler(destStream, props);
session = handler.getSession();
listenProtocol = I2PSession.PROTO_ANY; listenProtocol = I2PSession.PROTO_ANY;
listenPort = I2PSession.PORT_ANY; listenPort = I2PSession.PORT_ANY;
handler = new SAMMessageSessionHandler(destStream, props);
session = handler.getSession();
} }
/** /**
@ -89,11 +88,10 @@ abstract class SAMMessageSession implements SAMMessageSess {
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Initializing SAM message-based session"); _log.debug("Initializing SAM message-based session");
session = sess;
handler = new SAMMessageSessionHandler(session);
this.listenProtocol = listenProtocol; this.listenProtocol = listenProtocol;
this.listenPort = listenPort; this.listenPort = listenPort;
session = sess;
handler = new SAMMessageSessionHandler(session);
} }
/* /*

View File

@ -368,7 +368,7 @@ class SAMStreamSession implements SAMMessageSess {
* @since 0.9.25 moved from subclass SAMv3StreamSession to implement SAMMessageSess * @since 0.9.25 moved from subclass SAMv3StreamSession to implement SAMMessageSess
*/ */
public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws I2PSessionException { public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws I2PSessionException {
throw new I2PSessionException("Unsupported in stream or master session"); throw new I2PSessionException("Unsupported in STREAM or MASTER session");
} }
/** /**

View File

@ -314,10 +314,13 @@ class SAMv3Handler extends SAMv1Handler
} catch (IOException e) { } catch (IOException e) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Caught IOException in handler", e); _log.debug("Caught IOException in handler", e);
writeString(SESSION_ERROR, e.getMessage());
} catch (SAMException e) { } catch (SAMException e) {
_log.error("Unexpected exception for message [" + msg + ']', e); _log.error("Unexpected exception for message [" + msg + ']', e);
writeString(SESSION_ERROR, e.getMessage());
} catch (RuntimeException e) { } catch (RuntimeException e) {
_log.error("Unexpected exception for message [" + msg + ']', e); _log.error("Unexpected exception for message [" + msg + ']', e);
writeString(SESSION_ERROR, e.getMessage());
} finally { } finally {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Stopping handler"); _log.debug("Stopping handler");
@ -467,8 +470,6 @@ class SAMv3Handler extends SAMv1Handler
// SAMStreamSession constructor. // SAMStreamSession constructor.
allProps.setProperty("i2p.streaming.enforceProtocol", "true"); allProps.setProperty("i2p.streaming.enforceProtocol", "true");
allProps.setProperty("i2cp.dontPublishLeaseSet", "false"); allProps.setProperty("i2cp.dontPublishLeaseSet", "false");
allProps.setProperty("FROM_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED));
allProps.setProperty("TO_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED));
} }
try { try {
@ -530,9 +531,9 @@ class SAMv3Handler extends SAMv1Handler
msg = msess.remove(nick, props); msg = msess.remove(nick, props);
} }
if (msg == null) if (msg == null)
return writeString("SESSION STATUS RESULT=OK", opcode + ' ' + nick); return writeString("SESSION STATUS RESULT=OK ID=\"" + nick + '"', opcode + ' ' + nick);
else else
return writeString(SESSION_ERROR, msg); return writeString(SESSION_ERROR + " ID=\"" + nick + '"', msg);
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION message opcode: \"" _log.debug("Unrecognized SESSION message opcode: \""
@ -581,10 +582,9 @@ class SAMv3Handler extends SAMv1Handler
if ( session != null ) if ( session != null )
{ {
_log.error ( "STREAM message received, but this session is a master session" ); _log.error("v3 control socket cannot be used for STREAM");
try { try {
notifyStreamResult(true, "I2P_ERROR", "master session cannot be used for streams"); notifyStreamResult(true, "I2P_ERROR", "v3 control socket cannot be used for STREAM");
} catch (IOException e) {} } catch (IOException e) {}
return false; return false;
} }

View File

@ -32,8 +32,7 @@ class SessionsDB {
map = new HashMap<String, SessionRecord>() ; map = new HashMap<String, SessionRecord>() ;
} }
/** @return success */ public synchronized void put(String nick, SessionRecord session)
synchronized public boolean put( String nick, SessionRecord session )
throws ExistingIdException, ExistingDestException throws ExistingIdException, ExistingDestException
{ {
if ( map.containsKey(nick) ) { if ( map.containsKey(nick) ) {
@ -44,14 +43,19 @@ class SessionsDB {
throw new ExistingDestException(); throw new ExistingDestException();
} }
} }
session.createThreadGroup("SAM session "+nick);
map.put(nick, session) ;
}
if ( !map.containsKey(nick) ) { /** @since 0.9.25 */
session.createThreadGroup("SAM session "+nick); public synchronized void putDupDestOK(String nick, SessionRecord session)
map.put(nick, session) ; throws ExistingIdException
return true ; {
if (map.containsKey(nick)) {
throw new ExistingIdException();
} }
else session.createThreadGroup("SAM session "+nick);
return false ; map.put(nick, session) ;
} }
/** @return true if removed */ /** @return true if removed */

View File

@ -137,6 +137,10 @@ public class SAMReader {
if ( (eq > 0) && (eq < pair.length() - 1) ) { if ( (eq > 0) && (eq < pair.length() - 1) ) {
String name = pair.substring(0, eq); String name = pair.substring(0, eq);
String val = pair.substring(eq+1); String val = pair.substring(eq+1);
if (val.length() <= 0) {
_log.error("Empty value for " + name);
continue;
}
while ( (val.charAt(0) == '\"') && (val.length() > 0) ) while ( (val.charAt(0) == '\"') && (val.length() > 0) )
val = val.substring(1); val = val.substring(1);
while ( (val.length() > 0) && (val.charAt(val.length()-1) == '\"') ) while ( (val.length() > 0) && (val.charAt(val.length()-1) == '\"') )

View File

@ -702,8 +702,7 @@ public class SAMStreamSink {
samOut.flush(); samOut.flush();
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION " + command + " sent"); _log.debug("SESSION " + command + " sent");
if (mode == STREAM) { //if (mode == STREAM) {
// why only waiting in stream mode?
boolean ok; boolean ok;
if (masterMode) if (masterMode)
ok = eventHandler.waitForSessionAddReply(); ok = eventHandler.waitForSessionAddReply();
@ -713,6 +712,22 @@ public class SAMStreamSink {
throw new IOException("SESSION " + command + " failed"); throw new IOException("SESSION " + command + " failed");
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION " + command + " reply found: " + ok); _log.debug("SESSION " + command + " reply found: " + ok);
//}
if (masterMode) {
// do a bunch more
req = "SESSION ADD STYLE=STREAM FROM_PORT=99 ID=stream99\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=STREAM FROM_PORT=98 ID=stream98\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=DATAGRAM PORT=9997 LISTEN_PORT=97 ID=dg97\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=DATAGRAM PORT=9996 FROM_PORT=96 ID=dg96\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=RAW PORT=9995 LISTEN_PORT=95 ID=raw95\n";
samOut.write(req.getBytes("UTF-8"));
req = "SESSION ADD STYLE=RAW PORT=9994 FROM_PORT=94 LISTEN_PROTOCOL=222 ID=raw94\n";
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
} }
req = "NAMING LOOKUP NAME=ME\n"; req = "NAMING LOOKUP NAME=ME\n";
samOut.write(req.getBytes("UTF-8")); samOut.write(req.getBytes("UTF-8"));