* Added STREAMing support;

* added NAMING LOOKUP NAME=ME support;
* various cleanups & fixes;
* what else?
(human)
This commit is contained in:
human
2004-04-14 16:59:47 +00:00
committed by zzz
parent d2b09ecfda
commit 7b03c95cfd
7 changed files with 953 additions and 151 deletions

View File

@ -60,7 +60,7 @@ public abstract class SAMHandler implements Runnable {
*
* @param data A byte array to be written
*/
protected void writeBytes(byte[] data) throws IOException {
protected void writeBytes(byte[] data) throws IOException {
synchronized (socketWLock) {
if (socketOS == null) {
socketOS = socket.getOutputStream();
@ -70,6 +70,26 @@ public abstract class SAMHandler implements Runnable {
}
}
/**
* Write a string to the handler's socket. This method must
* always be used when writing strings, unless you really know what
* you're doing.
*
* @param str A byte array to be written
*
* @return True is the string was successfully written, false otherwise
*/
protected boolean writeString(String str) {
try {
writeBytes(str.getBytes("ISO-8859-1"));
} catch (IOException e) {
_log.debug("Caught IOException", e);
return false;
}
return true;
}
/**
* Stop the SAM handler
*

View File

@ -19,7 +19,7 @@ public interface SAMRawReceiver {
* Send a byte array to a SAM client, without informations
* regarding the sender.
*
* @param data Byte array to be written
* @param data Byte array to be received
*/
public void receiveRawBytes(byte data[]) throws IOException;
@ -27,5 +27,5 @@ public interface SAMRawReceiver {
* Stop receiving data.
*
*/
public void stopReceiving();
public void stopRawReceiving();
}

View File

@ -49,7 +49,7 @@ public class SAMRawSession {
* @param recv Object that will receive incoming data
*/
public SAMRawSession(String dest, Properties props,
SAMRawReceiver recv) throws DataFormatException, I2PSessionException {
SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
ByteArrayInputStream bais;
bais = new ByteArrayInputStream(Base64.decode(dest));
@ -65,22 +65,31 @@ public class SAMRawSession {
* @param recv Object that will receive incoming data
*/
public SAMRawSession(InputStream destStream, Properties props,
SAMRawReceiver recv) throws I2PSessionException {
SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
initSAMRawSession(destStream, props, recv);
}
private void initSAMRawSession(InputStream destStream, Properties props,
SAMRawReceiver recv) throws I2PSessionException {
SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
this.recv = recv;
_log.debug("SAM RAW session instantiated");
handler = new SAMRawSessionHandler(destStream, props);
Thread t = new I2PThread(handler, "SAMRawSessionHandler");
Thread t = new I2PThread(handler, "SAMRawSessionHandler");
t.start();
}
/**
* Get the SAM RAW session Destination.
*
* @return The SAM RAW session Destination.
*/
public Destination getDestination() {
return session.getMyDestination();
}
/**
* Send bytes through a SAM RAW session.
*
@ -92,6 +101,10 @@ public class SAMRawSession {
Destination d = new Destination();
d.fromBase64(dest);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Sending " + data.length + " bytes to " + dest);
}
try {
return session.sendMessage(d, data);
} catch (I2PSessionException e) {
@ -158,17 +171,18 @@ public class SAMRawSession {
runningLock.wait();
} catch (InterruptedException ie) {}
}
_log.debug("Shutting down SAM RAW session handler");
}
recv.stopReceiving();
try {
_log.debug("Destroying I2P session...");
session.destroySession();
_log.debug("I2P session destroyed");
} catch (I2PSessionException e) {
_log.debug("Shutting down SAM RAW session handler");
recv.stopRawReceiving();
try {
_log.debug("Destroying I2P session...");
session.destroySession();
_log.debug("I2P session destroyed");
} catch (I2PSessionException e) {
_log.error("Error destroying I2P session", e);
}
}
}

View File

@ -0,0 +1,51 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.IOException;
import net.i2p.data.Destination;
/**
* Interface for sending streaming data to a SAM client
*/
public interface SAMStreamReceiver {
/**
* Notify about a new incoming connection
*
* @param id New connection id
*/
public void notifyStreamConnection(int id, Destination dest) throws IOException;
/**
* Send a byte array to a SAM client.
*
* @param id Connection id
* @param data Byte array to be received
* @param len Number of bytes in data
*/
public void receiveStreamBytes(int id, byte data[], int len) throws IOException;
/**
* Notify that a connection has been closed
* FIXME: this interface should be cleaner
*
* @param id Connection id
* @param result Disconnection reason ("OK" or something else)
* @param msg Error message, if any
*/
public void notifyStreamDisconnection(int id, String result, String msg) throws IOException;
/**
* Stop receiving data.
*
*/
public void stopStreamReceiving();
}

View File

@ -0,0 +1,488 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.util.HashMap;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.client.streaming.I2PSocketManagerFactory;
import net.i2p.client.streaming.I2PSocketOptions;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.I2PException;
import net.i2p.util.HexDump;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* SAM STREAM session class.
*
* @author human
*/
public class SAMStreamSession {
private final static Log _log = new Log(SAMStreamSession.class);
private final static int SOCKET_HANDLER_BUF_SIZE = 32768;
private SAMStreamReceiver recv = null;
private SAMStreamSessionServer server = null;
private I2PSocketManager socketMgr = null;
private Object handlersMapLock = new Object();
private HashMap handlersMap = new HashMap();
private Object idLock = new Object();
private int lastNegativeId = 0;
/**
* Create a new SAM STREAM session.
*
* @param dest Base64-encoded destination (private key)
* @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data
*/
public SAMStreamSession(String dest, Properties props,
SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
ByteArrayInputStream bais;
bais = new ByteArrayInputStream(Base64.decode(dest));
initSAMStreamSession(bais, props, recv);
}
/**
* Create a new SAM STREAM session.
*
* @param destStream Input stream containing the destination keys
* @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data
*/
public SAMStreamSession(InputStream destStream, Properties props,
SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException {
initSAMStreamSession(destStream, props, recv);
}
private void initSAMStreamSession(InputStream destStream, Properties props,
SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException{
this.recv = recv;
_log.debug("SAM STREAM session instantiated");
Properties allprops = new Properties();
allprops.putAll(System.getProperties());
allprops.putAll(props);
// FIXME: we should setup I2CP host and port, too
_log.debug("Creating I2PSocketManager...");
socketMgr = I2PSocketManagerFactory.createManager(destStream,
"127.0.0.1",
7654, allprops);
if (socketMgr == null) {
throw new SAMException("Error creating I2PSocketManager");
}
server = new SAMStreamSessionServer();
Thread t = new I2PThread(server, "SAMStreamSessionServer");
t.start();
}
/**
* Get the SAM STREAM session Destination.
*
* @return The SAM STREAM session Destination.
*/
public Destination getDestination() {
return socketMgr.getSession().getMyDestination();
}
/**
* Connect the SAM STREAM session to the specified Destination
*
* @param id Unique id for the connection
* @param dest Base64-encoded Destination to connect to
* @param props Options to be used for connection
*/
public boolean connect(int id, String dest, Properties props) throws I2PException, DataFormatException {
if (checkSocketHandlerId(id)) {
_log.debug("The specified id (" + id + ") is already in use");
return false;
}
Destination d = new Destination();
d.fromBase64(dest);
// FIXME: we should config I2PSocketOptions here
I2PSocketOptions opts = new I2PSocketOptions();
opts.setConnectTimeout(60 * 1000);
_log.debug("Connecting new I2PSocket...");
I2PSocket i2ps = socketMgr.connect(d, opts);
createSocketHandler(i2ps, id);
return true;
}
/**
* Send bytes through a SAM STREAM session.
*
* @param data Bytes to be sent
*
* @return True if the data was sent, false otherwise
*/
public boolean sendBytes(int id, byte[] data) {
Destination d = new Destination();
SAMStreamSessionSocketHandler handler = getSocketHandler(id);
if (handler == null) {
_log.error("Trying to send bytes through inexistent handler " +id);
return false;
}
return handler.sendBytes(data);
}
/**
* Close a SAM STREAM session.
*
*/
public void close() {
server.stopRunning();
removeAllSocketHandlers();
recv.stopStreamReceiving();
}
/**
* Close a connection managed by the SAM STREAM session.
*
* @param id Connection id
*/
public boolean closeConnection(int id) {
if (!checkSocketHandlerId(id)) {
_log.debug("The specified id (" + id + ") does not exist!");
return false;
}
removeSocketHandler(id);
return true;
}
/**
* Create a new SAM STREAM session socket handler, detaching its thread.
*
* @param s Socket to be handled
* @param id Socket id, or 0 if it must be auto-generated
*
* @return An id associated to the socket handler
*/
private int createSocketHandler(I2PSocket s, int id) {
SAMStreamSessionSocketHandler handler;
if (id == 0) {
id = createUniqueId();
}
try {
handler = new SAMStreamSessionSocketHandler(s, id);
} catch (IOException e) {
_log.error("IOException when creating SAM STREAM session socket handler", e);
recv.stopStreamReceiving();
return 0;
}
synchronized (handlersMapLock) {
handlersMap.put(new Integer(id), handler);
}
I2PThread t = new I2PThread(handler, "SAMStreamSessionSocketHandler");
t.start();
return id;
}
/* Create an unique id, either positive or negative */
private int createUniqueId() {
synchronized (idLock) {
return --lastNegativeId;
}
}
/**
* Get a SAM STREAM session socket handler.
*
* @param id Handler id
*/
private SAMStreamSessionSocketHandler getSocketHandler(int id) {
synchronized (handlersMapLock) {
return (SAMStreamSessionSocketHandler)handlersMap.get(new Integer(id));
}
}
/**
* Check whether a SAM STREAM session socket handler id is still in use.
*
* @param id Handler id
*/
private boolean checkSocketHandlerId(int id) {
synchronized (handlersMapLock) {
return (!(handlersMap.get(new Integer(id)) == null));
}
}
/**
* Remove and close a SAM STREAM session socket handler.
*
* @param id Handler id to be removed
*/
private void removeSocketHandler(int id) {
SAMStreamSessionSocketHandler removed;
synchronized (handlersMapLock) {
removed = (SAMStreamSessionSocketHandler)handlersMap.remove(new Integer(id));
}
if (removed == null) {
_log.error("BUG! Trying to remove inexistent SAM STREAM session socket handler " + id);
recv.stopStreamReceiving();
} else {
removed.stopRunning();
_log.debug("Removed SAM STREAM session socket handler " + id);
}
}
/**
* Remove and close all the socket handlers managed by this SAM
* STREAM session.
*
* @param id Handler id to be removed
*/
private void removeAllSocketHandlers() {
Integer id;
Set keySet;
Iterator iter;
synchronized (handlersMapLock) {
keySet = handlersMap.keySet();
iter = keySet.iterator();
while (iter.hasNext()) {
id = (Integer)iter.next();
((SAMStreamSessionSocketHandler)handlersMap.get(id)).stopRunning();
}
handlersMap.clear();
}
}
/**
* SAM STREAM session server, running in its own thread. It will
* wait for incoming connections from the I2P network.
*
* @author human
*/
public class SAMStreamSessionServer implements Runnable {
private Object runningLock = new Object();
private boolean stillRunning = true;
private I2PServerSocket serverSocket = null;
/**
* Create a new SAM STREAM session server
*
*/
public SAMStreamSessionServer() {
_log.debug("Instantiating new SAM STREAM session server");
serverSocket = socketMgr.getServerSocket();
}
/**
* Stop a SAM STREAM session server
*
*/
public void stopRunning() {
_log.debug("SAMStreamSessionServer.stopRunning() invoked");
synchronized (runningLock) {
if (stillRunning) {
stillRunning = false;
try {
serverSocket.close();
} catch (I2PException e) {
_log.error("I2PException caught", e);
}
}
}
}
public void run() {
_log.debug("SAM STREAM session server running");
I2PSocket i2ps;
while (stillRunning) {
try {
i2ps = serverSocket.accept();
_log.debug("New incoming connection");
int id = createSocketHandler(i2ps, 0);
if (id == 0) {
_log.error("SAM STREAM session handler not created!");
i2ps.close();
continue;
}
_log.debug("New connection id: " + id);
recv.notifyStreamConnection(id, i2ps.getPeerDestination());
} catch (I2PException e) {
_log.debug("Caught I2PException", e);
break;
} catch (IOException e) {
_log.debug("Caught IOException", e);
break;
}
}
try {
serverSocket.close(); // In case it wasn't closed, yet
} catch (I2PException e) {
_log.debug("Caught I2PException", e);
}
_log.debug("Shutting down SAM STREAM session server");
}
}
/**
* SAM STREAM socket handler, running in its own thread. It forwards
* forward data to/from an I2P socket.
*
* @author human
*/
public class SAMStreamSessionSocketHandler implements Runnable {
private I2PSocket i2pSocket = null;
private OutputStream i2pSocketOS = null;
private Object runningLock = new Object();
private boolean stillRunning = true;
private int id;
/**
* Create a new SAM STREAM session socket handler
*
* @param s Socket to be handled
* @param id Unique id assigned to the handler
*/
public SAMStreamSessionSocketHandler(I2PSocket s, int id) throws IOException {
_log.debug("Instantiating new SAM STREAM session socket handler");
i2pSocket = s;
i2pSocketOS = s.getOutputStream();
this.id = id;
}
/**
* Send bytes through the SAM STREAM session socket handler
*
* @param data Data to be sent
*
* @return True if data has been sent without errors, false otherwise
*/
public boolean sendBytes(byte[] data) {
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Handler " + id + ": sending " + data.length
+ " bytes");
}
try {
i2pSocketOS.write(data);
} catch (IOException e) {
_log.error("Error sending data through I2P socket", e);
return false;
}
return true;
}
/**
* Stop a SAM STREAM session socket handler
*
*/
public void stopRunning() {
_log.debug("stopRunning() invoked on socket handler " + id);
synchronized (runningLock) {
if (stillRunning) {
stillRunning = false;
try {
i2pSocket.close();
} catch (IOException e) {
_log.debug("Caught IOException", e);
}
}
}
}
public void run() {
_log.debug("SAM STREAM session socket handler running");
int read = -1;
byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE];
try {
InputStream in = i2pSocket.getInputStream();
while (stillRunning) {
read = in.read(data);
if (read == -1) {
_log.debug("Handler " + id + ": connection closed");
break;
}
recv.receiveStreamBytes(id, data, read);
}
} catch (IOException e) {
_log.debug("Caught IOException", e);
}
try {
i2pSocket.close();
} catch (IOException e) {
_log.debug("Caught IOException", e);
}
if (stillRunning) {
removeSocketHandler(id);
// FIXME: we need error reporting here!
try {
recv.notifyStreamDisconnection(id, "OK", null);
} catch (IOException e) {
_log.debug("Error sending disconnection notice for handler "
+ id, e);
}
}
_log.debug("Shutting down SAM STREAM session socket handler " +id);
}
}
}

View File

@ -8,6 +8,7 @@ package net.i2p.sam;
*
*/
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Enumeration;
@ -56,6 +57,28 @@ public class SAMUtils {
}
}
/**
* Get the Base64 representation of a Destination public key
*
* @param d A Destination
*
* @return A String representing the Destination public key
*/
public static String getBase64DestinationPubKey(Destination d) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
d.writeBytes(baos);
return Base64.encode(baos.toByteArray());
} catch (IOException e) {
_log.error("getDestinationPubKey(): caught IOException", e);
return null;
} catch (DataFormatException e) {
_log.error("getDestinationPubKey(): caught DataFormatException",e);
return null;
}
}
/**
* Check whether a base64-encoded dest is valid
*
@ -106,10 +129,10 @@ public class SAMUtils {
*
* @param tok A StringTokenizer pointing to the SAM parameters
*
* @return A Properties object with the parsed SAM parameters
* @return Properties with the parsed SAM params, or null if none is found
*/
public static Properties parseParams(StringTokenizer tok) {
int pos, ntoks = tok.countTokens();
int pos, nprops = 0, ntoks = tok.countTokens();
String token, param, value;
Properties props = new Properties();
@ -125,13 +148,18 @@ public class SAMUtils {
value = token.substring(pos + 1);
props.setProperty(param, value);
nprops += 1;
}
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Parsed properties: " + dumpProperties(props));
}
return props;
if (nprops != 0) {
return props;
} else {
return null;
}
}
/* Dump a Properties object in an human-readable form */

View File

@ -25,6 +25,7 @@ import net.i2p.client.I2PSessionException;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.I2PException;
import net.i2p.util.Log;
/**
@ -32,7 +33,7 @@ import net.i2p.util.Log;
*
* @author human
*/
public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStreamReceiver {
private final static Log _log = new Log(SAMv1Handler.class);
@ -40,7 +41,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
private SAMRawSession rawSession = null;
private SAMRawSession datagramSession = null;
private SAMRawSession streamSession = null;
private SAMStreamSession streamSession = null;
/**
* Create a new SAM version 1 handler. This constructor expects
@ -69,6 +70,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
boolean canContinue = false;
ByteArrayOutputStream buf = new ByteArrayOutputStream(IN_BUFSIZE);
StringTokenizer tok;
Properties props;
this.thread.setName("SAMv1Handler");
_log.debug("SAM handling started");
@ -108,17 +110,22 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
}
domain = tok.nextToken();
opcode = tok.nextToken();
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Parsing (domain: \"" + domain
+ "\"; opcode: \"" + opcode + "\")");
}
props = SAMUtils.parseParams(tok);
_log.debug("Parsing (domain: \"" + domain + "\"; opcode: \""
+ opcode + "\")");
if (domain.equals("RAW")) {
canContinue = execRawMessage(opcode, tok);
if (domain.equals("STREAM")) {
canContinue = execStreamMessage(opcode, props);
} else if (domain.equals("RAW")) {
canContinue = execRawMessage(opcode, props);
} else if (domain.equals("SESSION")) {
canContinue = execSessionMessage(opcode, tok);
canContinue = execSessionMessage(opcode, props);
} else if (domain.equals("DEST")) {
canContinue = execDestMessage(opcode, tok);
canContinue = execDestMessage(opcode, props);
} else if (domain.equals("NAMING")) {
canContinue = execNamingMessage(opcode, tok);
canContinue = execNamingMessage(opcode, props);
} else {
_log.debug("Unrecognized message domain: \""
+ domain + "\"");
@ -157,152 +164,137 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
}
/* Parse and execute a SESSION message */
private boolean execSessionMessage(String opcode, StringTokenizer tok) {
Properties props = null;
private boolean execSessionMessage(String opcode, Properties props) {
if (opcode.equals("CREATE")) {
String dest = "BUG!";
if ((rawSession != null) || (datagramSession != null)
|| (streamSession != null)) {
_log.debug("Trying to create a session, but one still exists");
return false;
}
props = SAMUtils.parseParams(tok);
if (props == null) {
return false;
}
String dest = props.getProperty("DESTINATION");
if (dest == null) {
_log.debug("SESSION DESTINATION parameter not specified");
return false;
}
props.remove("DESTINATION");
String style = props.getProperty("STYLE");
if (style == null) {
_log.debug("SESSION STYLE parameter not specified");
return false;
}
props.remove("STYLE");
try {
try{
if (opcode.equals("CREATE")) {
if ((rawSession != null) || (datagramSession != null)
|| (streamSession != null)) {
_log.debug("Trying to create a session, but one still exists");
return false;
}
if (props == null) {
_log.debug("No parameters specified in SESSION CREATE message");
return false;
}
dest = props.getProperty("DESTINATION");
if (dest == null) {
_log.debug("SESSION DESTINATION parameter not specified");
return false;
}
props.remove("DESTINATION");
if (dest.equals("TRANSIENT")) {
_log.debug("TRANSIENT destination requested");
ByteArrayOutputStream priv = new ByteArrayOutputStream();
SAMUtils.genRandomKey(priv, null);
dest = Base64.encode(priv.toByteArray());
}
String style = props.getProperty("STYLE");
if (style == null) {
_log.debug("SESSION STYLE parameter not specified");
return false;
}
props.remove("STYLE");
if (style.equals("RAW")) {
try {
if (dest.equals("TRANSIENT")) {
_log.debug("TRANSIENT destination requested");
ByteArrayOutputStream priv = new ByteArrayOutputStream();
SAMUtils.genRandomKey(priv, null);
dest = Base64.encode(priv.toByteArray());
}
rawSession = new SAMRawSession (dest, props, this);
writeBytes(("SESSION STATUS RESULT=OK DESTINATION=" + dest + "\n").getBytes("ISO-8859-1"));
} catch (DataFormatException e) {
_log.debug("Invalid destination specified");
writeBytes(("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + "\n").getBytes("ISO-8859-1"));
return true;
} catch (I2PSessionException e) {
_log.debug("I2P error when instantiating RAW session", e);
writeBytes(("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n").getBytes("ISO-8859-1"));
return true;
}
rawSession = new SAMRawSession(dest, props, this);
} else if (style.equals("STREAM")) {
streamSession = new SAMStreamSession(dest, props, this);
} else {
_log.debug("Unrecognized SESSION STYLE: \"" + style + "\"");
return false;
}
} catch (UnsupportedEncodingException e) {
_log.error("Caught UnsupportedEncodingException ("
+ e.getMessage() + ")");
return false;
} catch (IOException e) {
_log.error("Caught IOException while parsing SESSION message ("
+ e.getMessage() + ")");
return writeString("SESSION STATUS RESULT=OK DESTINATION="
+ dest + "\n");
} else {
_log.debug("Unrecognized SESSION message opcode: \""
+ opcode + "\"");
return false;
}
return true;
} else {
_log.debug("Unrecognized SESSION message opcode: \""
+ opcode + "\"");
return false;
} catch (DataFormatException e) {
_log.debug("Invalid destination specified");
return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + "\n");
} catch (I2PSessionException e) {
_log.debug("I2P error when instantiating session", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n");
} catch (SAMException e) {
_log.error("Unexpected SAM error", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n");
} catch (IOException e) {
_log.error("Unexpected IOException", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + "\n");
}
}
/* Parse and execute a DEST message*/
private boolean execDestMessage(String opcode, StringTokenizer tok) {
private boolean execDestMessage(String opcode, Properties props) {
if (opcode.equals("GENERATE")) {
if (tok.countTokens() > 0) {
_log.debug("Bad format in DEST GENERATE message");
if (props != null) {
_log.debug("Properties specified in DEST GENERATE message");
return false;
}
try {
ByteArrayOutputStream priv = new ByteArrayOutputStream();
ByteArrayOutputStream pub = new ByteArrayOutputStream();
SAMUtils.genRandomKey(priv, pub);
writeBytes(("DEST REPLY"
+ " PUB="
+ Base64.encode(pub.toByteArray())
+ " PRIV="
+ Base64.encode(priv.toByteArray())
+ "\n").getBytes("ISO-8859-1"));
} catch (UnsupportedEncodingException e) {
_log.error("Caught UnsupportedEncodingException ("
+ e.getMessage() + ")");
return false;
} catch (IOException e) {
_log.debug("IOException while executing DEST message", e);
return false;
}
ByteArrayOutputStream priv = new ByteArrayOutputStream();
ByteArrayOutputStream pub = new ByteArrayOutputStream();
SAMUtils.genRandomKey(priv, pub);
return writeString("DEST REPLY"
+ " PUB="
+ Base64.encode(pub.toByteArray())
+ " PRIV="
+ Base64.encode(priv.toByteArray())
+ "\n");
} else {
_log.debug("Unrecognized DEST message opcode: \"" + opcode + "\"");
return false;
}
return true;
}
/* Parse and execute a NAMING message */
private boolean execNamingMessage(String opcode, StringTokenizer tok) {
Properties props = null;
private boolean execNamingMessage(String opcode, Properties props) {
if (opcode.equals("LOOKUP")) {
props = SAMUtils.parseParams(tok);
if (props == null) {
_log.debug("No parameters specified in NAMING LOOKUP message");
return false;
}
String name = props.getProperty("NAME");
if (name == null) {
_log.debug("Name to resolve not specified");
_log.debug("Name to resolve not specified in NAMING message");
return false;
}
try {
ByteArrayOutputStream pubKey = new ByteArrayOutputStream();
Destination dest = SAMUtils.lookupHost(name, pubKey);
if (dest == null) {
writeBytes("NAMING REPLY RESULT=KEY_NOT_FOUND\n".getBytes("ISP-8859-1"));
return true;
Destination dest;
if (name.equals("ME")) {
if (rawSession != null) {
dest = rawSession.getDestination();
} else if (streamSession != null) {
dest = streamSession.getDestination();
} else if (datagramSession != null) {
dest = datagramSession.getDestination();
} else {
_log.debug("Lookup for SESSION destination, but session is null");
return false;
}
writeBytes(("NAMING REPLY RESULT=OK NAME=" + name
+ " VALUE=" + Base64.encode(pubKey.toByteArray())
+ "\n").getBytes("ISO-8859-1"));
return true;
} catch (UnsupportedEncodingException e) {
_log.error("Caught UnsupportedEncodingException ("
+ e.getMessage() + ")");
return false;
} catch (IOException e) {
_log.debug("Caught IOException while parsing NAMING message",
e);
return false;
} else {
dest = SAMUtils.lookupHost(name, null);
}
if (dest == null) {
return writeString("NAMING REPLY RESULT=KEY_NOT_FOUND\n");
}
return writeString("NAMING REPLY RESULT=OK NAME=" + name
+ " VALUE="
+ SAMUtils.getBase64DestinationPubKey(dest)
+ "\n");
} else {
_log.debug("Unrecognized NAMING message opcode: \""
+ opcode + "\"");
@ -310,24 +302,16 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
}
}
public String toString() {
return "SAM v1 handler (client: "
+ this.socket.getInetAddress().toString() + ":"
+ this.socket.getPort() + ")";
}
/* Parse and execute a RAW message */
private boolean execRawMessage(String opcode, StringTokenizer tok) {
Properties props = null;
private boolean execRawMessage(String opcode, Properties props) {
if (rawSession == null) {
_log.debug("RAW message received, but no RAW session exists");
return false;
}
if (opcode.equals("SEND")) {
props = SAMUtils.parseParams(tok);
if (props == null) {
_log.debug("No parameters specified in RAW SEND message");
return false;
}
@ -389,6 +373,158 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
}
}
/* Parse and execute a STREAM message */
private boolean execStreamMessage(String opcode, Properties props) {
if (streamSession == null) {
_log.debug("STREAM message received, but no STREAM session exists");
return false;
}
if (opcode.equals("SEND")) {
if (props == null) {
_log.debug("No parameters specified in STREAM SEND message");
return false;
}
int id;
{
String strid = props.getProperty("ID");
if (strid == null) {
_log.debug("ID not specified in STREAM SEND message");
return false;
}
try {
id = Integer.parseInt(strid);
} catch (NumberFormatException e) {
_log.debug("Invalid STREAM SEND ID specified: " + strid);
return false;
}
}
int size;
{
String strsize = props.getProperty("SIZE");
if (strsize == null) {
_log.debug("Size not specified in STREAM SEND message");
return false;
}
try {
size = Integer.parseInt(strsize);
} catch (NumberFormatException e) {
_log.debug("Invalid STREAM SEND size specified: "+strsize);
return false;
}
if (!checkSize(size)) {
_log.debug("Specified size (" + size
+ ") is out of protocol limits");
return false;
}
}
try {
DataInputStream in = new DataInputStream(socket.getInputStream());
byte[] data = new byte[size];
in.readFully(data);
if (!streamSession.sendBytes(id, data)) {
_log.error("STREAM SEND failed");
return false;
}
return true;
} catch (EOFException e) {
_log.debug("Too few bytes with RAW SEND message (expected: "
+ size);
return false;
} catch (IOException e) {
_log.debug("Caught IOException while parsing RAW SEND message",
e);
return false;
}
} else if (opcode.equals("CONNECT")) {
if (props == null) {
_log.debug("No parameters specified in STREAM CONNECT message");
return false;
}
int id;
{
String strid = props.getProperty("ID");
if (strid == null) {
_log.debug("ID not specified in STREAM SEND message");
return false;
}
try {
id = Integer.parseInt(strid);
} catch (NumberFormatException e) {
_log.debug("Invalid STREAM CONNECT ID specified: " +strid);
return false;
}
if (id < 1) {
_log.debug("Invalid STREAM CONNECT ID specified: " +strid);
return false;
}
props.remove("ID");
}
String dest = props.getProperty("DESTINATION");
if (dest == null) {
_log.debug("Destination not specified in RAW SEND message");
return false;
}
props.remove("DESTINATION");
try {
if (!streamSession.connect(id, dest, props)) {
_log.debug("STREAM connection failed");
return false;
}
return writeString("STREAM STATUS RESULT=OK ID=" + id + "\n");
} catch (DataFormatException e) {
_log.debug("Invalid destination in STREAM CONNECT message");
return writeString("STREAM STATUS RESULT=INVALID_KEY ID="
+ id + "\n");
} catch (I2PException e) {
_log.debug("STREAM CONNECT failed: " + e.getMessage());
return writeString("STREAM STATUS RESULT=I2P_ERROR ID="
+ id + "\n");
}
} else if (opcode.equals("CLOSE")) {
if (props == null) {
_log.debug("No parameters specified in STREAM CLOSE message");
return false;
}
int id;
{
String strid = props.getProperty("ID");
if (strid == null) {
_log.debug("ID not specified in STREAM CLOSE message");
return false;
}
try {
id = Integer.parseInt(strid);
} catch (NumberFormatException e) {
_log.debug("Invalid STREAM CLOSE ID specified: " +strid);
return false;
}
}
return streamSession.closeConnection(id);
} else {
_log.debug("Unrecognized RAW message opcode: \""
+ opcode + "\"");
return false;
}
}
public String toString() {
return "SAM v1 handler (client: "
+ this.socket.getInetAddress().toString() + ":"
+ this.socket.getPort() + ")";
}
/* Check whether a size is inside the limits allowed by this protocol */
private boolean checkSize(int size) {
return ((size >= 1) && (size <= 32768));
@ -397,20 +533,85 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver {
// SAMRawReceiver implementation
public void receiveRawBytes(byte data[]) throws IOException {
if (rawSession == null) {
_log.error("BUG! Trying to write raw bytes, but session is null!");
_log.error("BUG! Received raw bytes, but session is null!");
throw new NullPointerException("BUG! RAW session is null!");
}
ByteArrayOutputStream msg = new ByteArrayOutputStream();
msg.write(("RAW RECEIVED SIZE=" + data.length + "\n").getBytes());
msg.write(("RAW RECEIVED SIZE=" + data.length
+ "\n").getBytes("ISO-8859-1"));
msg.write(data);
writeBytes(msg.toByteArray());
}
public void stopReceiving() {
_log.debug("stopReceiving() invoked");
public void stopRawReceiving() {
_log.debug("stopRawReceiving() invoked");
if (rawSession == null) {
_log.error("BUG! Got raw receiving stop, but session is null!");
throw new NullPointerException("BUG! RAW session is null!");
}
try {
this.socket.close();
} catch (IOException e) {
_log.error("Error closing socket: " + e.getMessage());
}
}
// SAMStreamReceiver implementation
public void notifyStreamConnection(int id, Destination d) throws IOException {
if (streamSession == null) {
_log.error("BUG! Received stream connection, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
}
if (!writeString("STREAM CONNECTED DESTINATION="
+ SAMUtils.getBase64DestinationPubKey(d)
+ " ID=" + id + "\n")) {
throw new IOException("Error notifying connection to SAM client");
}
}
public void receiveStreamBytes(int id, byte data[], int len) throws IOException {
if (streamSession == null) {
_log.error("Received stream bytes, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
}
ByteArrayOutputStream msg = new ByteArrayOutputStream();
msg.write(("STREAM RECEIVED ID=" + id
+" SIZE=" + len + "\n").getBytes("ISO-8859-1"));
msg.write(data);
writeBytes(msg.toByteArray());
}
public void notifyStreamDisconnection(int id, String result, String msg) throws IOException {
if (streamSession == null) {
_log.error("BUG! Received stream disconnection, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
}
// FIXME: msg should be escaped!
if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result
+ (msg == null ? "" : (" MESSAGE=" + msg))
+ "\n")) {
throw new IOException("Error notifying disconnection to SAM client");
}
}
public void stopStreamReceiving() {
_log.debug("stopStreamReceiving() invoked");
if (streamSession == null) {
_log.error("BUG! Got stream receiving stop, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
}
try {
this.socket.close();
} catch (IOException e) {