1 25 26 package org.objectweb.petals.jbi.transport; 27 28 import javax.jbi.JBIException; 29 import javax.jms.Message ; 30 import javax.jms.MessageListener ; 31 import javax.jms.ObjectMessage ; 32 import javax.naming.InitialContext ; 33 34 import org.objectweb.fractal.fraclet.annotation.FractalComponent; 35 import org.objectweb.fractal.fraclet.annotation.Interface; 36 import org.objectweb.fractal.fraclet.annotation.LifeCycle; 37 import org.objectweb.fractal.fraclet.annotation.LifeCycleType; 38 import org.objectweb.fractal.fraclet.annotation.Provides; 39 import org.objectweb.fractal.fraclet.annotation.Requires; 40 import org.objectweb.petals.jbi.messaging.MessageExchangeImpl; 41 import org.objectweb.petals.jbi.routing.Router; 42 import org.objectweb.petals.jbi.routing.RoutingException; 43 import org.objectweb.petals.util.LoggingUtil; 44 import org.objectweb.petals.util.PropertyUtil; 45 import org.objectweb.petals.util.SystemUtil; 46 47 63 @FractalComponent 64 @Provides(interfaces = @Interface(name = "service", signature = org.objectweb.petals.jbi.transport.Transporter.class)) 65 public class JoramTransporter implements Transporter { 66 67 protected String host; 68 69 protected int id; 70 71 protected JoramConnection joramConnection; 72 73 protected JoramAgent joramAgent; 74 75 protected Serializer serializer = new ObjectSerializer(); 76 77 protected LoggingUtil log; 78 79 protected String pwd; 80 81 @Requires(name = "router", signature = org.objectweb.petals.jbi.routing.Router.class) 82 protected Router router; 83 84 protected int tcp; 85 86 protected String user; 87 88 94 public void send(MessageExchangeImpl messageExchange, String containerName, 95 long timeToLive) throws TransportException { 96 97 if (id == Integer.parseInt(containerName)) { 98 sendLocal(messageExchange); 99 100 } else { 101 Object noPersist = messageExchange 103 .getProperty(PROPERTY_NOPERSISTANCE); 104 boolean persist = (noPersist == null || !noPersist.toString() 105 .toLowerCase().equals("true")); 106 sendDistant(messageExchange, containerName, timeToLive, persist); 107 } 108 } 109 110 117 protected void startJMS() throws Exception { 118 log.start(); 119 120 joramAgent = new JoramAgent(log); 122 joramAgent.startServer(); 123 124 id = Integer.parseInt(SystemUtil.getContainerName()); 127 128 tcp = Integer.parseInt(SystemUtil.getJoramTCPPort()); 129 user = SystemUtil.getJoramUser(); 130 pwd = SystemUtil.getJoramPassword(); 131 host = SystemUtil.getHost(); 132 133 InitialContext context = new javax.naming.InitialContext (PropertyUtil 135 .retrieveJNDIProperties()); 136 137 joramConnection = new JoramConnection(id, tcp, user, pwd, host, context); 138 139 log.end(); 140 } 141 142 public void startListening() throws TransportException { 143 log.start(); 144 145 MessageListener aML = new MessageListener () { 146 public void onMessage(Message aMessage) { 147 processIncomingMessage(aMessage); 148 } 149 }; 150 try { 151 joramConnection.start(aML); 152 } catch (Exception e) { 153 throw new TransportException(e); 154 } 155 156 log.end(); 157 } 158 159 172 protected void processIncomingMessage(Message msg) { 173 log.start(); 174 if (msg instanceof ObjectMessage ) { 175 try { 176 MessageExchangeImpl jbiMsg = serializer.jms2jbi(msg); 177 178 router.receive(jbiMsg); 180 181 msg.acknowledge(); 183 } catch (Exception e) { 184 log.error(e.getMessage(), e); 185 } 186 } else { 187 log.error("the received JMS message is not an objectmessage"); 188 } 189 log.end(); 190 } 191 192 208 protected void sendDistant(MessageExchangeImpl messageExchange, 209 String containerName, long timeToLive, boolean persist) 210 throws TransportException { 211 log.start(); 212 try { 213 joramConnection.sendTo(containerName, messageExchange, timeToLive, 214 persist); 215 } catch (TransportException e) { 216 log.error(e.getMessage(), e); 217 throw e; 218 } 219 log.end(); 220 } 221 222 229 @LifeCycle(on = LifeCycleType.START) 230 protected void start() throws Exception { 231 log = new LoggingUtil(null); 232 log.start(); 233 234 joramAgent = new JoramAgent(log); 236 joramAgent.startServer(); 237 238 id = Integer.parseInt(SystemUtil.getContainerName()); 241 242 tcp = Integer.parseInt(SystemUtil.getJoramTCPPort()); 243 user = SystemUtil.getJoramUser(); 244 pwd = SystemUtil.getJoramPassword(); 245 host = SystemUtil.getHost(); 246 247 InitialContext context = new javax.naming.InitialContext (PropertyUtil 249 .retrieveJNDIProperties()); 250 251 joramConnection = new JoramConnection(id, tcp, user, pwd, host, context); 252 253 log.end(); 254 } 255 256 264 protected void sendLocal(MessageExchangeImpl messageExchange) 265 throws TransportException { 266 log.start(); 267 try { 268 router.receive(messageExchange); 269 } catch (RoutingException e) { 270 String msg = "Can not send locally the MessageExchange to the local Router."; 271 log.error(msg, e); 272 throw new TransportException(msg, e); 273 } 274 log.end(); 275 } 276 277 282 @LifeCycle(on = LifeCycleType.STOP) 283 public void stop() throws TransportException { 284 log.start(); 285 try { 286 joramConnection.stop(); 287 joramAgent.stopServer(); 288 } catch (Exception e) { 289 String msg = "Can not stop the JMS connection."; 290 log.error(msg, e); 291 throw new TransportException(msg, e); 292 } 293 log.end(); 294 } 295 296 300 public void shutdown() throws TransportException { 301 try { 302 joramConnection.unsubscribe(); 303 } catch (Exception e) { 304 String msg = "Can not unreference the Transporter from the Registry."; 305 log.error(msg, e); 306 throw new TransportException(msg, e); 307 } 308 try { 309 joramAgent.shutdown(); 310 } catch (Exception e) { 311 String msg = "Can not shutdown correctly the Joram server."; 312 log.error(msg, e); 313 throw new TransportException(msg, e); 314 } 315 316 } 317 } | Popular Tags |