1 21 22 package com.rift.coad.daemon.messageservice.rpc; 24 25 import java.util.List ; 27 import java.lang.reflect.InvocationHandler ; 28 import java.lang.reflect.Method ; 29 import javax.naming.Context ; 30 import javax.naming.InitialContext ; 31 32 import org.apache.log4j.Logger; 34 35 import com.rift.coad.daemon.messageservice.Producer; 37 import com.rift.coad.daemon.messageservice.MessageProducer; 38 import com.rift.coad.daemon.messageservice.RPCMessage; 39 import com.rift.coad.daemon.messageservice.Message; 40 import com.rift.coad.util.connection.ConnectionManager; 41 42 47 public class RPCMessageHandler implements InvocationHandler { 48 49 protected Logger log = 51 Logger.getLogger(RPCMessageHandler.class.getName()); 52 53 private Class targetInterface = null; 55 private Producer producer = null; 56 private String targetURL = null; 57 private String [] services = null; 58 private boolean reply = true; 59 private boolean broadcast = false; 60 private String correlationId = null; 61 private boolean usedCorrelationId = false; 62 63 72 public RPCMessageHandler(String from, Class targetInterface, 73 String targetURL, boolean reply) throws RPCMessageClientException { 74 try { 75 this.targetInterface = targetInterface; 76 MessageProducer messageProducer = (MessageProducer)ConnectionManager 77 .getInstance().getConnection(MessageProducer.class, 78 MessageProducer.JNDI_URL); 79 producer = messageProducer.createProducer(from); 80 this.targetURL = targetURL; 81 this.reply = reply; 82 } catch (Exception ex) { 83 throw new RPCMessageClientException( 84 "Failed to init the message handler : " + ex.getMessage(), 85 ex); 86 } 87 } 88 89 90 101 public RPCMessageHandler(String from, Class targetInterface, 102 List services, boolean broadcast, boolean reply) throws 103 RPCMessageClientException { 104 try { 105 this.targetInterface = targetInterface; 106 MessageProducer messageProducer = (MessageProducer)ConnectionManager 107 .getInstance().getConnection(MessageProducer.class, 108 MessageProducer.JNDI_URL); 109 producer = messageProducer.createProducer(from); 110 copyServices(services); 111 this.broadcast = broadcast; 112 this.reply = reply; 113 } catch (Exception ex) { 114 throw new RPCMessageClientException( 115 "Failed to init the message handler : " + ex.getMessage(), 116 ex); 117 } 118 } 119 120 121 130 public RPCMessageHandler(String from, Class targetInterface, 131 String targetURL, String correlationId) throws 132 RPCMessageClientException { 133 try { 134 this.targetInterface = targetInterface; 135 MessageProducer messageProducer = (MessageProducer)ConnectionManager 136 .getInstance().getConnection(MessageProducer.class, 137 MessageProducer.JNDI_URL); 138 producer = messageProducer.createProducer(from); 139 this.targetURL = targetURL; 140 this.correlationId = correlationId; 141 } catch (Exception ex) { 142 throw new RPCMessageClientException( 143 "Failed to init the message handler : " + ex.getMessage(), 144 ex); 145 } 146 } 147 148 149 160 public RPCMessageHandler(String from, Class targetInterface, 161 List services, boolean broadcast, String correlationId) throws 162 RPCMessageClientException { 163 try { 164 this.targetInterface = targetInterface; 165 MessageProducer messageProducer = (MessageProducer)ConnectionManager 166 .getInstance().getConnection(MessageProducer.class, 167 MessageProducer.JNDI_URL); 168 producer = messageProducer.createProducer(from); 169 copyServices(services); 170 this.broadcast = broadcast; 171 this.correlationId = correlationId; 172 } catch (Exception ex) { 173 throw new RPCMessageClientException( 174 "Failed to init the message handler : " + ex.getMessage(), 175 ex); 176 } 177 } 178 179 180 190 public RPCMessageHandler(InitialContext context, String jndiURL, 191 String from, Class targetInterface, String targetURL) 192 throws RPCMessageClientException { 193 try { 194 this.targetInterface = targetInterface; 195 MessageProducer messageProducer = (MessageProducer)ConnectionManager 196 .getInstance(context).getConnection(MessageProducer.class, 197 jndiURL); 198 producer = messageProducer.createProducer(from); 199 this.targetURL = targetURL; 200 this.reply = false; 201 } catch (Exception ex) { 202 throw new RPCMessageClientException( 203 "Failed to init the message handler : " + ex.getMessage(), 204 ex); 205 } 206 } 207 208 209 221 public RPCMessageHandler(InitialContext context, String jndiURL, 222 String from, Class targetInterface, List services, boolean broadcast) 223 throws RPCMessageClientException { 224 try { 225 this.targetInterface = targetInterface; 226 MessageProducer messageProducer = (MessageProducer)ConnectionManager 227 .getInstance(context).getConnection(MessageProducer.class, 228 jndiURL); 229 producer = messageProducer.createProducer(from); 230 copyServices(services); 231 this.reply = false; 232 this.broadcast = broadcast; 233 } catch (Exception ex) { 234 throw new RPCMessageClientException( 235 "Failed to init the message handler : " + ex.getMessage(), 236 ex); 237 } 238 } 239 240 241 250 public Object invoke(Object proxy, Method method, Object [] args) throws 251 Throwable { 252 if (correlationId != null && usedCorrelationId) { 253 throw new RPCMessageClientException( 254 "The correlation Id has been used cannot re-use this " + 255 "object."); 256 } 257 usedCorrelationId = true; 258 try { 259 RPCMessage message = null; 260 if (targetURL != null) { 261 message = producer.createRPCMessage(Message.POINT_TO_POINT); 262 message.setTarget(targetURL); 263 } else if (broadcast == false) { 264 message = producer.createRPCMessage(Message.POINT_TO_SERVICE); 265 message.setServices(this.services); 266 } else { 267 message = producer.createRPCMessage( 268 Message.POINT_TO_MULTI_SERVICE); 269 message.setServices(this.services); 270 } 271 message.setReply(reply); 272 message.setCorrelationId(correlationId); 273 274 Method targetMethod = this.targetInterface.getMethod( 275 method.getName(),method.getParameterTypes()); 276 message.defineMethod(targetMethod.getReturnType(), 277 method.getName(),method.getParameterTypes()); 278 message.setArguments(args); 279 280 producer.submit(message); 281 return message.getMessageId(); 282 } catch (Exception ex) { 283 log.error("Failed to setup the RPC message because : " + 284 ex.getMessage(),ex); 285 throw new RPCMessageClientException( 286 "Failed to setup the RPC message because : " + 287 ex.getMessage(),ex); 288 } 289 } 290 291 292 295 private void copyServices(List serviceList) { 296 services = new String [serviceList.size()]; 297 for (int index = 0; index < serviceList.size(); index++) { 298 services[index] = (String )serviceList.get(index); 299 } 300 } 301 302 } 303 | Popular Tags |