2004-12-03 jrandom

* Toss in a small pool of threads (3) to execute the events queued up with
      the SimpleTimer, as we do currently see the occational event
      notification spiking up to a second or so.
    * Implement a SAM client API in java, useful for event based streaming (or
      for testing the SAM bridge)
    * Added support to shut down the SAM bridge on OOM (useful if the SAM
      bridge is being run outside of the router).
    * Include the SAM test code in the sam.jar
    * Remove an irrelevent warning message from SAM, which was caused by
      perfectly normal operation due to a session being closed.
    * Removed some unnecessary synchronization in the streaming lib's
      PacketQueue
    * More quickly clean up the memory used by the streaming lib by
      immediately killing each packet's resend job as soon as it is ACKed (or
      cancelled), so that there are no longer any valid pointers to the
      (potentially 32KB) packet.
    * Fixed the timestamps dumped to stdout when debugging the PacketHandler.
    * Drop packets that would expand our inbound window beyond our maximum
      buffer size (default 32 messages)
    * Always read the ACK/NACK data from the verified packets received, even
      if we are going to drop them
    * Always adjust the window when there are messages ACKed, though do not
      change its size except as before.
    * Streamlined some synchronization in the router's I2CP handling
    * Streamlined some memory allocation in the SAM bridge
    * Default the streaming lib to disconnect on inactivity, rather than send
      an empty message.
this still doesnt get the BT to where it needs to be, or fix the timeout problem,
but i dont like having so many commits outstanding and these updates are sound
This commit is contained in:
jrandom
2004-12-04 23:40:50 +00:00
committed by zzz
parent f54687f398
commit 1a30cd5f4a
29 changed files with 1287 additions and 186 deletions

View File

@ -108,6 +108,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
private boolean sendBestEffort(Destination dest, byte payload[], SessionKey keyUsed, Set tagsSent)
throws I2PSessionException {
long begin = _context.clock().now();
SessionKey key = _context.sessionKeyManager().getCurrentKey(dest.getPublicKey());
if (key == null) key = _context.sessionKeyManager().createSession(dest.getPublicKey());
SessionTag tag = _context.sessionKeyManager().consumeNextAvailableTag(dest.getPublicKey(), key);
@ -180,9 +182,17 @@ class I2PSessionImpl2 extends I2PSessionImpl {
+ " sync took " + (inSendingSync-beforeSendingSync)
+ " add took " + (afterSendingSync-inSendingSync));
_producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey);
// since this is 'best effort', all we're waiting for is a status update
// saying that the router received it - in theory, that should come back
// immediately, but in practice can take up to a second (though usually
// much quicker). setting this to false will short-circuit that delay
boolean actuallyWait = true;
long beforeWaitFor = _context.clock().now();
state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED,
_context.clock().now() + getTimeout());
if (actuallyWait)
state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED,
_context.clock().now() + getTimeout());
long afterWaitFor = _context.clock().now();
long inRemovingSync = 0;
synchronized (_sendingStates) {
@ -190,7 +200,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
_sendingStates.remove(state);
}
long afterRemovingSync = _context.clock().now();
boolean found = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
boolean found = !actuallyWait || state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED);
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId()
+ " / " + state.getNonce() + " found = " + found);
@ -200,6 +210,13 @@ class I2PSessionImpl2 extends I2PSessionImpl {
_log.warn("wtf, took " + timeToSend + "ms to send the message?!", new Exception("baz"));
}
if ( (afterRemovingSync - begin > 500) && (_log.shouldLog(Log.WARN) ) ) {
_log.warn("Took " + (afterRemovingSync-begin) + "ms to sendBestEffort, "
+ (afterSendingSync-begin) + "ms to prepare, "
+ (beforeWaitFor-afterSendingSync) + "ms to send, "
+ (afterRemovingSync-beforeWaitFor) + "ms waiting for reply");
}
if (found) {
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with "

View File

@ -16,6 +16,7 @@ import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.data.i2cp.ReceiveMessageEndMessage;
import net.i2p.util.Log;
/**
* Handle I2CP MessagePayloadMessages from the router delivering the contents
@ -30,7 +31,8 @@ class MessagePayloadMessageHandler extends HandlerImpl {
}
public void handleMessage(I2CPMessage message, I2PSessionImpl session) {
_log.debug("Handle message " + message);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Handle message " + message);
try {
MessagePayloadMessage msg = (MessagePayloadMessage) message;
MessageId id = msg.getMessageId();
@ -55,9 +57,8 @@ class MessagePayloadMessageHandler extends HandlerImpl {
Payload payload = msg.getPayload();
byte[] data = _context.elGamalAESEngine().decrypt(payload.getEncryptedData(), session.getDecryptionKey());
if (data == null) {
_log
.error("Error decrypting the payload to public key "
+ session.getMyDestination().getPublicKey().toBase64() + "\nPayload: " + payload.calculateHash());
if (_log.shouldLog(Log.ERROR))
_log.error("Error decrypting the payload");
throw new DataFormatException("Unable to decrypt the payload");
}
payload.setUnencryptedData(data);

View File

@ -735,7 +735,7 @@ public class DataHelper {
/** decompress the GZIP compressed data (returning null on error) */
public static byte[] decompress(byte orig[]) throws IOException {
return decompress(orig, 0, orig.length);
return (orig != null ? decompress(orig, 0, orig.length) : null);
}
public static byte[] decompress(byte orig[], int offset, int length) throws IOException {
if ((orig == null) || (orig.length <= 0)) return orig;

View File

@ -18,20 +18,30 @@ import net.i2p.I2PAppContext;
public class SimpleTimer {
private static final SimpleTimer _instance = new SimpleTimer();
public static SimpleTimer getInstance() { return _instance; }
private I2PAppContext _context;
private Log _log;
/** event time (Long) to event (TimedEvent) mapping */
private TreeMap _events;
/** event (TimedEvent) to event time (Long) mapping */
private Map _eventTimes;
private List _readyEvents;
private SimpleTimer() {
_log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
_context = I2PAppContext.getGlobalContext();
_log = _context.logManager().getLog(SimpleTimer.class);
_events = new TreeMap();
_eventTimes = new HashMap();
_readyEvents = new ArrayList(4);
I2PThread runner = new I2PThread(new SimpleTimerRunner());
runner.setName("SimpleTimer");
runner.setDaemon(true);
runner.start();
for (int i = 0; i < 3; i++) {
I2PThread executor = new I2PThread(new Executor());
executor.setName("SimpleTimerExecutor " + i);
executor.setDaemon(true);
executor.start();
}
}
/**
@ -40,18 +50,42 @@ public class SimpleTimer {
*/
public void addEvent(TimedEvent event, long timeoutMs) {
long eventTime = System.currentTimeMillis() + timeoutMs;
Long time = new Long(eventTime);
synchronized (_events) {
// remove the old scheduled position, then reinsert it
if (_eventTimes.containsKey(event))
_events.remove(_eventTimes.get(event));
while (_events.containsKey(new Long(eventTime)))
eventTime++;
_events.put(new Long(eventTime), event);
_eventTimes.put(event, new Long(eventTime));
while (_events.containsKey(time))
time = new Long(time.longValue() + 1);
_events.put(time, event);
_eventTimes.put(event, time);
if ( (_events.size() != _eventTimes.size()) ) {
_log.error("Skewed events: " + _events.size() + " for " + _eventTimes.size());
for (Iterator iter = _eventTimes.keySet().iterator(); iter.hasNext(); ) {
TimedEvent evt = (TimedEvent)iter.next();
Long when = (Long)_eventTimes.get(evt);
TimedEvent cur = (TimedEvent)_events.get(when);
if (cur != evt) {
_log.error("event " + evt + " @ " + when + ": " + cur);
}
}
}
_events.notifyAll();
}
}
public boolean removeEvent(TimedEvent evt) {
if (evt == null) return false;
synchronized (_events) {
Long when = (Long)_eventTimes.remove(evt);
if (when != null)
_events.remove(when);
return null != when;
}
}
/**
* Simple interface for events to be queued up and notified on expiration
*/
@ -82,8 +116,8 @@ public class SimpleTimer {
while (true) {
try {
synchronized (_events) {
if (_events.size() <= 0)
_events.wait();
//if (_events.size() <= 0)
// _events.wait();
//if (_events.size() > 100)
// _log.warn("> 100 events! " + _events.values());
long now = System.currentTimeMillis();
@ -97,7 +131,7 @@ public class SimpleTimer {
if (evt != null) {
_eventTimes.remove(evt);
eventsToFire.add(evt);
}
}
} else {
nextEventDelay = when.longValue() - now;
nextEvent = _events.get(when);
@ -128,32 +162,20 @@ public class SimpleTimer {
long now = System.currentTimeMillis();
now = now - (now % 1000);
for (int i = 0; i < eventsToFire.size(); i++) {
TimedEvent evt = (TimedEvent)eventsToFire.get(i);
try {
evt.timeReached();
} catch (Throwable t) {
log("wtf, event borked: " + evt, t);
}
_recentEvents[4] = _recentEvents[3];
_recentEvents[3] = _recentEvents[2];
_recentEvents[2] = _recentEvents[1];
_recentEvents[1] = _recentEvents[0];
_recentEvents[0] = evt;
synchronized (_readyEvents) {
for (int i = 0; i < eventsToFire.size(); i++)
_readyEvents.add(eventsToFire.get(i));
_readyEvents.notifyAll();
}
if (_occurredTime == now) {
_occurredEventCount += eventsToFire.size();
} else {
_occurredTime = now;
if (_occurredEventCount > 100) {
StringBuffer buf = new StringBuffer(256);
if (_occurredEventCount > 1000) {
StringBuffer buf = new StringBuffer(128);
buf.append("Too many simpleTimerJobs (").append(_occurredEventCount);
buf.append(") in a second! Last 5: \n");
for (int i = 0; i < _recentEvents.length; i++) {
if (_recentEvents[i] != null)
buf.append(_recentEvents[i]).append('\n');
}
buf.append(") in a second!");
_log.log(Log.CRIT, buf.toString());
}
_occurredEventCount = 0;
@ -163,4 +185,30 @@ public class SimpleTimer {
}
}
}
private class Executor implements Runnable {
public void run() {
while (true) {
TimedEvent evt = null;
synchronized (_readyEvents) {
if (_readyEvents.size() <= 0)
try { _readyEvents.wait(); } catch (InterruptedException ie) {}
if (_readyEvents.size() > 0)
evt = (TimedEvent)_readyEvents.remove(0);
}
if (evt != null) {
long before = _context.clock().now();
try {
evt.timeReached();
} catch (Throwable t) {
log("wtf, event borked: " + evt, t);
}
long time = _context.clock().now() - before;
if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
_log.warn("wtf, event execution took " + time + ": " + evt);
}
}
}
}
}