propagate from branch 'i2p.i2p.zzz.sam' (head 68de14d0053dea374413f9e0419b1c0f7e9ec3af)

to branch 'i2p.i2p' (head 54f5dd288f7c0c5a50f7f63f911aec4008be27e2)
This commit is contained in:
zzz
2015-06-03 11:42:54 +00:00
13 changed files with 349 additions and 162 deletions

View File

@ -0,0 +1,14 @@
package net.i2p.sam;
/**
* Something that can be stopped by the SAMBridge.
*
* @since 0.9.20
*/
public interface Handler {
/**
* Stop the handler
*/
public void stopHandling();
}

View File

@ -19,9 +19,13 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.app.*;
@ -30,6 +34,7 @@ import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
import net.i2p.util.PortMapper;
/**
* SAM bridge implementation.
@ -55,6 +60,7 @@ public class SAMBridge implements Runnable, ClientApp {
* destination keys (Destination+PrivateKey+SigningPrivateKey)
*/
private final Map<String,String> nameToPrivKeys;
private final Set<Handler> _handlers;
private volatile boolean acceptConnections = true;
@ -95,6 +101,7 @@ public class SAMBridge implements Runnable, ClientApp {
_listenPort = options.port;
persistFilename = options.keyFile;
nameToPrivKeys = new HashMap<String,String>(8);
_handlers = new HashSet<Handler>(8);
this.i2cpProps = options.opts;
_state = INITIALIZED;
}
@ -124,6 +131,7 @@ public class SAMBridge implements Runnable, ClientApp {
_listenPort = listenPort;
persistFilename = persistFile;
nameToPrivKeys = new HashMap<String,String>(8);
_handlers = new HashSet<Handler>(8);
loadKeys();
try {
openSocket();
@ -209,8 +217,9 @@ public class SAMBridge implements Runnable, ClientApp {
}
/**
* Load up the keys from the persistFilename
*
* Load up the keys from the persistFilename.
* TODO use DataHelper
* TODO store in config dir, not base dir
*/
private void loadKeys() {
synchronized (nameToPrivKeys) {
@ -218,7 +227,7 @@ public class SAMBridge implements Runnable, ClientApp {
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(
new FileInputStream(persistFilename)));
new FileInputStream(persistFilename), "UTF-8"));
String line = null;
while ( (line = br.readLine()) != null) {
int eq = line.indexOf('=');
@ -226,6 +235,8 @@ public class SAMBridge implements Runnable, ClientApp {
String privKeys = line.substring(eq+1);
nameToPrivKeys.put(name, privKeys);
}
if (_log.shouldInfo())
_log.info("Loaded " + nameToPrivKeys.size() + " private keys from " + persistFilename);
} catch (FileNotFoundException fnfe) {
_log.warn("Key file does not exist at " + persistFilename);
} catch (IOException ioe) {
@ -237,8 +248,9 @@ public class SAMBridge implements Runnable, ClientApp {
}
/**
* Store the current keys to disk in the location specified on creation
*
* Store the current keys to disk in the location specified on creation.
* TODO use DataHelper
* TODO store in config dir, not base dir
*/
private void storeKeys() {
synchronized (nameToPrivKeys) {
@ -248,11 +260,13 @@ public class SAMBridge implements Runnable, ClientApp {
for (Map.Entry<String, String> entry : nameToPrivKeys.entrySet()) {
String name = entry.getKey();
String privKeys = entry.getValue();
out.write(name.getBytes());
out.write(name.getBytes("UTF-8"));
out.write('=');
out.write(privKeys.getBytes());
out.write(privKeys.getBytes("UTF-8"));
out.write('\n');
}
if (_log.shouldInfo())
_log.info("Saved " + nameToPrivKeys.size() + " private keys to " + persistFilename);
} catch (IOException ioe) {
_log.error("Error writing out the SAM keys to " + persistFilename, ioe);
} finally {
@ -261,6 +275,51 @@ public class SAMBridge implements Runnable, ClientApp {
}
}
/**
* Handlers must call on startup
* @since 0.9.20
*/
public void register(Handler handler) {
if (_log.shouldInfo())
_log.info("Register " + handler);
synchronized (_handlers) {
_handlers.add(handler);
}
}
/**
* Handlers must call on stop
* @since 0.9.20
*/
public void unregister(Handler handler) {
if (_log.shouldInfo())
_log.info("Unregister " + handler);
synchronized (_handlers) {
_handlers.remove(handler);
}
}
/**
* Stop all the handlers.
* @since 0.9.20
*/
private void stopHandlers() {
List<Handler> handlers = null;
synchronized (_handlers) {
if (!_handlers.isEmpty()) {
handlers = new ArrayList<Handler>(_handlers);
_handlers.clear();
}
}
if (handlers != null) {
for (Handler handler : handlers) {
if (_log.shouldInfo())
_log.info("Stopping " + handler);
handler.stopHandling();
}
}
}
////// begin ClientApp interface, use only if using correct construtor
/**
@ -270,6 +329,9 @@ public class SAMBridge implements Runnable, ClientApp {
if (_state != INITIALIZED)
return;
changeState(STARTING);
synchronized (_handlers) {
_handlers.clear();
}
loadKeys();
try {
openSocket();
@ -285,7 +347,8 @@ public class SAMBridge implements Runnable, ClientApp {
}
/**
* Does NOT stop existing sessions.
* As of 0.9.20, stops running handlers and sessions.
*
* @since 0.9.6
*/
public synchronized void shutdown(String[] args) {
@ -293,11 +356,11 @@ public class SAMBridge implements Runnable, ClientApp {
return;
changeState(STOPPING);
acceptConnections = false;
stopHandlers();
if (_runner != null)
_runner.interrupt();
else
changeState(STOPPED);
// TODO does not stop active connections / sessions
}
/**
@ -375,7 +438,7 @@ public class SAMBridge implements Runnable, ClientApp {
* @since 0.9.6
*/
private void startThread() {
I2PAppThread t = new I2PAppThread(this, "SAMListener");
I2PAppThread t = new I2PAppThread(this, "SAMListener " + _listenPort);
if (Boolean.parseBoolean(System.getProperty("sam.shutdownOnOOM"))) {
t.addOOMEventThreadListener(new I2PAppThread.OOMEventListener() {
public void outOfMemory(OutOfMemoryError err) {
@ -487,6 +550,7 @@ public class SAMBridge implements Runnable, ClientApp {
changeState(RUNNING);
if (_mgr != null)
_mgr.register(this);
I2PAppContext.getGlobalContext().portMapper().register(PortMapper.SVC_SAM, _listenPort);
try {
while (acceptConnections) {
SocketChannel s = serverSocket.accept();
@ -495,18 +559,19 @@ public class SAMBridge implements Runnable, ClientApp {
+ s.socket().getInetAddress().toString() + ":"
+ s.socket().getPort());
class HelloHandler implements Runnable {
private final SocketChannel s;
private final SAMBridge parent;
class HelloHandler implements Runnable, Handler {
private final SocketChannel s;
private final SAMBridge parent;
HelloHandler(SocketChannel s, SAMBridge parent) {
HelloHandler(SocketChannel s, SAMBridge parent) {
this.s = s ;
this.parent = parent ;
}
}
public void run() {
public void run() {
parent.register(this);
try {
SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps);
SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps, parent);
if (handler == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SAM handler has not been instantiated");
@ -515,7 +580,6 @@ public class SAMBridge implements Runnable, ClientApp {
} catch (IOException e) {}
return;
}
handler.setBridge(parent);
handler.startHandling();
} catch (SAMException e) {
if (_log.shouldLog(Log.ERROR))
@ -526,11 +590,17 @@ public class SAMBridge implements Runnable, ClientApp {
} catch (Exception ee) {
try { s.close(); } catch (IOException ioe) {}
_log.log(Log.CRIT, "Unexpected error handling SAM connection", ee);
}
}
} finally {
parent.unregister(this);
}
}
/** @since 0.9.20 */
public void stopHandling() {
try { s.close(); } catch (IOException ioe) {}
}
}
// TODO: Handler threads are not saved or tracked and cannot be stopped
new I2PAppThread(new HelloHandler(s,this), "HelloHandler").start();
new I2PAppThread(new HelloHandler(s,this), "SAM HelloHandler").start();
}
changeState(STOPPING);
} catch (Exception e) {
@ -546,6 +616,8 @@ public class SAMBridge implements Runnable, ClientApp {
if (serverSocket != null)
serverSocket.close();
} catch (IOException e) {}
I2PAppContext.getGlobalContext().portMapper().unregister(PortMapper.SVC_SAM);
stopHandlers();
changeState(STOPPED);
}
}

View File

@ -25,12 +25,12 @@ import net.i2p.util.Log;
*
* @author human
*/
abstract class SAMHandler implements Runnable {
abstract class SAMHandler implements Runnable, Handler {
protected final Log _log;
protected I2PAppThread thread = null;
protected SAMBridge bridge = null;
protected I2PAppThread thread;
protected final SAMBridge bridge;
private final Object socketWLock = new Object(); // Guards writings on socket
protected final SocketChannel socket;
@ -41,8 +41,8 @@ abstract class SAMHandler implements Runnable {
/** I2CP options configuring the I2CP connection (port, host, numHops, etc) */
protected final Properties i2cpProps;
private final Object stopLock = new Object();
private volatile boolean stopHandler;
protected final Object stopLock = new Object();
protected boolean stopHandler;
/**
* SAMHandler constructor (to be called by subclasses)
@ -53,14 +53,15 @@ abstract class SAMHandler implements Runnable {
* @param i2cpProps properties to configure the I2CP connection (host, port, etc)
* @throws IOException
*/
protected SAMHandler(SocketChannel s,
int verMajor, int verMinor, Properties i2cpProps) throws IOException {
protected SAMHandler(SocketChannel s, int verMajor, int verMinor,
Properties i2cpProps, SAMBridge parent) throws IOException {
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
socket = s;
this.verMajor = verMajor;
this.verMinor = verMinor;
this.i2cpProps = i2cpProps;
bridge = parent;
}
/**
@ -68,12 +69,10 @@ abstract class SAMHandler implements Runnable {
*
*/
public final void startHandling() {
thread = new I2PAppThread(this, "SAMHandler");
thread = new I2PAppThread(this, getClass().getSimpleName());
thread.start();
}
public void setBridge(SAMBridge bridge) { this.bridge = bridge; }
/**
* Actually handle the SAM protocol.
*
@ -81,10 +80,9 @@ abstract class SAMHandler implements Runnable {
protected abstract void handle();
/**
* Get the input stream of the socket connected to the SAM client
* Get the channel of the socket connected to the SAM client
*
* @return input stream
* @throws IOException
* @return channel
*/
protected final SocketChannel getClientSocket() {
return socket ;
@ -156,13 +154,17 @@ abstract class SAMHandler implements Runnable {
}
/**
* Stop the SAM handler
*
* Stop the SAM handler, close the client socket,
* unregister with the bridge.
*/
public final void stopHandling() {
public void stopHandling() {
synchronized (stopLock) {
stopHandler = true;
}
try {
closeClientSocket();
} catch (IOException e) {}
bridge.unregister(this);
}
/**
@ -183,14 +185,23 @@ abstract class SAMHandler implements Runnable {
*/
@Override
public final String toString() {
return ("SAM handler (class: " + this.getClass().getName()
return (this.getClass().getSimpleName()
+ "; SAM version: " + verMajor + "." + verMinor
+ "; client: "
+ this.socket.socket().getInetAddress().toString() + ":"
+ this.socket.socket().getPort() + ")");
}
/**
* Register with the bridge, call handle(),
* unregister with the bridge.
*/
public final void run() {
handle();
bridge.register(this);
try {
handle();
} finally {
bridge.unregister(this);
}
}
}

View File

@ -38,13 +38,15 @@ class SAMHandlerFactory {
* @throws SAMException if the connection handshake (HELLO message) was malformed
* @return A SAM protocol handler, or null if the client closed before the handshake
*/
public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps) throws SAMException {
public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps,
SAMBridge parent) throws SAMException {
StringTokenizer tok;
Log log = I2PAppContext.getGlobalContext().logManager().getLog(SAMHandlerFactory.class);
try {
Socket sock = s.socket();
sock.setSoTimeout(HELLO_TIMEOUT);
sock.setKeepAlive(true);
String line = DataHelper.readLine(sock.getInputStream());
sock.setSoTimeout(0);
if (line == null) {
@ -103,13 +105,13 @@ class SAMHandlerFactory {
try {
switch (verMajor) {
case 1:
handler = new SAMv1Handler(s, verMajor, verMinor, i2cpProps);
handler = new SAMv1Handler(s, verMajor, verMinor, i2cpProps, parent);
break;
case 2:
handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps);
handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps, parent);
break;
case 3:
handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps);
handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps, parent);
break;
default:
log.error("BUG! Trying to initialize the wrong SAM version!");

View File

@ -72,6 +72,7 @@ abstract class SAMMessageSession {
handler = new SAMMessageSessionHandler(destStream, props);
// FIXME don't start threads in constructors
Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler");
t.start();
}
@ -125,7 +126,6 @@ abstract class SAMMessageSession {
/**
* Close a SAM message-based session.
*
*/
public void close() {
handler.stopRunning();

View File

@ -212,6 +212,7 @@ class SAMUtils {
}
/* Dump a Properties object in an human-readable form */
/****
private static String dumpProperties(Properties props) {
StringBuilder builder = new StringBuilder();
String key, val;
@ -231,6 +232,7 @@ class SAMUtils {
return builder.toString();
}
****/
/****
public static void main(String args[]) {

View File

@ -60,8 +60,9 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
* @throws SAMException
* @throws IOException
*/
public SAMv1Handler(SocketChannel s, int verMajor, int verMinor) throws SAMException, IOException {
this(s, verMajor, verMinor, new Properties());
public SAMv1Handler(SocketChannel s, int verMajor, int verMinor,
SAMBridge parent) throws SAMException, IOException {
this(s, verMajor, verMinor, new Properties(), parent);
}
/**
* Create a new SAM version 1 handler. This constructor expects
@ -75,13 +76,14 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
* @throws SAMException
* @throws IOException
*/
public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
super(s, verMajor, verMinor, i2cpProps);
public SAMv1Handler(SocketChannel s, int verMajor, int verMinor,
Properties i2cpProps, SAMBridge parent) throws SAMException, IOException {
super(s, verMajor, verMinor, i2cpProps, parent);
_id = __id.incrementAndGet();
if (_log.shouldLog(Log.DEBUG))
_log.debug("SAM version 1 handler instantiated");
if ( ! verifVersion() ) {
if ( ! verifVersion() ) {
throw new SAMException("BUG! Wrong protocol version!");
}
}
@ -183,8 +185,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
}
} catch (IOException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Caught IOException ("
+ e.getMessage() + ") for message [" + msg + "]", e);
_log.debug("Caught IOException for message [" + msg + "]", e);
} catch (Exception e) {
_log.error("Unexpected exception for message [" + msg + "]", e);
} finally {
@ -193,7 +194,8 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
try {
closeClientSocket();
} catch (IOException e) {
_log.error("Error closing socket: " + e.getMessage());
if (_log.shouldWarn())
_log.warn("Error closing socket", e);
}
if (getRawSession() != null) {
getRawSession().close();
@ -797,7 +799,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
public void receiveRawBytes(byte data[]) throws IOException {
if (getRawSession() == null) {
_log.error("BUG! Received raw bytes, but session is null!");
throw new NullPointerException("BUG! RAW session is null!");
return;
}
ByteArrayOutputStream msg = new ByteArrayOutputStream();
@ -818,7 +820,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (getRawSession() == null) {
_log.error("BUG! Got raw receiving stop, but session is null!");
throw new NullPointerException("BUG! RAW session is null!");
return;
}
try {
@ -833,7 +835,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException {
if (getDatagramSession() == null) {
_log.error("BUG! Received datagram bytes, but session is null!");
throw new NullPointerException("BUG! DATAGRAM session is null!");
return;
}
ByteArrayOutputStream msg = new ByteArrayOutputStream();
@ -855,7 +857,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (getDatagramSession() == null) {
_log.error("BUG! Got datagram receiving stop, but session is null!");
throw new NullPointerException("BUG! DATAGRAM session is null!");
return;
}
try {
@ -873,7 +875,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if ( getStreamSession() == null )
{
_log.error ( "BUG! Want to answer to stream SEND, but session is null!" );
throw new NullPointerException ( "BUG! STREAM session is null!" );
return;
}
if ( !writeString ( "STREAM SEND ID=" + id
@ -891,7 +893,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if ( getStreamSession() == null )
{
_log.error ( "BUG! Stream outgoing buffer is free, but session is null!" );
throw new NullPointerException ( "BUG! STREAM session is null!" );
return;
}
if ( !writeString ( "STREAM READY_TO_SEND ID=" + id + "\n" ) )
@ -904,7 +906,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
public void notifyStreamIncomingConnection(int id, Destination d) throws IOException {
if (getStreamSession() == null) {
_log.error("BUG! Received stream connection, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
return;
}
if (!writeString("STREAM CONNECTED DESTINATION="
@ -914,18 +916,16 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
}
}
/** @param msg may be null */
public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException
{
if ( getStreamSession() == null )
{
_log.error ( "BUG! Received stream connection, but session is null!" );
throw new NullPointerException ( "BUG! STREAM session is null!" );
return;
}
String msgString = "" ;
if ( msg != null ) msgString = " MESSAGE=\"" + msg + "\"";
String msgString = createMessageString(msg);
if ( !writeString ( "STREAM STATUS RESULT="
+ result
+ " ID=" + id
@ -935,11 +935,36 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
throw new IOException ( "Error notifying connection to SAM client" );
}
}
/**
* Create a string to be appended to a status.
*
* @param msg may be null
* @return non-null, "" if msg is null, MESSAGE=msg or MESSAGE="msg a b c"
* with leading space if msg is non-null
* @since 0.9.20
*/
protected static String createMessageString(String msg) {
String rv;
if ( msg != null ) {
msg = msg.replace("\n", " ");
msg = msg.replace("\r", " ");
if (!msg.startsWith("\"")) {
msg = msg.replace("\"", "");
if (msg.contains("\"") || msg.contains("\t"))
msg = '"' + msg + '"';
}
rv = " MESSAGE=\"" + msg + "\"";
} else {
rv = "";
}
return rv;
}
public void receiveStreamBytes(int id, ByteBuffer data) throws IOException {
if (getStreamSession() == null) {
_log.error("Received stream bytes, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
return;
}
String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + data.remaining() + "\n";
@ -956,16 +981,15 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
}
}
/** @param msg may be null */
public void notifyStreamDisconnection(int id, String result, String msg) throws IOException {
if (getStreamSession() == null) {
_log.error("BUG! Received stream disconnection, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
return;
}
// FIXME: msg should be escaped!
if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result
+ (msg == null ? "" : (" MESSAGE=" + msg))
+ "\n")) {
String msgString = createMessageString(msg);
if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result + msgString + '\n')) {
throw new IOException("Error notifying disconnection to SAM client");
}
}
@ -976,7 +1000,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (getStreamSession() == null) {
_log.error("BUG! Got stream receiving stop, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
return;
}
try {

View File

@ -34,9 +34,10 @@ class SAMv2Handler extends SAMv1Handler implements SAMRawReceiver, SAMDatagramRe
* @param verMajor SAM major version to manage (should be 2)
* @param verMinor SAM minor version to manage
*/
public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException
public SAMv2Handler(SocketChannel s, int verMajor, int verMinor,
SAMBridge parent) throws SAMException, IOException
{
this ( s, verMajor, verMinor, new Properties() );
this(s, verMajor, verMinor, new Properties(), parent);
}
/**
@ -50,9 +51,10 @@ class SAMv2Handler extends SAMv1Handler implements SAMRawReceiver, SAMDatagramRe
* @param i2cpProps properties to configure the I2CP connection (host, port, etc)
*/
public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
public SAMv2Handler(SocketChannel s, int verMajor, int verMinor,
Properties i2cpProps, SAMBridge parent) throws SAMException, IOException
{
super ( s, verMajor, verMinor, i2cpProps );
super(s, verMajor, verMinor, i2cpProps, parent);
}
@Override

View File

@ -48,8 +48,8 @@ class SAMv3Handler extends SAMv1Handler
private Session session;
public static final SessionsDB sSessionsHash = new SessionsDB();
private boolean stolenSocket;
private boolean streamForwardingSocket;
private volatile boolean stolenSocket;
private volatile boolean streamForwardingSocket;
interface Session {
@ -67,9 +67,10 @@ class SAMv3Handler extends SAMv1Handler
* @param verMajor SAM major version to manage (should be 3)
* @param verMinor SAM minor version to manage
*/
public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException
public SAMv3Handler(SocketChannel s, int verMajor, int verMinor,
SAMBridge parent) throws SAMException, IOException
{
this ( s, verMajor, verMinor, new Properties() );
this(s, verMajor, verMinor, new Properties(), parent);
}
/**
@ -83,9 +84,10 @@ class SAMv3Handler extends SAMv1Handler
* @param i2cpProps properties to configure the I2CP connection (host, port, etc)
*/
public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException
public SAMv3Handler(SocketChannel s, int verMajor, int verMinor,
Properties i2cpProps, SAMBridge parent) throws SAMException, IOException
{
super ( s, verMajor, verMinor, i2cpProps );
super(s, verMajor, verMinor, i2cpProps, parent);
if (_log.shouldLog(Log.DEBUG))
_log.debug("SAM version 3 handler instantiated");
}
@ -214,6 +216,9 @@ class SAMv3Handler extends SAMv1Handler
}
}
/**
* The values in the SessionsDB
*/
public static class SessionRecord
{
private final String m_dest ;
@ -266,11 +271,14 @@ class SAMv3Handler extends SAMv1Handler
}
}
/**
* basically a HashMap from String to SessionRecord
*/
public static class SessionsDB
{
private static final long serialVersionUID = 0x1;
static class ExistingIdException extends Exception {
static class ExistingIdException extends Exception {
private static final long serialVersionUID = 0x1;
}
@ -284,6 +292,7 @@ class SAMv3Handler extends SAMv1Handler
map = new HashMap<String, SessionRecord>() ;
}
/** @return success */
synchronized public boolean put( String nick, SessionRecord session )
throws ExistingIdException, ExistingDestException
{
@ -305,17 +314,12 @@ class SAMv3Handler extends SAMv1Handler
return false ;
}
/** @return true if removed */
synchronized public boolean del( String nick )
{
SessionRecord rec = map.get(nick);
if ( rec!=null ) {
map.remove(nick);
return true ;
}
else
return false ;
return map.remove(nick) != null;
}
synchronized public SessionRecord get(String nick)
{
return map.get(nick);
@ -332,12 +336,23 @@ class SAMv3Handler extends SAMv1Handler
return this.socket.socket().getInetAddress().getHostAddress();
}
/**
* For SAMv3StreamSession connect and accept
*/
public void stealSocket()
{
stolenSocket = true ;
this.stopHandling();
}
/**
* For SAMv3StreamSession
* @since 0.9.20
*/
SAMBridge getBridge() {
return bridge;
}
public void handle() {
String msg = null;
String domain = null;
@ -348,7 +363,7 @@ class SAMv3Handler extends SAMv1Handler
this.thread.setName("SAMv3Handler " + _id);
if (_log.shouldLog(Log.DEBUG))
_log.debug("SAM handling started");
_log.debug("SAMv3 handling started");
try {
InputStream in = getClientSocket().socket().getInputStream();
@ -422,8 +437,7 @@ class SAMv3Handler extends SAMv1Handler
}
} catch (IOException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Caught IOException ("
+ e.getMessage() + ") for message [" + msg + "]", e);
_log.debug("Caught IOException for message [" + msg + "]", e);
} catch (Exception e) {
_log.error("Unexpected exception for message [" + msg + "]", e);
} finally {
@ -435,7 +449,8 @@ class SAMv3Handler extends SAMv1Handler
try {
closeClientSocket();
} catch (IOException e) {
_log.error("Error closing socket: " + e.getMessage());
if (_log.shouldWarn())
_log.warn("Error closing socket", e);
}
}
if (streamForwardingSocket)
@ -444,20 +459,40 @@ class SAMv3Handler extends SAMv1Handler
try {
((SAMv3StreamSession)streamSession).stopForwardingIncoming();
} catch (SAMException e) {
_log.error("Error while stopping forwarding connections: " + e.getMessage());
if (_log.shouldWarn())
_log.warn("Error while stopping forwarding connections", e);
} catch (InterruptedIOException e) {
_log.error("Interrupted while stopping forwarding connections: " + e.getMessage());
if (_log.shouldWarn())
_log.warn("Interrupted while stopping forwarding connections", e);
}
}
}
die();
}
}
protected void die() {
/**
* Stop the SAM handler, close the socket,
* unregister with the bridge.
*
* Overridden to not close the client socket if stolen.
*
* @since 0.9.20
*/
@Override
public void stopHandling() {
synchronized (stopLock) {
stopHandler = true;
}
if (!stolenSocket) {
try {
closeClientSocket();
} catch (IOException e) {}
}
bridge.unregister(this);
}
private void die() {
SessionRecord rec = null ;
if (session!=null) {
@ -813,20 +848,15 @@ class SAMv3Handler extends SAMv1Handler
}
public void notifyStreamResult(boolean verbose, String result, String message) throws IOException
{
public void notifyStreamResult(boolean verbose, String result, String message) throws IOException {
if (!verbose) return ;
String out = "STREAM STATUS RESULT="+result;
if (message!=null)
out = out + " MESSAGE=\"" + message + "\"";
out = out + '\n';
String msgString = createMessageString(message);
String out = "STREAM STATUS RESULT=" + result + msgString + '\n';
if ( !writeString ( out ) )
{
throw new IOException ( "Error notifying connection to SAM client" );
}
}
if (!writeString(out)) {
throw new IOException ( "Error notifying connection to SAM client" );
}
}
public void notifyStreamIncomingConnection(Destination d) throws IOException {
if (getStreamSession() == null) {

View File

@ -74,6 +74,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
/**
* Connect the SAM STREAM session to the specified Destination
* for a single connection, using the socket stolen from the handler.
*
* @param handler The handler that communicates with the requesting client
* @param dest Base64-encoded Destination to connect to
@ -87,7 +88,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
* @throws IOException
*/
public void connect ( SAMv3Handler handler, String dest, Properties props )
throws I2PException, ConnectException, NoRouteToHostException,
throws I2PException, ConnectException, NoRouteToHostException,
DataFormatException, InterruptedIOException, IOException {
boolean verbose = (props.getProperty("SILENT", "false").equals("false"));
@ -117,13 +118,17 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
WritableByteChannel toClient = handler.getClientSocket();
WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream());
(new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start();
(new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start();
SAMBridge bridge = handler.getBridge();
(new Thread(rec.getThreadGroup(),
new Pipe(fromClient, toI2P, bridge),
"ConnectV3 SAMPipeClientToI2P")).start();
(new Thread(rec.getThreadGroup(),
new Pipe(fromI2P, toClient, bridge),
"ConnectV3 SAMPipeI2PToClient")).start();
}
/**
* Accept an incoming STREAM
* Accept a single incoming STREAM on the socket stolen from the handler.
*
* @param handler The handler that communicates with the requesting client
* @param verbose If true, SAM will send the Base64-encoded peer Destination of an
@ -150,8 +155,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
this.socketServer = this.socketMgr.getServerSocket();
}
I2PSocket i2ps;
i2ps = this.socketServer.accept();
I2PSocket i2ps = this.socketServer.accept();
synchronized( this.socketServerLock )
{
@ -159,11 +163,11 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
}
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
if ( rec==null || i2ps==null ) throw new InterruptedIOException() ;
if (verbose)
handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ;
if ( rec==null || i2ps==null ) throw new InterruptedIOException() ;
if (verbose)
handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ;
handler.stealSocket() ;
ReadableByteChannel fromClient = handler.getClientSocket();
@ -171,8 +175,13 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
WritableByteChannel toClient = handler.getClientSocket();
WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream());
(new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start();
(new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start();
SAMBridge bridge = handler.getBridge();
(new Thread(rec.getThreadGroup(),
new Pipe(fromClient, toI2P, bridge),
"AcceptV3 SAMPipeClientToI2P")).start();
(new Thread(rec.getThreadGroup(),
new Pipe(fromI2P, toClient, bridge),
"AcceptV3 SAMPipeI2PToClient")).start();
}
@ -210,10 +219,10 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
}
SocketForwarder forwarder = new SocketForwarder(host, port, this, verbose);
(new Thread(rec.getThreadGroup(), new I2PAppThread(forwarder, "SAMStreamForwarder"), "SAMStreamForwarder")).start();
(new Thread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start();
}
private static class SocketForwarder extends Thread
private static class SocketForwarder implements Runnable
{
private final String host;
private final int port;
@ -254,6 +263,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
// build pipes between both sockets
try {
clientServerSock.socket().setKeepAlive(true);
if (this.verbose)
SAMv3Handler.notifyStreamIncomingConnection(
clientServerSock, i2ps.getPeerDestination());
@ -261,8 +271,10 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
ReadableByteChannel fromI2P = Channels.newChannel(i2ps.getInputStream());
WritableByteChannel toClient = clientServerSock ;
WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream());
(new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start();
(new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start();
(new I2PAppThread(new Pipe(fromClient, toI2P, null),
"ForwardV3 SAMPipeClientToI2P")).start();
(new I2PAppThread(new Pipe(fromI2P,toClient, null),
"ForwardV3 SAMPipeI2PToClient")).start();
} catch (IOException e) {
try {
@ -277,48 +289,62 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
}
}
private static class Pipe extends Thread
private static class Pipe implements Runnable, Handler
{
private final ReadableByteChannel in ;
private final WritableByteChannel out ;
private final ByteBuffer buf ;
private final SAMBridge bridge;
public Pipe(ReadableByteChannel in, WritableByteChannel out, String name)
/**
* @param bridge may be null
*/
public Pipe(ReadableByteChannel in, WritableByteChannel out, SAMBridge bridge)
{
super(name);
this.in = in ;
this.out = out ;
this.buf = ByteBuffer.allocate(BUFFER_SIZE) ;
this.bridge = bridge;
}
public void run()
{
try {
while (!Thread.interrupted() && (in.read(buf)>=0 || buf.position() != 0)) {
buf.flip();
out.write(buf);
buf.compact();
}
}
catch (IOException e)
{
this.interrupt();
public void run() {
if (bridge != null)
bridge.register(this);
try {
while (!Thread.interrupted() && (in.read(buf)>=0 || buf.position() != 0)) {
buf.flip();
out.write(buf);
buf.compact();
}
try {
in.close();
}
catch (IOException e) {}
try {
buf.flip();
while (buf.hasRemaining())
out.write(buf);
}
catch (IOException e) {}
try {
out.close();
}
catch (IOException e) {}
}
} catch (IOException ioe) {
// ignore
} finally {
try {
in.close();
} catch (IOException e) {}
try {
buf.flip();
while (buf.hasRemaining()) {
out.write(buf);
}
} catch (IOException e) {}
try {
out.close();
} catch (IOException e) {}
if (bridge != null)
bridge.unregister(this);
}
}
/**
* Handler interface
* @since 0.9.20
*/
public void stopHandling() {
try {
in.close();
} catch (IOException e) {}
}
}
public I2PServerSocket getSocketServer()

View File

@ -493,6 +493,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
try {
I2PSSLSocketFactory fact = new I2PSSLSocketFactory(_context, false, "certificates/i2cp");
_socket = fact.createSocket(_hostname, _portNum);
_socket.setKeepAlive(true);
} catch (GeneralSecurityException gse) {
IOException ioe = new IOException("SSL Fail");
ioe.initCause(gse);
@ -500,6 +501,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2
}
} else {
_socket = new Socket(_hostname, _portNum);
_socket.setKeepAlive(true);
}
// _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
OutputStream out = _socket.getOutputStream();

View File

@ -89,6 +89,7 @@ class I2PSimpleSession extends I2PSessionImpl2 {
} else {
_socket = new Socket(_hostname, _portNum);
}
_socket.setKeepAlive(true);
OutputStream out = _socket.getOutputStream();
out.write(I2PClient.PROTOCOL_BYTE);
out.flush();

View File

@ -93,6 +93,7 @@ class ClientListenerRunner implements Runnable {
if (validate(socket)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection received");
socket.setKeepAlive(true);
runConnection(socket);
} else {
if (_log.shouldLog(Log.WARN))