1 23 package org.objectweb.joram.client.jms.connection; 24 25 import java.util.Timer ; 26 import java.util.Vector ; 27 28 import org.objectweb.joram.shared.client.AbstractJmsReply; 29 import org.objectweb.joram.shared.client.AbstractJmsRequest; 30 import org.objectweb.joram.shared.client.JmsRequestGroup; 31 32 36 public class MultiThreadSyncChannel implements RequestChannel { 37 40 private SyncRound currentRound; 41 42 45 private Vector syncRequests; 46 47 52 private int multiThreadSyncDelay; 53 54 57 private int multiThreadSyncThreshold; 58 59 60 private RequestChannel channel; 61 62 MultiThreadSyncChannel(RequestChannel rc, int delay, int threshold) { 63 channel = rc; 64 multiThreadSyncDelay = delay; 65 multiThreadSyncThreshold = threshold; 66 currentRound = new SyncRound(); 67 syncRequests = new Vector (); 68 } 69 70 public synchronized void send(AbstractJmsRequest request) throws Exception { 71 SyncRound localRound = currentRound; 72 syncRequests.addElement(request); 73 if (syncRequests.size() < multiThreadSyncThreshold) { 74 try { 75 wait(multiThreadSyncDelay); 76 } catch (InterruptedException ie) { 77 } 78 } 79 if (!localRound.done) { 80 AbstractJmsRequest[] requests = 82 new AbstractJmsRequest[syncRequests.size()]; 83 syncRequests.copyInto(requests); 84 syncRequests.clear(); 85 localRound.done = true; 86 currentRound = new SyncRound(); 87 channel.send(new JmsRequestGroup(requests)); 88 notifyAll(); 89 } 90 } 92 93 96 public void setTimer(Timer timer) { 97 channel.setTimer(timer); 98 } 99 100 103 public void connect() throws Exception { 104 channel.connect(); 105 } 106 107 110 public AbstractJmsReply receive() throws Exception { 111 return channel.receive(); 112 } 113 114 117 public void close() { 118 channel.close(); 119 } 120 121 private static class SyncRound { 122 private volatile boolean done = false; 123 } 124 } 125 | Popular Tags |