initial impl for the new streaming lib (saying this isn't done should be obvious, but the

packet spec is at a save point)
This commit is contained in:
jrandom
2004-10-17 03:47:03 +00:00
committed by zzz
parent cebe0a151f
commit f904b012e9
4 changed files with 790 additions and 0 deletions

View File

@ -0,0 +1,216 @@
package net.i2p.client.streaming;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import net.i2p.data.ByteArray;
/**
* Stream that can be given messages out of order
* yet present them in order.
*
*/
public class MessageInputStream extends InputStream {
/**
* List of ByteArray objects of data ready to be read,
* with the first ByteArray at index 0, and the next
* actual byte to be read at _readyDataBlockIndex of
* that array.
*
*/
private List _readyDataBlocks;
private int _readyDataBlockIndex;
/** highest message ID used in the readyDataBlocks */
private long _highestReadyBlockId;
/**
* Message ID (Long) to ByteArray for blocks received
* out of order when there are lower IDs not yet
* received
*/
private Map _notYetReadyBlocks;
/**
* if we have received a flag saying there won't be later messages, EOF
* after we have cleared what we have received.
*/
private boolean _closeReceived;
/** if we don't want any more data, ignore the data */
private boolean _locallyClosed;
private int _readTimeout;
private Object _dataLock;
public MessageInputStream() {
_readyDataBlocks = new ArrayList(4);
_readyDataBlockIndex = 0;
_highestReadyBlockId = -1;
_readTimeout = -1;
_notYetReadyBlocks = new HashMap(4);
_dataLock = new Object();
_closeReceived = false;
_locallyClosed = false;
}
/** What is the highest block ID we've completely received through? */
public long getHighestReadyBockId() {
synchronized (_dataLock) {
return _highestReadyBlockId;
}
}
/**
* Ascending list of block IDs greater than the highest
* ready block ID, or null if there aren't any.
*
*/
public long[] getOutOfOrderBlocks() {
long blocks[] = null;
synchronized (_dataLock) {
int num = _notYetReadyBlocks.size();
if (num <= 0) return null;
blocks = new long[num];
int i = 0;
for (Iterator iter = _notYetReadyBlocks.keySet().iterator(); iter.hasNext(); ) {
Long id = (Long)iter.next();
blocks[i] = id.longValue();
i++;
}
}
Arrays.sort(blocks);
return blocks;
}
/** how many blocks have we received that we still have holes before? */
public int getOutOfOrderBlockCount() {
synchronized (_dataLock) {
return _notYetReadyBlocks.size();
}
}
/**
* how long a read() call should block (if less than 0, block indefinitely,
* but if it is 0, do not block at all)
*/
public int getReadTimeout() { return _readTimeout; }
public void setReadTimeout(int timeout) { _readTimeout = timeout; }
/**
* A new message has arrived - toss it on the appropriate queue (moving
* previously pending messages to the ready queue if it fills the gap, etc)
*
*/
public void messageReceived(long messageId, byte payload[]) {
synchronized (_dataLock) {
if (messageId <= _highestReadyBlockId) return; // already received
if (_highestReadyBlockId + 1 == messageId) {
if (!_locallyClosed)
_readyDataBlocks.add(new ByteArray(payload));
_highestReadyBlockId = messageId;
// now pull in any previously pending blocks
while (_notYetReadyBlocks.containsKey(new Long(_highestReadyBlockId + 1))) {
_readyDataBlocks.add(_notYetReadyBlocks.get(new Long(_highestReadyBlockId + 1)));
_highestReadyBlockId++;
}
_dataLock.notifyAll();
} else {
if (_locallyClosed) // dont need the payload, just the msgId in order
_notYetReadyBlocks.put(new Long(messageId), new ByteArray(null));
else
_notYetReadyBlocks.put(new Long(messageId), new ByteArray(payload));
}
}
}
public int read() throws IOException {
if (_locallyClosed) throw new IOException("Already locally closed");
synchronized (_dataLock) {
if (_readyDataBlocks.size() <= 0) {
if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
return -1;
} else {
if (_readTimeout < 0) {
try { _dataLock.wait(); } catch (InterruptedException ie) { }
} else if (_readTimeout > 0) {
try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { }
} else { // readTimeout == 0
// noop, don't block
}
if (_readyDataBlocks.size() <= 0) {
throw new InterruptedIOException("Timeout reading");
}
}
}
// either was already ready, or we wait()ed and it arrived
ByteArray cur = (ByteArray)_readyDataBlocks.get(0);
byte rv = cur.getData()[_readyDataBlockIndex++];
if (cur.getData().length <= _readyDataBlockIndex) {
_readyDataBlockIndex = 0;
_readyDataBlocks.remove(0);
}
return (rv < 0 ? rv + 256 : rv);
}
}
public int available() throws IOException {
if (_locallyClosed) throw new IOException("Already closed, you wanker");
synchronized (_dataLock) {
if (_readyDataBlocks.size() <= 0)
return 0;
int numBytes = 0;
for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
if (i == 0)
numBytes += cur.getData().length - _readyDataBlockIndex;
else
numBytes += cur.getData().length;
}
return numBytes;
}
}
/**
* How many bytes are queued up for reading (or sitting in the out-of-order
* buffer)?
*
*/
public int getTotalQueuedSize() {
synchronized (_dataLock) {
if (_locallyClosed) return 0;
int numBytes = 0;
for (int i = 0; i < _readyDataBlocks.size(); i++) {
ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
if (i == 0)
numBytes += cur.getData().length - _readyDataBlockIndex;
else
numBytes += cur.getData().length;
}
for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) {
ByteArray cur = (ByteArray)iter.next();
numBytes += cur.getData().length;
}
return numBytes;
}
}
public void close() {
synchronized (_dataLock) {
_readyDataBlocks.clear();
// we don't need the data, but we do need to keep track of the messageIds
// received, so we can ACK accordingly
for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) {
ByteArray ba = (ByteArray)iter.next();
ba.setData(null);
}
_locallyClosed = true;
}
}
}

View File

@ -0,0 +1,95 @@
package net.i2p.client.streaming;
import java.io.IOException;
import java.io.OutputStream;
/**
*
*/
public class MessageOutputStream extends OutputStream {
private byte _buf[];
private int _valid;
private Object _dataLock;
private DataReceiver _dataReceiver;
private IOException _streamError;
public MessageOutputStream(DataReceiver receiver) {
this(receiver, 64*1024);
}
public MessageOutputStream(DataReceiver receiver, int bufSize) {
super();
_buf = new byte[bufSize];
_dataReceiver = receiver;
_dataLock = new Object();
}
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}
public void write(byte b[], int off, int len) throws IOException {
synchronized (_dataLock) {
int remaining = len;
while (remaining > 0) {
if (_valid + remaining < _buf.length) {
// simply buffer the data, no flush
System.arraycopy(b, off, _buf, _valid, remaining);
remaining = 0;
} else {
// buffer whatever we can fit then flush,
// repeating until we've pushed all of the
// data through
int toWrite = _buf.length - _valid;
System.arraycopy(b, off, _buf, _valid, toWrite);
remaining -= toWrite;
_valid = _buf.length;
_dataReceiver.writeData(_buf, 0, _valid);
_valid = 0;
throwAnyError();
}
}
}
throwAnyError();
}
public void write(int b) throws IOException {
write(new byte[] { (byte)b }, 0, 1);
throwAnyError();
}
public void flush() throws IOException {
synchronized (_dataLock) {
_dataReceiver.writeData(_buf, 0, _valid);
_valid = 0;
}
throwAnyError();
}
private void throwAnyError() throws IOException {
if (_streamError != null) {
IOException ioe = _streamError;
_streamError = null;
throw ioe;
}
}
void streamErrorOccurred(IOException ioe) {
_streamError = ioe;
}
/**
* called whenever the engine wants to push more data to the
* peer
*
*/
void flushAvailable(DataReceiver target) {
synchronized (_dataLock) {
target.writeData(_buf, 0, _valid);
_valid = 0;
}
}
public interface DataReceiver {
public void writeData(byte buf[], int off, int size);
}
}

View File

@ -0,0 +1,389 @@
package net.i2p.client.streaming;
import java.util.Arrays;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.data.Signature;
import net.i2p.data.SigningPrivateKey;
/**
* Contain a single packet transferred as part of a streaming connection.
* The data format is as follows:<ul>
* <li>{@see #getSendStreamId sendStreamId} [4 byte value]</li>
* <li>{@see #getReceiveStreamId receiveStreamId} [4 byte value]</li>
* <li>{@see #getSequenceNum sequenceNum} [4 byte unsigned integer]</li>
* <li>{@see #getAckThrough ackThrough} [4 byte unsigned integer]</li>
* <li>number of NACKs [1 byte unsigned integer]</li>
* <li>that many {@see #getNacks NACKs}</li>
* <li>{@see #getResendDelay resendDelay} [1 byte integer]</li>
* <li>flags [2 byte value]</li>
* <li>option data size [2 byte integer]</li>
* <li>option data specified by those flags [0 or more bytes]</li>
* <li>payload [remaining packet size]</li>
* </ul>
*
* <p>The flags field above specifies some metadata about the packet, and in
* turn may require certain additional data to be included. The flags are
* as follows (with any data structures specified added to the options area
* in the given order):</p><ol>
* <li>{@see #FLAG_SYNCHRONIZE}: no option data</li>
* <li>{@see #FLAG_CLOSE}: no option data</li>
* <li>{@see #FLAG_RESET}: no option data</li>
* <li>{@see #FLAG_SIGNATURE_INCLUDED}: {@see net.i2p.data.Signature}</li>
* <li>{@see #FLAG_SIGNATURE_REQUESTED}: no option data</li>
* <li>{@see #FLAG_FROM_INCLUDED}: {@see net.i2p.data.Destination}</li>
* <li>{@see #FLAG_DELAY_REQUESTED}: 1 byte integer</li>
* <li>{@see #FLAG_MAX_PACKET_SIZE_INCLUDED}: 2 byte integer</li>
* <li>{@see #FLAG_PROFILE_INTERACTIVE}: no option data</li>
* </ol>
*
* <p>If the signature is included, it uses the Destination's DSA key
* to sign the entire header and payload with the space in the options
* for the signature being set to all zeroes.</p>
*
*/
public class Packet {
private byte _sendStreamId[];
private byte _receiveStreamId[];
private long _sequenceNum;
private long _ackThrough;
private long _nacks[];
private int _resendDelay;
private int _flags;
private byte _payload[];
// the next four are set only if the flags say so
private Signature _optionSignature;
private Destination _optionFrom;
private int _optionDelay;
private int _optionMaxSize;
/**
* The receiveStreamId will be set to this when the packet doesn't know
* what ID will be assigned by the remote peer (aka this is the initial
* synchronize packet)
*
*/
public static final byte RECEIVE_STREAM_ID_UNKNOWN[] = new byte[] { 0x00, 0x00, 0x00, 0x00 };
/**
* This packet is creating a new socket connection (if the receiveStreamId
* is RECEIVE_STREAM_ID_UNKNOWN) or it is acknowledging a request to
* create a connection and in turn is accepting the socket.
*
*/
public static final int FLAG_SYNCHRONIZE = (1 << 0);
/**
* The sender of this packet will not be sending any more payload data.
*/
public static final int FLAG_CLOSE = (1 << 1);
/**
* This packet is being sent to signify that the socket does not exist
* (or, if in response to an initial synchronize packet, that the
* connection was refused).
*
*/
public static final int FLAG_RESET = (1 << 2);
/**
* This packet contains a DSA signature from the packet's sender. This
* signature is within the packet options. All synchronize packets must
* have this flag set.
*
*/
public static final int FLAG_SIGNATURE_INCLUDED = (1 << 3);
/**
* This packet wants the recipient to include signatures on subsequent
* packets sent to the creator of this packet.
*/
public static final int FLAG_SIGNATURE_REQUESTED = (1 << 4);
/**
* This packet includes the full I2P destination of the packet's sender.
* The initial synchronize packet must have this flag set.
*/
public static final int FLAG_FROM_INCLUDED = (1 << 5);
/**
* This packet includes an explicit request for the recipient to delay
* sending any packets with data for a given amount of time.
*
*/
public static final int FLAG_DELAY_REQUESTED = (1 << 6);
/**
* This packet includes a request that the recipient not send any
* subsequent packets with payloads greater than a specific size.
* If not set and no prior value was delivered, the maximum value
* will be assumed (approximately 32KB).
*
*/
public static final int FLAG_MAX_PACKET_SIZE_INCLUDED = (1 << 7);
/**
* If set, this packet is travelling as part of an interactive flow,
* meaning it is more lag sensitive than throughput sensitive. aka
* send data ASAP rather than waiting around to send full packets.
*
*/
public static final int FLAG_PROFILE_INTERACTIVE = (1 << 8);
/** what stream is this packet a part of? */
public byte[] getSendStreamId() { return _sendStreamId; }
public void setSendStreamId(byte[] id) { _sendStreamId = id; }
/**
* what is the stream replies should be sent on? if the
* connection is still being built, this should be
* {@see #RECEIVE_STREAM_ID_UNKNOWN}.
*
*/
public byte[] getReceiveStreamId() { return _receiveStreamId; }
public void setReceiveStreamId(byte[] id) { _receiveStreamId = id; }
/** 0-indexed sequence number for this Packet in the sendStream */
public long getSequenceNum() { return _sequenceNum; }
public void setSequenceNum(long num) { _sequenceNum = num; }
/**
* what is the highest packet sequence number that received
* on the receiveStreamId? This field is ignored on the initial
* connection packet (where receiveStreamId is the unknown id).
*
*/
public long getAckThrough() { return _ackThrough; }
public void setAckThrough(long id) { _ackThrough = id; }
/**
* What packet sequence numbers below the getAckThrough() value
* have not been received? this may be null.
*
*/
public long[] getNacks() { return _nacks; }
public void setNacks(long nacks[]) { _nacks = nacks; }
/**
* How long is the creator of this packet going to wait before
* resending this packet (if it hasn't yet been ACKed). The
* value is seconds since the packet was created.
*
*/
public int getResendDelay() { return _resendDelay; }
public void setResendDelay(int numSeconds) { _resendDelay = numSeconds; }
/** get the actual payload of the message. may be null */
public byte[] getPayload() { return _payload; }
public void setPayload(byte payload[]) { _payload = payload; }
/** is a particular flag set on this packet? */
public boolean isFlagSet(int flag) { return 0 != (_flags & flag); }
public void setFlag(int flag) { _flags |= flag; }
/** the signature on the packet (only included if the flag for it is set) */
public Signature getOptionalSignature() { return _optionSignature; }
public void setOptionalSignature(Signature sig) { _optionSignature = sig; }
/** the sender of the packet (only included if the flag for it is set) */
public Destination getOptionalFrom() { return _optionFrom; }
public void setOptionalFrom(Destination from) { _optionFrom = from; }
/**
* How many milliseconds the sender of this packet wants the recipient
* to wait before sending any more data (only valid if the flag for it is
* set)
*/
public int getOptionalDelay() { return _optionDelay; }
public void setOptionalDelay(int delayMs) { _optionDelay = delayMs; }
/**
* What is the largest payload the sender of this packet wants to receive?
*
*/
public int getOptionalMaxSize() { return _optionMaxSize; }
public void setOptionalMaxSize(int numBytes) { _optionMaxSize = numBytes; }
/**
* Write the packet to the buffer (starting at the offset) and return
* the number of bytes written.
*
* @throws IllegalStateException if there is data missing or otherwise b0rked
*/
public int writePacket(byte buffer[], int offset) throws IllegalStateException {
return writePacket(buffer, offset, true);
}
/**
* @param includeSig if true, include the real signature, otherwise put zeroes
* in its place.
*/
private int writePacket(byte buffer[], int offset, boolean includeSig) throws IllegalStateException {
int cur = offset;
System.arraycopy(_sendStreamId, 0, buffer, cur, _sendStreamId.length);
cur += _sendStreamId.length;
System.arraycopy(_receiveStreamId, 0, buffer, cur, _receiveStreamId.length);
cur += _receiveStreamId.length;
DataHelper.toLong(buffer, cur, 4, _sequenceNum);
cur += 4;
DataHelper.toLong(buffer, cur, 4, _ackThrough);
cur += 4;
if (_nacks != null) {
DataHelper.toLong(buffer, cur, 1, _nacks.length);
cur++;
for (int i = 0; i < _nacks.length; i++) {
DataHelper.toLong(buffer, cur, 4, _nacks[i]);
cur += 4;
}
} else {
DataHelper.toLong(buffer, cur, 1, 0);
cur++;
}
DataHelper.toLong(buffer, cur, 1, _resendDelay);
cur++;
DataHelper.toLong(buffer, cur, 2, _flags);
cur += 2;
int optionSize = 0;
if (isFlagSet(FLAG_DELAY_REQUESTED))
optionSize += 1;
if (isFlagSet(FLAG_FROM_INCLUDED))
optionSize += _optionFrom.size();
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED))
optionSize += 2;
if (isFlagSet(FLAG_SIGNATURE_INCLUDED))
optionSize += Signature.SIGNATURE_BYTES;
DataHelper.toLong(buffer, cur, 2, optionSize);
cur += 2;
if (isFlagSet(FLAG_DELAY_REQUESTED)) {
DataHelper.toLong(buffer, cur, 1, _optionDelay);
cur++;
}
if (isFlagSet(FLAG_FROM_INCLUDED)) {
cur += _optionFrom.writeBytes(buffer, cur);
}
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) {
DataHelper.toLong(buffer, cur, 2, _optionMaxSize);
cur += 2;
}
if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) {
if (includeSig)
System.arraycopy(_optionSignature.getData(), 0, buffer, cur, Signature.SIGNATURE_BYTES);
else // we're signing (or validating)
Arrays.fill(buffer, cur, Signature.SIGNATURE_BYTES, (byte)0x0);
cur += Signature.SIGNATURE_BYTES;
}
if (_payload != null) {
System.arraycopy(_payload, 0, buffer, cur, _payload.length);
cur += _payload.length;
}
return cur - offset;
}
/**
* Read the packet from the buffer (starting at the offset) and return
* the number of bytes read.
*
* @param buffer packet buffer containing the data
* @param offset index into the buffer to start readign
* @param length how many bytes within the buffer past the offset are
* part of the packet?
*
* @throws IllegalArgumentException if the data is b0rked
*/
public void readPacket(byte buffer[], int offset, int length) throws IllegalArgumentException {
int cur = offset;
_sendStreamId = new byte[4];
System.arraycopy(buffer, cur, _sendStreamId, 0, 4);
cur += 4;
_receiveStreamId = new byte[4];
System.arraycopy(buffer, cur, _receiveStreamId, 0, 4);
cur += 4;
_sequenceNum = DataHelper.fromLong(buffer, cur, 4);
cur += 4;
_ackThrough = DataHelper.fromLong(buffer, cur, 4);
cur += 4;
int numNacks = (int)DataHelper.fromLong(buffer, cur, 1);
cur++;
if (numNacks > 0) {
_nacks = new long[numNacks];
for (int i = 0; i < numNacks; i++) {
_nacks[i] = DataHelper.fromLong(buffer, cur, 4);
cur += 4;
}
} else {
_nacks = null;
}
_resendDelay = (int)DataHelper.fromLong(buffer, cur, 1);
cur++;
_flags = (int)DataHelper.fromLong(buffer, cur, 2);
cur += 2;
int optionSize = (int)DataHelper.fromLong(buffer, cur, 2);
cur += 2;
int payloadBegin = cur + optionSize;
// skip ahead to the payload
_payload = new byte[offset + length - payloadBegin];
System.arraycopy(buffer, payloadBegin, _payload, 0, _payload.length);
// ok now lets go back and deal with the options
if (isFlagSet(FLAG_DELAY_REQUESTED)) {
_optionDelay = (int)DataHelper.fromLong(buffer, cur, 1);
cur++;
}
if (isFlagSet(FLAG_FROM_INCLUDED)) {
_optionFrom = new Destination();
cur += _optionFrom.readBytes(buffer, cur);
}
if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) {
_optionMaxSize = (int)DataHelper.fromLong(buffer, cur, 2);
cur += 2;
}
if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) {
Signature sig = new Signature();
byte buf[] = new byte[Signature.SIGNATURE_BYTES];
System.arraycopy(buffer, cur, buf, 0, Signature.SIGNATURE_BYTES);
sig.setData(buf);
cur += Signature.SIGNATURE_BYTES;
}
}
/**
* Determine whether the signature on the data is valid.
*
* @return true if the signature exists and validates against the data,
* false otherwise.
*/
public boolean verifySignature(I2PAppContext ctx, Destination from, byte buffer[]) {
if (!isFlagSet(FLAG_SIGNATURE_INCLUDED)) return false;
if (_optionSignature == null) return false;
int size = writePacket(buffer, 0, false);
return ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey());
}
/**
* Sign and write the packet to the buffer (starting at the offset) and return
* the number of bytes written.
*
* @throws IllegalStateException if there is data missing or otherwise b0rked
*/
public int writeSignedPacket(byte buffer[], int offset, I2PAppContext ctx, SigningPrivateKey key) throws IllegalStateException {
setFlag(FLAG_SIGNATURE_INCLUDED);
int size = writePacket(buffer, offset, false);
_optionSignature = ctx.dsa().sign(buffer, offset, size, key);
// jump into the signed data and inject the signature where we
// previously placed a bunch of zeroes
int signatureOffset = offset
+ 4 // sendStreamId
+ 4 // receiveStreamId
+ 4 // sequenceNum
+ 4 // ackThrough
+ (_nacks != null ? 4*_nacks.length + 1 : 1)
+ 1 // resendDelay
+ 2 // flags
+ 2 // optionSize
+ (isFlagSet(FLAG_DELAY_REQUESTED) ? 1 : 0)
+ (isFlagSet(FLAG_FROM_INCLUDED) ? _optionFrom.size() : 0)
+ (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED) ? 2 : 0);
System.arraycopy(_optionSignature.getData(), 0, buffer, signatureOffset, Signature.SIGNATURE_BYTES);
return size;
}
}

View File

@ -0,0 +1,90 @@
package net.i2p.client.streaming;
import java.io.InputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
/**
* Stress test the MessageInputStream
*/
public class MessageInputStreamTest {
private I2PAppContext _context;
private Log _log;
public MessageInputStreamTest() {
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(MessageInputStreamTest.class);
}
public void testInOrder() {
byte orig[] = new byte[32*1024];
_context.random().nextBytes(orig);
MessageInputStream in = new MessageInputStream();
for (int i = 0; i < 32; i++) {
byte msg[] = new byte[1024];
System.arraycopy(orig, i*1024, msg, 0, 1024);
in.messageReceived(i, msg);
}
byte read[] = new byte[32*1024];
try {
int howMany = DataHelper.read(in, read);
if (howMany != orig.length)
throw new RuntimeException("Failed test: not enough bytes read [" + howMany + "]");
if (!DataHelper.eq(orig, read))
throw new RuntimeException("Failed test: data read is not equal");
_log.info("Passed test: in order");
} catch (IOException ioe) {
throw new RuntimeException("IOError reading: " + ioe.getMessage());
}
}
public void testRandomOrder() {
byte orig[] = new byte[32*1024];
_context.random().nextBytes(orig);
MessageInputStream in = new MessageInputStream();
ArrayList order = new ArrayList(32);
for (int i = 0; i < 32; i++)
order.add(new Integer(i));
Collections.shuffle(order);
for (int i = 0; i < 32; i++) {
byte msg[] = new byte[1024];
Integer cur = (Integer)order.get(i);
System.arraycopy(orig, cur.intValue()*1024, msg, 0, 1024);
in.messageReceived(cur.intValue(), msg);
_log.debug("Injecting " + cur);
}
byte read[] = new byte[32*1024];
try {
int howMany = DataHelper.read(in, read);
if (howMany != orig.length)
throw new RuntimeException("Failed test: not enough bytes read [" + howMany + "]");
if (!DataHelper.eq(orig, read))
throw new RuntimeException("Failed test: data read is not equal");
_log.info("Passed test: random order");
} catch (IOException ioe) {
throw new RuntimeException("IOError reading: " + ioe.getMessage());
}
}
public static void main(String args[]) {
MessageInputStreamTest t = new MessageInputStreamTest();
try {
t.testInOrder();
t.testRandomOrder();
} catch (Exception e) {
e.printStackTrace();
}
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
}
}