1 7 package org.jboss.jms.serverless; 8 9 import org.jboss.logging.Logger; 10 import javax.jms.Connection ; 11 import javax.jms.JMSException ; 12 import javax.jms.ConnectionMetaData ; 13 import javax.jms.ExceptionListener ; 14 import javax.jms.ConnectionConsumer ; 15 import javax.jms.ServerSessionPool ; 16 import javax.jms.Destination ; 17 import javax.jms.Topic ; 18 import javax.jms.Session ; 19 import org.jgroups.JChannel; 20 import org.jgroups.ChannelListener; 21 import org.jgroups.Channel; 22 import org.jgroups.Address; 23 import org.jgroups.ChannelException; 24 import java.io.Serializable ; 25 import java.net.URL ; 26 import javax.jms.Queue ; 27 import org.jgroups.SetStateEvent; 28 import org.jgroups.util.Util; 29 import org.jgroups.GetStateEvent; 30 import org.jgroups.View; 31 import org.jgroups.SuspectEvent; 32 import org.jgroups.ChannelClosedException; 33 import org.jgroups.ChannelNotConnectedException; 34 35 44 class GroupConnection implements Connection , Runnable { 45 46 private static final Logger log = Logger.getLogger(GroupConnection.class); 47 48 private static final String DEFAULT_SERVER_GROUP_NAME = "serverGroup"; 49 50 private URL serverChannelConfigURL; 51 52 private SessionManager sessionManager; 53 private org.jgroups.util.Queue deliveryQueue; 54 private ConnectionState connState; 55 56 private GroupState groupState; 58 private Thread connManagementThread; 59 private JChannel serverChannel; 60 61 66 GroupConnection(URL serverChannelConfigURL) { 67 68 this.serverChannelConfigURL = serverChannelConfigURL; 69 70 deliveryQueue = new org.jgroups.util.Queue(); 71 sessionManager = new SessionManager(this, deliveryQueue); 72 groupState = new GroupState(); 73 connManagementThread = new Thread (this, "Connection Management Thread"); 74 connState = new ConnectionState(); 75 76 } 77 78 82 void connect() throws JMSException { 83 84 86 try { 87 88 serverChannel = new JChannel(serverChannelConfigURL); 89 serverChannel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE); 90 serverChannel.setChannelListener(new ChannelListener() { 91 92 public void channelClosed(Channel channel) { 93 log.debug("channelClosed("+channel+")"); 94 } 95 96 public void channelConnected(Channel channel) { 97 log.debug("channelConnected() to group ["+ 98 channel.getChannelName()+"]"); 99 } 100 101 public void channelDisconnected(Channel channel) { 102 log.debug("channelDisconnected("+channel+")"); 103 } 104 105 public void channelReconnected(Address addr) { 106 log.debug("channelReconnected("+addr+")"); 107 } 108 109 public void channelShunned() { 110 log.debug("channelShunned()"); 111 } 112 }); 113 114 log.debug("channel created"); 115 serverChannel.connect(DEFAULT_SERVER_GROUP_NAME); 116 log.debug("channel connected"); 117 connState.setStopped(); 118 connManagementThread.start(); 119 log.debug("Connection Management Thread started"); 120 boolean getStateOK = serverChannel.getState(null, 0); 121 log.debug("getState(): "+getStateOK); 122 } 123 catch(ChannelException e) { 124 String msg = "Failed to create an active connection"; 125 log.error(msg, e); 126 JMSException jmse = new JMSException (msg); 127 jmse.setLinkedException(e); 128 throw jmse; 129 } 130 } 131 132 133 void send(javax.jms.Message m) throws JMSException { 135 136 try { 137 if (m.getJMSDestination() instanceof Topic ) { 139 serverChannel.send(null, null, (Serializable )m); 141 } 142 else { 143 145 org.jgroups.Message jgmsg = 148 new org.jgroups.Message((Address)serverChannel.getView().getMembers().get(0), 149 null, new QueueCarrier(m)); 150 serverChannel.send(jgmsg); 151 } 152 } 153 catch(Exception e) { 154 String msg = "Failed to send message"; 155 log.error(msg, e); 156 JMSException jmse = new JMSException (msg); 157 jmse.setLinkedException(e); 158 throw jmse; 159 } 160 161 } 162 163 167 171 public void run() { 172 173 Object incoming = null; 174 175 while(true) { 176 177 try { 178 incoming = serverChannel.receive(0); 179 } 180 catch(ChannelClosedException e) { 181 log.debug("Channel closed, exiting"); 182 break; 183 } 184 catch(ChannelNotConnectedException e) { 185 log.warn("TO_DO: Channel not connected, I should block the thread ..."); 186 continue; 187 } 188 catch(Exception e) { 189 log.error("Failed to synchronously read from the channel", e); 191 } 192 193 try { 194 dispatch(incoming); 195 } 196 catch(Exception e) { 197 log.error("Dispatching failed", e); 201 } 202 } 203 } 204 205 209 private void dispatch(Object o) throws Exception { 210 211 log.debug("dispatching "+o); 212 213 if (o instanceof SetStateEvent) { 214 byte[] buffer = ((SetStateEvent)o).getArg(); 215 if (buffer == null) { 216 log.debug("null group state, ignoring ..."); 218 } 219 else { 220 groupState.fromByteBuffer(buffer); 222 } 223 return; 224 } 225 else if (o instanceof GetStateEvent) { 226 serverChannel.returnState(groupState.toByteBuffer()); 228 return; 229 } 230 else if (o instanceof View) { 231 return; 233 } 234 else if (o instanceof SuspectEvent) { 235 return; 237 } 238 else if (!(o instanceof org.jgroups.Message)) { 239 log.warn("Ignoring "+o); 241 return; 242 } 243 244 org.jgroups.Message jgmsg = (org.jgroups.Message)o; 245 Object payload = jgmsg.getObject(); 246 if (payload instanceof ServerAdminCommand) { 247 handleServerAdminCommand(jgmsg.getSrc(), (ServerAdminCommand)payload); 249 } 250 else if (payload instanceof QueueCarrier) { 251 QueueCarrier qc = (QueueCarrier)payload; 252 String sessionID = qc.getSessionID(); 253 if (sessionID == null) { 257 queueForward(qc); 258 } 259 else { 260 deliveryQueue.add(qc); 261 } 262 } 263 else if (payload instanceof javax.jms.Message ) { 264 if (connState.isStarted()) { 266 deliveryQueue.add((javax.jms.Message )payload); 267 } 268 } 269 else { 270 log.warn("JG Message with a payload something else than a JMS Message: "+ 271 (payload == null ? "null" : payload.getClass().getName())); 272 } 273 } 274 275 private void handleServerAdminCommand(Address src, ServerAdminCommand c) { 276 String comm = c.getCommand(); 278 if (ServerAdminCommand.ADD_QUEUE_RECEIVER.equals(comm)) { 279 String queueName = (String )c.get(0); 280 String sessionID = (String )c.get(1); 281 String queueReceiverID = (String )c.get(2); 282 groupState.addQueueReceiver(queueName, src, sessionID, queueReceiverID); 283 } 284 else if (ServerAdminCommand.REMOVE_QUEUE_RECEIVER.equals(comm)) { 285 String queueName = (String )c.get(0); 286 String sessionID = (String )c.get(1); 287 String queueReceiverID = (String )c.get(2); 288 groupState.removeQueueReceiver(queueName, src, sessionID, queueReceiverID); 289 } 290 else { 291 log.error("Unknown server administration command: "+comm); 292 } 293 } 294 295 void advertiseQueueReceiver(String queueName, String sessionID, 296 String queueReceiverID, boolean isOn) throws ProviderException { 297 298 try { 299 String cs = isOn ? 301 ServerAdminCommand.ADD_QUEUE_RECEIVER : 302 ServerAdminCommand.REMOVE_QUEUE_RECEIVER; 303 ServerAdminCommand comm = 304 new ServerAdminCommand(cs, queueName, sessionID, queueReceiverID); 305 serverChannel.send(null, null, comm); 306 } 307 catch(ChannelException e) { 308 throw new ProviderException("Failed to advertise the queue receiver", e); 309 } 310 } 311 312 private void queueForward(QueueCarrier qc) throws Exception { 313 314 Queue destQueue = (Queue )qc.getJMSMessage().getJMSDestination(); 315 QueueReceiverAddress ra = groupState.selectReceiver(destQueue.getQueueName()); 316 if (ra == null) { 317 log.warn("Discarding message for queue "+destQueue.getQueueName()+"!"); 319 return; 320 } 321 Address destAddress = ra.getAddress(); 322 qc.setSessionID(ra.getSessionID()); 323 qc.setReceiverID(ra.getReceiverID()); 324 325 serverChannel.send(destAddress, null, qc); 327 328 } 329 330 334 public void start() throws JMSException { 335 336 if (connState.isStarted()) { 340 return; 341 } 342 synchronized(connState) { 343 connState.setStarted(); 344 connState.notify(); 345 } 346 347 } 348 349 public void stop() throws JMSException { 350 351 connState.setStopped(); 353 } 354 355 public void close() throws JMSException { 356 357 if (connState.isClosed()) { 360 return; 361 } 362 connState.setClosed(); 363 serverChannel.close(); 364 365 } 366 367 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { 368 369 return sessionManager.createSession(transacted, acknowledgeMode); 370 371 } 372 373 public String getClientID() throws JMSException { 374 throw new NotImplementedException(); 375 } 376 377 public void setClientID(String clientID) throws JMSException { 378 379 String msg = "ClientID ("+""+") cannot be modified"; 383 throw new IllegalStateException (msg); 384 } 385 386 public ConnectionMetaData getMetaData() throws JMSException { 387 throw new NotImplementedException(); 388 } 389 390 public ExceptionListener getExceptionListener() throws JMSException { 391 throw new NotImplementedException(); 392 } 393 394 public void setExceptionListener(ExceptionListener listener) throws JMSException { 395 throw new NotImplementedException(); 396 } 397 398 public ConnectionConsumer createConnectionConsumer(Destination destination, 399 String messageSelector, 400 ServerSessionPool sessionPool, 401 int maxMessages) 402 throws JMSException { 403 throw new NotImplementedException(); 404 } 405 406 407 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 408 String subscriptionName, 409 String messageSelector, 410 ServerSessionPool sessionPool, 411 int maxMessages) 412 throws JMSException { 413 throw new NotImplementedException(); 414 } 415 416 420 421 424 public static void main(String [] args) throws Exception { 425 426 GroupConnection c = new GroupConnection(new URL (args[0])); 427 c.connect(); 428 } 429 430 431 432 } 433 | Popular Tags |