Files
i2p.i2p/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java

391 lines
13 KiB
Java
Raw Normal View History

2004-04-08 04:41:54 +00:00
package net.i2p.client.streaming;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import net.i2p.I2PException;
import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
2004-04-08 04:41:54 +00:00
/**
* Initial stub implementation for the socket
*
*/
class I2PSocketImpl implements I2PSocket {
private final static Log _log = new Log(I2PSocketImpl.class);
2004-04-10 11:50:11 +00:00
public static final int MAX_PACKET_SIZE = 1024 * 32;
public static final int PACKET_DELAY = 100;
2004-04-08 04:41:54 +00:00
private I2PSocketManager manager;
private Destination local;
private Destination remote;
private String localID;
private String remoteID;
private Object remoteIDWaiter = new Object();
private I2PInputStream in;
private I2POutputStream out;
private boolean outgoing;
private Object flagLock = new Object();
2004-04-10 11:50:11 +00:00
private boolean closed = false, sendClose = true, closed2 = false;
/**
 * Create a socket to the given peer and start the runner thread that pumps
 * data written to the output stream out through the session.
 *
 * @param peer     destination of the remote side
 * @param mgr      socket manager owning this socket; its session supplies the local destination
 * @param outgoing direction flag used when selecting packet type masks (see getMask)
 * @param localID  local stream identifier returned by getLocalID()
 */
public I2PSocketImpl(Destination peer, I2PSocketManager mgr, boolean outgoing, String localID) {
    // Initialise every field before the runner is created: the runner's
    // constructor starts a thread that holds a reference to this socket, so
    // anything assigned afterwards could be observed unset by that thread.
    this.outgoing = outgoing;
    this.localID = localID;
    manager = mgr;
    remote = peer;
    local = mgr.getSession().getMyDestination();
    in = new I2PInputStream();
    // "pin" is the internal pipe: the output stream queues into it and the
    // runner reads from it.
    I2PInputStream pin = new I2PInputStream();
    out = new I2POutputStream(pin);
    // Must stay last - this starts the runner thread.
    new I2PSocketRunner(pin);
}
2004-04-10 11:50:11 +00:00
2004-04-08 04:41:54 +00:00
public String getLocalID() {
2004-04-10 11:50:11 +00:00
return localID;
2004-04-08 04:41:54 +00:00
}
public void setRemoteID(String id) {
2004-04-10 11:50:11 +00:00
synchronized (remoteIDWaiter) {
remoteID = id;
remoteIDWaiter.notifyAll();
}
2004-04-08 04:41:54 +00:00
}
public String getRemoteID(boolean wait) throws InterruptedIOException {
2004-04-10 11:50:11 +00:00
return getRemoteID(wait, -1);
2004-04-08 04:41:54 +00:00
}
2004-04-10 11:50:11 +00:00
2004-04-08 04:41:54 +00:00
public String getRemoteID(boolean wait, long maxWait) throws InterruptedIOException {
2004-04-10 11:50:11 +00:00
long dieAfter = System.currentTimeMillis() + maxWait;
synchronized (remoteIDWaiter) {
if (wait) {
2004-04-10 11:50:11 +00:00
try {
if (maxWait >= 0)
2004-04-10 11:50:11 +00:00
remoteIDWaiter.wait(maxWait);
else
remoteIDWaiter.wait();
} catch (InterruptedException ex) {
}
if ((maxWait >= 0) && (System.currentTimeMillis() >= dieAfter))
2004-04-10 11:50:11 +00:00
throw new InterruptedIOException("Timed out waiting for remote ID");
if (_log.shouldLog(Log.DEBUG))
_log.debug("TIMING: RemoteID set to "
+ I2PSocketManager.getReadableForm(remoteID) + " for "
+ this.hashCode());
2004-04-10 11:50:11 +00:00
}
return remoteID;
}
2004-04-08 04:41:54 +00:00
}
public String getRemoteID() throws InterruptedIOException {
2004-04-10 11:50:11 +00:00
return getRemoteID(false);
2004-04-08 04:41:54 +00:00
}
public void queueData(byte[] data) {
2004-04-10 11:50:11 +00:00
in.queueData(data);
2004-04-08 04:41:54 +00:00
}
/**
* Return the Destination of this side of the socket.
*/
2004-04-10 11:50:11 +00:00
public Destination getThisDestination() {
return local;
}
2004-04-08 04:41:54 +00:00
/**
* Return the destination of the peer.
*/
2004-04-10 11:50:11 +00:00
public Destination getPeerDestination() {
return remote;
}
2004-04-08 04:41:54 +00:00
/**
* Return an InputStream to read from the socket.
*/
2004-04-10 11:50:11 +00:00
public InputStream getInputStream() throws IOException {
if ((in == null)) throw new IOException("Not connected");
return in;
2004-04-08 04:41:54 +00:00
}
/**
* Return an OutputStream to write into the socket.
*/
public OutputStream getOutputStream() throws IOException {
2004-04-10 11:50:11 +00:00
if ((out == null)) throw new IOException("Not connected");
return out;
2004-04-08 04:41:54 +00:00
}
/**
* Closes the socket if not closed yet
*/
public void close() throws IOException {
2004-04-10 11:50:11 +00:00
synchronized (flagLock) {
_log.debug("Closing connection");
closed = true;
}
out.close();
in.notifyClosed();
2004-04-08 04:41:54 +00:00
}
public void internalClose() {
2004-04-10 11:50:11 +00:00
synchronized (flagLock) {
closed = true;
closed2 = true;
sendClose = false;
}
out.close();
in.notifyClosed();
2004-04-08 04:41:54 +00:00
}
private byte getMask(int add) {
if (outgoing)
return (byte)(I2PSocketManager.DATA_IN + (byte)add);
else
return (byte)(I2PSocketManager.DATA_OUT + (byte)add);
2004-04-08 04:41:54 +00:00
}
2004-04-10 11:50:11 +00:00
/**
 * @return the read timeout currently configured on the socket's input stream
 */
public long getReadTimeout() {
    return this.in.getReadTimeout();
}
/**
 * Configure the read timeout on the socket's input stream.
 *
 * @param ms timeout value forwarded unchanged to the input stream
 */
public void setReadTimeout(long ms) {
    this.in.setReadTimeout(ms);
}
2004-04-08 04:41:54 +00:00
//--------------------------------------------------
public class I2PInputStream extends InputStream {
2004-04-10 11:50:11 +00:00
private ByteCollector bc = new ByteCollector();
private long readTimeout = -1;
public long getReadTimeout() {
return readTimeout;
}
public void setReadTimeout(long ms) {
readTimeout = ms;
}
2004-04-10 11:50:11 +00:00
public int read() throws IOException {
byte[] b = new byte[1];
int res = read(b);
if (res == 1) return b[0] & 0xff;
if (res == -1) return -1;
throw new RuntimeException("Incorrect read() result");
}
public synchronized int read(byte[] b, int off, int len) throws IOException {
_log.debug("Read called: " + this.hashCode());
if (len == 0) return 0;
long dieAfter = System.currentTimeMillis() + readTimeout;
2004-04-10 11:50:11 +00:00
byte[] read = bc.startToByteArray(len);
boolean timedOut = false;
2004-04-10 11:50:11 +00:00
while (read.length == 0) {
synchronized (flagLock) {
if (closed) {
_log.debug("Closed is set, so closing stream: " + hashCode());
2004-04-10 11:50:11 +00:00
return -1;
}
}
try {
if (readTimeout >= 0) {
wait(readTimeout);
} else {
wait();
}
} catch (InterruptedException ex) {}
if ((readTimeout >= 0)
&& (System.currentTimeMillis() >= dieAfter)) {
throw new InterruptedIOException("Timeout reading from I2PSocket (" + readTimeout + " msecs)");
2004-04-10 11:50:11 +00:00
}
2004-04-10 11:50:11 +00:00
read = bc.startToByteArray(len);
}
if (read.length > len) throw new RuntimeException("BUG");
System.arraycopy(read, 0, b, off, read.length);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Read from I2PInputStream " + hashCode() + " returned "
+ read.length + " bytes");
2004-04-10 11:50:11 +00:00
}
//if (_log.shouldLog(Log.DEBUG)) {
// _log.debug("Read from I2PInputStream " + this.hashCode()
// + " returned "+read.length+" bytes:\n"
// + HexDump.dump(read));
2004-04-10 11:50:11 +00:00
//}
return read.length;
}
public int available() {
return bc.getCurrentSize();
}
public void queueData(byte[] data) {
queueData(data, 0, data.length);
}
public synchronized void queueData(byte[] data, int off, int len) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Insert " + len + " bytes into queue: " + hashCode());
2004-04-10 11:50:11 +00:00
bc.append(data, off, len);
notifyAll();
}
public synchronized void notifyClosed() {
notifyAll();
}
2004-04-08 04:41:54 +00:00
}
public class I2POutputStream extends OutputStream {
2004-04-10 11:50:11 +00:00
public I2PInputStream sendTo;
public I2POutputStream(I2PInputStream sendTo) {
this.sendTo = sendTo;
}
public void write(int b) throws IOException {
write(new byte[] { (byte) b});
}
public void write(byte[] b, int off, int len) throws IOException {
sendTo.queueData(b, off, len);
}
public void close() {
sendTo.notifyClosed();
}
2004-04-08 04:41:54 +00:00
}
public class I2PSocketRunner extends I2PThread {
2004-04-08 04:41:54 +00:00
2004-04-10 11:50:11 +00:00
public InputStream in;
public I2PSocketRunner(InputStream in) {
_log.debug("Runner's input stream is: " + in.hashCode());
this.in = in;
String peer = I2PSocketImpl.this.remote.calculateHash().toBase64();
setName("SocketRunner from " + peer.substring(0, 4));
2004-04-10 11:50:11 +00:00
start();
}
/**
* Pump some more data
*
* @return true if we should keep on handling, false otherwise
*/
private boolean handleNextPacket(ByteCollector bc, byte buffer[])
throws IOException, I2PSessionException {
int len = in.read(buffer);
int bcsize = bc.getCurrentSize();
if (len != -1) {
bc.append(buffer, len);
} else if (bcsize == 0) {
// nothing left in the buffer, but the read(..) didn't EOF (-1)
// this used to be 'break' (aka return false), though that seems
// odd to me - shouldn't it keep reading packets until EOF?
// but perhaps there's something funky in the stream's operation,
// or some other dependency within the rest of the ministreaming
// lib, so for the moment, return false. --jr
return false;
}
if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Runner Point d: " + hashCode());
try {
Thread.sleep(PACKET_DELAY);
} catch (InterruptedException e) {
_log.warn("wtf", e);
}
}
if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
byte[] data = bc.startToByteArray(MAX_PACKET_SIZE);
if (data.length > 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Message size is: " + data.length);
boolean sent = sendBlock(data);
if (!sent) {
_log.error("Error sending message to peer. Killing socket runner");
return false;
}
}
}
return true;
}
2004-04-10 11:50:11 +00:00
public void run() {
byte[] buffer = new byte[MAX_PACKET_SIZE];
ByteCollector bc = new ByteCollector();
boolean keepHandling = true;
int packetsHandled = 0;
2004-04-10 11:50:11 +00:00
try {
// try {
while (keepHandling) {
keepHandling = handleNextPacket(bc, buffer);
packetsHandled++;
2004-04-10 11:50:11 +00:00
}
if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) {
_log.error("A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: "
+ in.hashCode() + "; "
2004-04-10 11:50:11 +00:00
+ "queue size: " + bc.getCurrentSize() + ")");
}
synchronized (flagLock) {
closed2 = true;
}
boolean sc;
synchronized (flagLock) {
sc = sendClose;
} // FIXME: Race here?
if (sc) {
if (_log.shouldLog(Log.INFO))
_log.info("Sending close packet: " + outgoing);
byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]);
boolean sent = manager.getSession().sendMessage(remote, packet);
2004-04-10 11:50:11 +00:00
if (!sent) {
_log.error("Error sending close packet to peer");
}
}
manager.removeSocket(I2PSocketImpl.this);
} catch (InterruptedIOException ex) {
_log.error("BUG! read() operations should not timeout!", ex);
2004-04-10 11:50:11 +00:00
} catch (IOException ex) {
// WHOEVER removes this event on inconsistent
// state before fixing the inconsistent state (a
// reference on the socket in the socket manager
// etc.) will get hanged by me personally -- mihi
_log.error("Error running - **INCONSISTENT STATE!!!**", ex);
} catch (I2PException ex) {
_log.error("Error running - **INCONSISTENT STATE!!!**", ex);
}
}
private boolean sendBlock(byte data[]) throws I2PSessionException {
if (_log.shouldLog(Log.DEBUG))
_log.debug("TIMING: Block to send for " + I2PSocketImpl.this.hashCode());
2004-04-10 11:50:11 +00:00
if (remoteID == null) {
_log.error("NULL REMOTEID");
return false;
}
byte[] packet = I2PSocketManager.makePacket(getMask(0x00), remoteID, data);
boolean sent;
synchronized (flagLock) {
if (closed2) return false;
}
sent = manager.getSession().sendMessage(remote, packet);
2004-04-10 11:50:11 +00:00
return sent;
}
2004-04-08 04:41:54 +00:00
}
}