|                                                                                                              1
 24  package org.objectweb.joram.mom.dest.bridge;
 25
 26  import java.util.Enumeration
  ; 27  import java.util.Hashtable
  ; 28  import java.util.Properties
  ; 29  import java.util.Vector
  ; 30
 31  import org.objectweb.joram.mom.dest.TopicForwardNot;
 32  import org.objectweb.joram.mom.dest.TopicImpl;
 33  import org.objectweb.joram.mom.messages.Message;
 34  import org.objectweb.joram.mom.notifications.ClientMessages;
 35  import org.objectweb.joram.mom.notifications.SubscribeRequest;
 36  import org.objectweb.joram.mom.notifications.UnsubscribeRequest;
 37  import org.objectweb.joram.shared.JoramTracing;
 38  import org.objectweb.joram.shared.excepts.AccessException;
 39  import org.objectweb.util.monolog.api.BasicLevel;
 40
 41  import fr.dyade.aaa.agent.AgentId;
 42  import fr.dyade.aaa.agent.DeleteNot;
 43
 44
 53  public class BridgeTopicImpl extends TopicImpl {
 54
 55    private BridgeModule jmsModule;
 56
 57
 58    private long arrivalsCounter = 0;
 59
 60
 67    private Hashtable
  outTable; 68
 69
 76    public BridgeTopicImpl(AgentId destId, AgentId adminId, Properties
  prop) { 77      super(destId, adminId, prop);
 78      outTable = new Hashtable
  (); 79      jmsModule = new BridgeModule();
 80
 81          jmsModule.init(destId, prop);
 83    }
 84
 85    public String
  toString() { 86      return "BridgeTopicImpl:" + destId.toString();
 87    }
 88
 89
 93    public void bridgeDeliveryNot(AgentId from, BridgeDeliveryNot not) {
 94      ClientMessages clientMessages = new ClientMessages();
 95      clientMessages.addMessage(not.getMessage());
 96      super.doClientMessages(destId, clientMessages);
 97    }
 98
 99
 103   public void bridgeAckNot(BridgeAckNot not) {
 104     outTable.remove(not.getIdentifier());
 105   }
 106
 107
 115   public void postSubscribe(SubscribeRequest not) {
 116
 117         try {
 119       if (subscribers.size() == 1)
 120         jmsModule.setMessageListener();
 121     } catch (Exception
  exc) { 122       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
 123         JoramTracing.dbgDestination.log(BasicLevel.ERROR,
 124                                         "Failing subscribe request on remote destination: ", exc);
 125     }
 126   }
 127
 128
 136   public void preUnsubscribe(UnsubscribeRequest not) {
 137         if (subscribers.isEmpty())
 139       jmsModule.unsetMessageListener();
 140   }
 141
 142
 150   public void topicForwardNot(AgentId from, TopicForwardNot not) {
 151         if (not.toFather && fatherId != null)
 153       forward(fatherId, not);
 154
 155         Message message;
 157     for (Enumeration
  msgs = not.messages.getMessages().elements(); 158          msgs.hasMoreElements();) {
 159             message = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement());
 161       message.order = arrivalsCounter++;
 162
 163       outTable.put(message.getIdentifier(), message);
 164
 165       try {
 166         jmsModule.send(message.msg);
 167       } catch (Exception
  exc) { 168         outTable.remove(message.getIdentifier());
 169         ClientMessages deadM;
 170         deadM = new ClientMessages();
 171         deadM.addMessage(message.msg);
 172         sendToDMQ(deadM, null);
 173       }
 174     }
 175   }
 176
 177
 184   public ClientMessages preProcess(AgentId from, ClientMessages not) {
 185     if (destId.equals(from))
 186       return not;
 187
 188         forwardMessages(not);
 190
 191         Message message;
 193     for (Enumeration
  msgs = not.getMessages().elements(); 194          msgs.hasMoreElements();) {
 195       message = new Message((org.objectweb.joram.shared.messages.Message) msgs.nextElement());
 196       message.order = arrivalsCounter++;
 197
 198       outTable.put(message.getIdentifier(), message);
 199
 200       try {
 201         jmsModule.send(message.msg);
 202       } catch (Exception
  exc) { 203         outTable.remove(message.getIdentifier());
 204         ClientMessages deadM;
 205         deadM = new ClientMessages(not.getClientContext(), not.getRequestId());
 206         deadM.addMessage(message.msg);
 207         sendToDMQ(deadM, not.getDMQId());
 208       }
 209     }
 210     return null;
 211   }
 212
 213
 220   protected void doDeleteNot(DeleteNot not) {
 221     jmsModule.close();
 222     super.doDeleteNot(not);
 223   }
 224
 225
 226   private void readObject(java.io.ObjectInputStream
  in) 227                throws java.io.IOException
  , ClassNotFoundException  { 228     in.defaultReadObject();
 229
 230         try {
 232       jmsModule.connect();
 233
 234       if (! subscribers.isEmpty())
 235         jmsModule.setMessageListener();
 236
 237             Message momMsg;
 239       Vector
  outMessages = new Vector  (); 240       Message currentMsg;
 241       for (Enumeration
  keys = outTable.keys(); keys.hasMoreElements();) { 242         momMsg = (Message) outTable.get(keys.nextElement());
 243
 244         int i = 0;
 245         while (i < outMessages.size()) {
 246           currentMsg = (Message) outMessages.get(i);
 247
 248           if (momMsg.order < currentMsg.order)
 249             break;
 250
 251           i++;
 252         }
 253         outMessages.insertElementAt(momMsg, i);
 254       }
 255
 256       while (! outMessages.isEmpty()) {
 257         momMsg = (Message) outMessages.remove(0);
 258         jmsModule.send(momMsg.msg);
 259       }
 260     }
 261     catch (Exception
  exc) { 262       if (JoramTracing.dbgDestination.isLoggable(BasicLevel.ERROR))
 263         JoramTracing.dbgDestination.log(BasicLevel.ERROR, "", exc);
 264     }
 265   }
 266 }
 267
                                                                                                                                                                                                             |                                                                       
 
 
 
 
 
                                                                                   Popular Tags                                                                                                                                                                                              |