Add v3 FORWARD support to sink

This commit is contained in:
zzz
2015-11-27 20:34:11 +00:00
parent cb979fb685
commit 2849aec3c2

View File

@ -8,12 +8,14 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLServerSocket;
import gnu.getopt.Getopt;
@ -49,11 +51,12 @@ public class SAMStreamSink {
private final Map<String, Sink> _remotePeers;
private static I2PSSLSocketFactory _sslSocketFactory;
private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4, RAWHDR = 5;
private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4, RAWHDR = 5, FORWARD = 6;
private static final String USAGE = "Usage: SAMStreamSink [-s] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] myDestFile sinkDir\n" +
" modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4 raw-with-headers: 5\n" +
" modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4; raw-with-headers: 5; stream-forward: 6\n" +
" -s: use SSL\n" +
" multiple -o session options are allowed";
private static final int V3FORWARDPORT=9998;
private static final int V3DGPORT=9999;
public static void main(String args[]) {
@ -75,7 +78,7 @@ public class SAMStreamSink {
case 'm':
mode = Integer.parseInt(g.getOptarg());
if (mode < 0 || mode > RAWHDR) {
if (mode < 0 || mode > FORWARD) {
System.err.println(USAGE);
return;
}
@ -167,9 +170,9 @@ public class SAMStreamSink {
Thread t = new Pinger(out);
t.start();
}
if (_isV3 && mode == STREAM) {
if (_isV3 && (mode == STREAM || mode == FORWARD)) {
// test multiple acceptors, only works in 3.2
int acceptors = isV32 ? 4 : 1;
int acceptors = (_isV32 && mode == STREAM) ? 4 : 1;
for (int i = 0; i < acceptors; i++) {
Socket sock2 = connect(isSSL);
out = sock2.getOutputStream();
@ -184,6 +187,10 @@ public class SAMStreamSink {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handshake " + (2 + i) + " complete.");
}
if (mode == FORWARD) {
// set up a listening ServerSocket
(new FwdRcvr(isSSL)).start();
}
} else if (_isV3 && (mode == DG || mode == RAW || mode == RAWHDR)) {
// set up a listening DatagramSocket
(new DGRcvr(mode)).start();
@ -244,6 +251,44 @@ public class SAMStreamSink {
}
}
private class FwdRcvr extends I2PAppThread {
private final boolean _isSSL;
public FwdRcvr(boolean isSSL) {
if (isSSL)
throw new UnsupportedOperationException("TODO");
_isSSL = isSSL;
}
public void run() {
try {
ServerSocket ss;
if (_isSSL) {
throw new UnsupportedOperationException("TODO");
} else {
ss = new ServerSocket(V3FORWARDPORT);
}
while (true) {
Socket s = ss.accept();
Sink sink = new Sink("FAKE", "FAKEFROM");
try {
InputStream in = s.getInputStream();
byte[] buf = new byte[32768];
int len;
while((len = in.read(buf)) >= 0) {
sink.received(buf, 0, len);
}
sink.closed();
} catch (IOException ioe) {
_log.error("Fwdcvr", ioe);
}
}
} catch (IOException ioe) {
_log.error("Fwdcvr", ioe);
}
}
}
private static class Pinger extends I2PAppThread {
private final OutputStream _out;
@ -480,17 +525,25 @@ public class SAMStreamSink {
// only for v3
//String req = "STREAM ACCEPT SILENT=true ID=" + _v3ID + "\n";
// TO_PORT not supported until 3.2 but 3.0-3.1 will ignore
String req = "STREAM ACCEPT SILENT=false TO_PORT=5678 ID=" + _v3ID + "\n";
String req;
if (mode == STREAM)
req = "STREAM ACCEPT SILENT=false TO_PORT=5678 ID=" + _v3ID + "\n";
else if (mode == FORWARD)
req = "STREAM FORWARD ID=" + _v3ID + " PORT=" + V3FORWARDPORT + '\n';
else
throw new IllegalStateException("mode " + mode);
samOut.write(req.getBytes());
samOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("STREAM ACCEPT sent");
// docs were wrong, we do not get a STREAM STATUS if SILENT=true
//boolean ok = eventHandler.waitForStreamStatusReply();
//if (!ok)
// throw new IOException("Stream status failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("got STREAM STATUS, awaiting connection");
_log.debug("STREAM ACCEPT/FORWARD sent");
if (mode == FORWARD) {
// docs were wrong, we do not get a STREAM STATUS if SILENT=true for ACCEPT
boolean ok = eventHandler.waitForStreamStatusReply();
if (!ok)
throw new IOException("Stream status failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("got STREAM STATUS, awaiting connection");
}
return "OK";
}
_isV3 = VersionComparator.comp(hisVersion, "3") >= 0;
@ -530,7 +583,7 @@ public class SAMStreamSink {
dest = _destFile;
}
String style;
if (mode == STREAM)
if (mode == STREAM || mode == FORWARD)
style = "STREAM";
else if (mode == V1DG)
style = "DATAGRAM";