* Added DATAGRAM supprt;

* refactoring of SAMRawSession.java, to make it derive from
  SAMMessageSession.java (parent class for SAMDatagramSession.java, too);
* removed unuseful cruft;
* some fixes;
* M-x untabify.
(human)
This commit is contained in:
human
2004-04-17 23:19:20 +00:00
committed by zzz
parent ccb309fd92
commit 3295c18829
8 changed files with 673 additions and 302 deletions

View File

@ -0,0 +1,32 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.IOException;
import net.i2p.data.Destination;
/**
* Interface for sending raw data to a SAM client
*/
public interface SAMDatagramReceiver {
/**
* Send a byte array to a SAM client.
*
* @param data Byte array to be received
*/
public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException;
/**
* Stop receiving data.
*
*/
public void stopDatagramReceiving();
}

View File

@ -0,0 +1,111 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.util.Properties;
import net.i2p.client.datagram.I2PDatagramDissector;
import net.i2p.client.datagram.I2PDatagramMaker;
import net.i2p.client.datagram.I2PInvalidDatagramException;
import net.i2p.client.I2PSessionException;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.Log;
/**
* SAM DATAGRAM session class.
*
* @author human
*/
public class SAMDatagramSession extends SAMMessageSession {
private final static Log _log = new Log(SAMDatagramSession.class);
private SAMDatagramReceiver recv = null;
private I2PDatagramMaker dgramMaker;
private I2PDatagramDissector dgramDissector = new I2PDatagramDissector();
/**
* Create a new SAM DATAGRAM session.
*
* @param dest Base64-encoded destination (private key)
* @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data
*/
public SAMDatagramSession(String dest, Properties props,
SAMDatagramReceiver recv) throws IOException, DataFormatException, I2PSessionException {
super(dest, props);
this.recv = recv;
dgramMaker = new I2PDatagramMaker(getI2PSession());
}
/**
* Create a new SAM DATAGRAM session.
*
* @param destStream Input stream containing the destination keys
* @param props Properties to setup the I2P session
* @param recv Object that will receive incoming data
*/
public SAMDatagramSession(InputStream destStream, Properties props,
SAMDatagramReceiver recv) throws IOException, DataFormatException, I2PSessionException {
super(destStream, props);
this.recv = recv;
dgramMaker = new I2PDatagramMaker(getI2PSession());
}
/**
* Send bytes through a SAM DATAGRAM session.
*
* @param data Bytes to be sent
*
* @return True if the data was sent, false otherwise
*/
public boolean sendBytes(String dest, byte[] data) throws DataFormatException {
byte[] dgram = dgramMaker.makeI2PDatagram(data);
return sendBytesThroughMessageSession(dest, dgram);
}
protected void messageReceived(byte[] msg) {
byte[] payload;
Destination sender;
try {
dgramDissector.loadI2PDatagram(msg);
sender = dgramDissector.getSender();
payload = dgramDissector.extractPayload();
} catch (DataFormatException e) {
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Dropping ill-formatted I2P repliable datagram");
}
return;
} catch (I2PInvalidDatagramException e) {
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Dropping ill-signed I2P repliable datagram");
}
return;
}
try {
recv.receiveDatagramBytes(sender, payload);
} catch (IOException e) {
_log.error("Error forwarding message to receiver", e);
close();
}
}
protected void shutDown() {
recv.stopDatagramReceiving();
}
}

View File

@ -9,6 +9,7 @@ package net.i2p.sam;
*/
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
@ -29,20 +30,36 @@ public abstract class SAMHandler implements Runnable {
protected I2PThread thread = null;
private Object socketWLock = new Object(); // Guards writings on socket
private Socket socket = null;
private OutputStream socketOS = null; // Stream associated to socket
protected Socket socket = null;
protected int verMajor = 0;
protected int verMinor = 0;
private boolean stopHandler = false;
private Object stopLock = new Object();
private boolean stopHandler = false;
/**
* SAMHandler constructor (to be called by subclasses)
*
* @param s Socket attached to a SAM client
* @param verMajor SAM major version to manage
* @param verMinor SAM minor version to manage
*/
protected SAMHandler(Socket s,
int verMajor, int verMinor) throws IOException {
socket = s;
socketOS = socket.getOutputStream();
this.verMajor = verMajor;
this.verMinor = verMinor;
}
/**
* Start handling the SAM connection, detaching an handling thread.
*
*/
public void startHandling() {
public final void startHandling() {
thread = new I2PThread(this, "SAMHandler");
thread.start();
}
@ -53,6 +70,14 @@ public abstract class SAMHandler implements Runnable {
*/
protected abstract void handle();
/**
* Get the input stream of the socket connected to the SAM client
*
*/
protected final InputStream getClientSocketInputStream() throws IOException {
return socket.getInputStream();
}
/**
* Write a byte array on the handler's socket. This method must
* always be used when writing data, unless you really know what
@ -60,11 +85,8 @@ public abstract class SAMHandler implements Runnable {
*
* @param data A byte array to be written
*/
protected void writeBytes(byte[] data) throws IOException {
protected final void writeBytes(byte[] data) throws IOException {
synchronized (socketWLock) {
if (socketOS == null) {
socketOS = socket.getOutputStream();
}
socketOS.write(data);
socketOS.flush();
}
@ -79,7 +101,7 @@ public abstract class SAMHandler implements Runnable {
*
* @return True is the string was successfully written, false otherwise
*/
protected boolean writeString(String str) {
protected final boolean writeString(String str) {
try {
writeBytes(str.getBytes("ISO-8859-1"));
} catch (IOException e) {
@ -90,11 +112,19 @@ public abstract class SAMHandler implements Runnable {
return true;
}
/**
* Close the socket connected to the SAM client.
*
*/
protected final void closeClientSocket() throws IOException {
socket.close();
}
/**
* Stop the SAM handler
*
*/
public void stopHandling() {
public final void stopHandling() {
synchronized (stopLock) {
stopHandler = true;
}
@ -105,7 +135,7 @@ public abstract class SAMHandler implements Runnable {
*
* @return True if the handler should be stopped, false otherwise
*/
protected boolean shouldStop() {
protected final boolean shouldStop() {
synchronized (stopLock) {
return stopHandler;
}
@ -116,7 +146,13 @@ public abstract class SAMHandler implements Runnable {
*
* @return A String describing the handler;
*/
public abstract String toString();
public final String toString() {
return ("SAM handler (class: " + this.getClass().getName()
+ "; SAM version: " + verMajor + "." + verMinor
+ "; client: "
+ this.socket.getInetAddress().toString() + ":"
+ this.socket.getPort() + ")");
}
public final void run() {
handle();

View File

@ -36,18 +36,23 @@ public class SAMHandlerFactory {
*/
public static SAMHandler createSAMHandler(Socket s) throws SAMException {
BufferedReader br;
String line;
StringTokenizer tok;
try {
br = new BufferedReader(new InputStreamReader(s.getInputStream(),
"ISO-8859-1"));
tok = new StringTokenizer(br.readLine(), " ");
line = br.readLine();
if (line == null) {
_log.debug("Connection closed by client");
return null;
}
tok = new StringTokenizer(line, " ");
} catch (IOException e) {
throw new SAMException("Error reading from socket: "
+ e.getMessage());
} catch (Exception e) {
throw new SAMException("Unexpected error: "
+ e.getMessage());
throw new SAMException("Unexpected error: " + e.getMessage());
}
// Message format: HELLO VERSION MIN=v1 MAX=v2
@ -118,15 +123,20 @@ public class SAMHandlerFactory {
int verMajor = getMajor(ver);
int verMinor = getMinor(ver);
SAMHandler handler;
try {
switch (verMajor) {
case 1:
handler = new SAMv1Handler(s, verMajor, verMinor);
break;
default:
_log.error("BUG! Trying to initialize the wrong SAM version!");
throw new SAMException("BUG triggered! (handler instantiation)");
throw new SAMException("BUG! (in handler instantiation)");
}
} catch (IOException e) {
_log.error("IOException caught during SAM handler instantiation");
return null;
}
return handler;
}

View File

@ -0,0 +1,246 @@
package net.i2p.sam;
/*
* free (adj.): unencumbered; not under the control of others
* Written by human in 2004 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.util.Properties;
import net.i2p.client.I2PClient;
import net.i2p.client.I2PClientFactory;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionListener;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.HexDump;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* Base abstract class for SAM message-based sessions.
*
* @author human
*/
public abstract class SAMMessageSession {
private final static Log _log = new Log(SAMMessageSession.class);
private I2PSession session = null;
private SAMMessageSessionHandler handler = null;
/**
* Initialize a new SAM message-based session.
*
* @param dest Base64-encoded destination (private key)
* @param props Properties to setup the I2P session
*/
protected SAMMessageSession(String dest, Properties props) throws IOException, DataFormatException, I2PSessionException {
ByteArrayInputStream bais;
bais = new ByteArrayInputStream(Base64.decode(dest));
initSAMMessageSession(bais, props);
}
/**
* Initialize a new SAM message-based session.
*
* @param destStream Input stream containing the destination keys
* @param props Properties to setup the I2P session
*/
protected SAMMessageSession(InputStream destStream, Properties props) throws IOException, DataFormatException, I2PSessionException {
initSAMMessageSession(destStream, props);
}
private void initSAMMessageSession (InputStream destStream, Properties props) throws IOException, DataFormatException, I2PSessionException {
_log.debug("Initializing SAM message-based session");
handler = new SAMMessageSessionHandler(destStream, props);
Thread t = new I2PThread(handler, "SAMMessageSessionHandler");
t.start();
}
/**
* Get the SAM message-based session Destination.
*
* @return The SAM message-based session Destination.
*/
public Destination getDestination() {
return session.getMyDestination();
}
/**
* Send bytes through a SAM message-based session.
*
* @param data Bytes to be sent
*
* @return True if the data was sent, false otherwise
*/
public abstract boolean sendBytes(String dest, byte[] data) throws DataFormatException;
/**
* Actually send bytes through the SAM message-based session I2PSession
* (er...).
*
* @param data Bytes to be sent
*
* @return True if the data was sent, false otherwise
*/
protected boolean sendBytesThroughMessageSession(String dest, byte[] data) throws DataFormatException {
Destination d = new Destination();
d.fromBase64(dest);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Sending " + data.length + " bytes to " + dest);
}
try {
return session.sendMessage(d, data);
} catch (I2PSessionException e) {
_log.error("I2PSessionException while sending data", e);
return false;
}
}
/**
* Close a SAM message-based session.
*
*/
public void close() {
handler.stopRunning();
}
/**
* Handle a new received message
*/
protected abstract void messageReceived(byte[] msg);
/**
* Do whatever is needed to shutdown the SAM session
*/
protected abstract void shutDown();
/**
* Get the I2PSession object used by the SAM message-based session.
*
* @return The I2PSession of the SAM message-based session
*/
protected I2PSession getI2PSession() {
return session;
}
/**
* SAM message-based session handler, running in its own thread
*
* @author human
*/
public class SAMMessageSessionHandler implements Runnable, I2PSessionListener {
private Object runningLock = new Object();
private boolean stillRunning = true;
/**
* Create a new SAM message-based session handler
*
* @param destStream Input stream containing the destination keys
* @param props Properties to setup the I2P session
*/
public SAMMessageSessionHandler(InputStream destStream, Properties props) throws I2PSessionException {
_log.debug("Instantiating new SAM message-based session handler");
I2PClient client = I2PClientFactory.createClient();
session = client.createSession(destStream, props);
_log.debug("Connecting I2P session...");
session.connect();
_log.debug("I2P session connected");
session.setSessionListener(this);
}
/**
* Stop a SAM message-based session handling thread
*
*/
public final void stopRunning() {
synchronized (runningLock) {
stillRunning = false;
runningLock.notify();
}
}
public void run() {
_log.debug("SAM message-based session handler running");
synchronized (runningLock) {
while (stillRunning) {
try {
runningLock.wait();
} catch (InterruptedException ie) {}
}
}
_log.debug("Shutting down SAM message-based session handler");
shutDown();
try {
_log.debug("Destroying I2P session...");
session.destroySession();
_log.debug("I2P session destroyed");
} catch (I2PSessionException e) {
_log.error("Error destroying I2P session", e);
}
}
public void disconnected(I2PSession session) {
_log.debug("I2P session disconnected");
stopRunning();
}
public void errorOccurred(I2PSession session, String message,
Throwable error) {
_log.debug("I2P error: " + message, error);
stopRunning();
}
public void messageAvailable(I2PSession session, int msgId, long size){
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("I2P message available (id: " + msgId
+ "; size: " + size + ")");
}
try {
byte msg[] = session.receiveMessage(msgId);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Content of message " + msgId + ":\n"
+ HexDump.dump(msg));
}
messageReceived(msg);
} catch (I2PSessionException e) {
_log.error("Error fetching I2P message", e);
stopRunning();
}
}
public void reportAbuse(I2PSession session, int severity) {
_log.warn("Abuse reported (severity: " + severity + ")");
stopRunning();
}
}
}

View File

@ -14,16 +14,8 @@ import java.io.InputStream;
import java.io.IOException;
import java.util.Properties;
import net.i2p.client.I2PClient;
import net.i2p.client.I2PClientFactory;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionListener;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
import net.i2p.util.HexDump;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
@ -31,16 +23,11 @@ import net.i2p.util.Log;
*
* @author human
*/
public class SAMRawSession {
public class SAMRawSession extends SAMMessageSession {
private final static Log _log = new Log(SAMRawSession.class);
private I2PSession session = null;
private SAMRawReceiver recv = null;
private SAMRawSessionHandler handler = null;
/**
* Create a new SAM RAW session.
*
@ -50,11 +37,9 @@ public class SAMRawSession {
*/
public SAMRawSession(String dest, Properties props,
SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
ByteArrayInputStream bais;
super(dest, props);
bais = new ByteArrayInputStream(Base64.decode(dest));
initSAMRawSession(bais, props, recv);
this.recv = recv;
}
/**
@ -66,28 +51,9 @@ public class SAMRawSession {
*/
public SAMRawSession(InputStream destStream, Properties props,
SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
initSAMRawSession(destStream, props, recv);
}
super(destStream, props);
private void initSAMRawSession(InputStream destStream, Properties props,
SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException {
this.recv = recv;
_log.debug("SAM RAW session instantiated");
handler = new SAMRawSessionHandler(destStream, props);
Thread t = new I2PThread(handler, "SAMRawSessionHandler");
t.start();
}
/**
* Get the SAM RAW session Destination.
*
* @return The SAM RAW session Destination.
*/
public Destination getDestination() {
return session.getMyDestination();
}
/**
@ -98,130 +64,19 @@ public class SAMRawSession {
* @return True if the data was sent, false otherwise
*/
public boolean sendBytes(String dest, byte[] data) throws DataFormatException {
Destination d = new Destination();
d.fromBase64(dest);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Sending " + data.length + " bytes to " + dest);
return sendBytesThroughMessageSession(dest, data);
}
protected void messageReceived(byte[] msg) {
try {
return session.sendMessage(d, data);
} catch (I2PSessionException e) {
_log.error("I2PSessionException while sending data", e);
return false;
}
}
/**
* Close a SAM RAW session.
*
*/
public void close() {
handler.stopRunning();
}
/**
* SAM RAW session handler, running in its own thread
*
* @author human
*/
public class SAMRawSessionHandler implements Runnable, I2PSessionListener {
private Object runningLock = new Object();
private boolean stillRunning = true;
/**
* Create a new SAM RAW session handler
*
* @param destStream Input stream containing the destination keys
* @param props Properties to setup the I2P session
*/
public SAMRawSessionHandler(InputStream destStream, Properties props) throws I2PSessionException {
_log.debug("Instantiating new SAM RAW session handler");
I2PClient client = I2PClientFactory.createClient();
session = client.createSession(destStream, props);
_log.debug("Connecting I2P session...");
session.connect();
_log.debug("I2P session connected");
session.setSessionListener(this);
}
/**
* Stop a SAM RAW session handling thread
*
*/
public void stopRunning() {
synchronized (runningLock) {
stillRunning = false;
runningLock.notify();
}
}
public void run() {
_log.debug("SAM RAW session handler running");
synchronized (runningLock) {
while (stillRunning) {
try {
runningLock.wait();
} catch (InterruptedException ie) {}
}
}
_log.debug("Shutting down SAM RAW session handler");
recv.stopRawReceiving();
try {
_log.debug("Destroying I2P session...");
session.destroySession();
_log.debug("I2P session destroyed");
} catch (I2PSessionException e) {
_log.error("Error destroying I2P session", e);
}
}
public void disconnected(I2PSession session) {
_log.debug("I2P session disconnected");
stopRunning();
}
public void errorOccurred(I2PSession session, String message,
Throwable error) {
_log.debug("I2P error: " + message, error);
stopRunning();
}
public void messageAvailable(I2PSession session, int msgId, long size){
_log.debug("I2P message available (id: " + msgId
+ "; size: " + size + ")");
try {
byte msg[] = session.receiveMessage(msgId);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Content of message " + msgId + ":\n"
+ HexDump.dump(msg));
}
recv.receiveRawBytes(msg);
} catch (IOException e) {
_log.error("Error forwarding message to receiver", e);
stopRunning();
} catch (I2PSessionException e) {
_log.error("Error fetching I2P message", e);
stopRunning();
close();
}
}
}
public void reportAbuse(I2PSession session, int severity) {
_log.warn("Abuse reported (severity: " + severity + ")");
stopRunning();
}
protected void shutDown() {
recv.stopRawReceiving();
}
}

View File

@ -57,28 +57,6 @@ public class SAMUtils {
}
}
/**
* Get the Base64 representation of a Destination public key
*
* @param d A Destination
*
* @return A String representing the Destination public key
*/
public static String getBase64DestinationPubKey(Destination d) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
d.writeBytes(baos);
return Base64.encode(baos.toByteArray());
} catch (IOException e) {
_log.error("getDestinationPubKey(): caught IOException", e);
return null;
} catch (DataFormatException e) {
_log.error("getDestinationPubKey(): caught DataFormatException",e);
return null;
}
}
/**
* Check whether a base64-encoded dest is valid
*

View File

@ -33,14 +33,14 @@ import net.i2p.util.Log;
*
* @author human
*/
public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStreamReceiver {
public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramReceiver, SAMStreamReceiver {
private final static Log _log = new Log(SAMv1Handler.class);
private final static int IN_BUFSIZE = 2048;
private SAMRawSession rawSession = null;
private SAMRawSession datagramSession = null;
private SAMDatagramSession datagramSession = null;
private SAMStreamSession streamSession = null;
/**
@ -49,20 +49,16 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
* stripped) from the socket input stream.
*
* @param s Socket attached to a SAM client
* @param verMajor SAM major version to manage (should be 1)
* @param verMinor SAM minor version to manage
*/
public SAMv1Handler(Socket s, int verMajor, int verMinor) throws SAMException{
public SAMv1Handler(Socket s, int verMajor, int verMinor) throws SAMException, IOException {
super(s, verMajor, verMinor);
_log.debug("SAM version 1 handler instantiated");
this.verMajor = verMajor;
this.verMinor = verMinor;
if ((this.verMajor != 1) || (this.verMinor != 0)) {
throw new SAMException("BUG! Wrong protocol version!");
}
this.socket = s;
this.verMajor = verMajor;
this.verMinor = verMinor;
}
public void handle() {
@ -76,7 +72,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
_log.debug("SAM handling started");
try {
InputStream in = socket.getInputStream();
InputStream in = getClientSocketInputStream();
int b = -1;
while (true) {
@ -118,6 +114,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
if (domain.equals("STREAM")) {
canContinue = execStreamMessage(opcode, props);
} else if (domain.equals("DATAGRAM")) {
canContinue = execDatagramMessage(opcode, props);
} else if (domain.equals("RAW")) {
canContinue = execRawMessage(opcode, props);
} else if (domain.equals("SESSION")) {
@ -147,7 +145,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
} finally {
_log.debug("Stopping handler");
try {
this.socket.close();
closeClientSocket();
} catch (IOException e) {
_log.error("Error closing socket: " + e.getMessage());
}
@ -204,6 +202,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
if (style.equals("RAW")) {
rawSession = new SAMRawSession(dest, props, this);
} else if (style.equals("DATAGRAM")) {
datagramSession = new SAMDatagramSession(dest, props,this);
} else if (style.equals("STREAM")) {
String dir = props.getProperty("DIRECTION");
if (dir == null) {
@ -305,7 +305,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
return writeString("NAMING REPLY RESULT=OK NAME=" + name
+ " VALUE="
+ SAMUtils.getBase64DestinationPubKey(dest)
+ dest.toBase64()
+ "\n");
} else {
_log.debug("Unrecognized NAMING message opcode: \""
@ -314,10 +314,82 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
}
}
/* Parse and execute a DATAGRAM message */
private boolean execDatagramMessage(String opcode, Properties props) {
if (datagramSession == null) {
_log.error("DATAGRAM message received, but no DATAGRAM session exists");
return false;
}
if (opcode.equals("SEND")) {
if (props == null) {
_log.debug("No parameters specified in DATAGRAM SEND message");
return false;
}
String dest = props.getProperty("DESTINATION");
if (dest == null) {
_log.debug("Destination not specified in DATAGRAM SEND message");
return false;
}
int size;
{
String strsize = props.getProperty("SIZE");
if (strsize == null) {
_log.debug("Size not specified in DATAGRAM SEND message");
return false;
}
try {
size = Integer.parseInt(strsize);
} catch (NumberFormatException e) {
_log.debug("Invalid DATAGRAM SEND size specified: " + strsize);
return false;
}
if (!checkDatagramSize(size)) {
_log.debug("Specified size (" + size
+ ") is out of protocol limits");
return false;
}
}
try {
DataInputStream in = new DataInputStream(getClientSocketInputStream());
byte[] data = new byte[size];
in.readFully(data);
if (!datagramSession.sendBytes(dest, data)) {
_log.error("DATAGRAM SEND failed");
return false;
}
return true;
} catch (EOFException e) {
_log.debug("Too few bytes with DATAGRAM SEND message (expected: "
+ size);
return false;
} catch (IOException e) {
_log.debug("Caught IOException while parsing DATAGRAM SEND message",
e);
return false;
} catch (DataFormatException e) {
_log.debug("Invalid key specified with DATAGRAM SEND message",
e);
return false;
}
} else {
_log.debug("Unrecognized DATAGRAM message opcode: \""
+ opcode + "\"");
return false;
}
}
/* Parse and execute a RAW message */
private boolean execRawMessage(String opcode, Properties props) {
if (rawSession == null) {
_log.debug("RAW message received, but no RAW session exists");
_log.error("RAW message received, but no RAW session exists");
return false;
}
@ -354,7 +426,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
}
try {
DataInputStream in = new DataInputStream(socket.getInputStream());
DataInputStream in = new DataInputStream(getClientSocketInputStream());
byte[] data = new byte[size];
in.readFully(data);
@ -388,7 +460,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
/* Parse and execute a STREAM message */
private boolean execStreamMessage(String opcode, Properties props) {
if (streamSession == null) {
_log.debug("STREAM message received, but no STREAM session exists");
_log.error("STREAM message received, but no STREAM session exists");
return false;
}
@ -434,7 +506,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
}
try {
DataInputStream in = new DataInputStream(socket.getInputStream());
DataInputStream in = new DataInputStream(getClientSocketInputStream());
byte[] data = new byte[size];
in.readFully(data);
@ -535,17 +607,16 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
}
}
public String toString() {
return "SAM v1 handler (client: "
+ this.socket.getInetAddress().toString() + ":"
+ this.socket.getPort() + ")";
}
/* Check whether a size is inside the limits allowed by this protocol */
private boolean checkSize(int size) {
return ((size >= 1) && (size <= 32768));
}
/* Check whether a size is inside the limits allowed by this protocol */
private boolean checkDatagramSize(int size) {
return ((size >= 1) && (size <= 31744));
}
// SAMRawReceiver implementation
public void receiveRawBytes(byte data[]) throws IOException {
if (rawSession == null) {
@ -571,7 +642,39 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
}
try {
this.socket.close();
closeClientSocket();
} catch (IOException e) {
_log.error("Error closing socket: " + e.getMessage());
}
}
// SAMDatagramReceiver implementation
public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException {
if (datagramSession == null) {
_log.error("BUG! Received datagram bytes, but session is null!");
throw new NullPointerException("BUG! DATAGRAM session is null!");
}
ByteArrayOutputStream msg = new ByteArrayOutputStream();
msg.write(("DATAGRAM RECEIVED DESTINATION=" + sender.toBase64()
+ " SIZE=" + data.length
+ "\n").getBytes("ISO-8859-1"));
msg.write(data);
writeBytes(msg.toByteArray());
}
public void stopDatagramReceiving() {
_log.debug("stopDatagramReceiving() invoked");
if (datagramSession == null) {
_log.error("BUG! Got datagram receiving stop, but session is null!");
throw new NullPointerException("BUG! DATAGRAM session is null!");
}
try {
closeClientSocket();
} catch (IOException e) {
_log.error("Error closing socket: " + e.getMessage());
}
@ -585,7 +688,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
}
if (!writeString("STREAM CONNECTED DESTINATION="
+ SAMUtils.getBase64DestinationPubKey(d)
+ d.toBase64()
+ " ID=" + id + "\n")) {
throw new IOException("Error notifying connection to SAM client");
}
@ -629,7 +732,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMStrea
}
try {
this.socket.close();
closeClientSocket();
} catch (IOException e) {
_log.error("Error closing socket: " + e.getMessage());
}