package org.mr.api.jms;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;

import javax.jms.*;
import javax.jms.IllegalStateException;

import org.mr.kernel.security.MantaAuthentication;
import org.mr.kernel.security.MantaAuthorization;
import org.mr.kernel.security.SecurityActionTypes;
import org.mr.kernel.security.SessionID;
import org.mr.kernel.services.MantaService;
import org.mr.kernel.services.ServiceProducer;
import org.mr.kernel.services.queues.VirtualQueuesManager;
import org.mr.kernel.services.topics.VirtualTopicManager;
import org.mr.MantaAgent;
import org.mr.MantaException;
import org.mr.core.protocol.MantaBusMessage;
import org.mr.api.jms.selector.SelectorManager;
import org.mr.core.protocol.MantaBusMessageConsts;

/**
 * JMS connection implementation ({@link Connection}, {@link QueueConnection}
 * and {@link TopicConnection}) that delegates messaging to the
 * {@link MantaAgent} channel supplied by its {@link MantaConnectionFactory}.
 *
 * <p>The connection keeps track of the sessions it created and of the
 * temporary topics and queues registered through it, so that
 * {@link #close()} can release them.
 */
public class MantaConnection implements Connection, QueueConnection, TopicConnection {

    public Log log;

    /**
     * Creates a connection using the default user name and password.
     *
     * @param factory the factory that owns this connection; must not be null
     * @throws JMSException if {@code factory} is null
     */
    MantaConnection(MantaConnectionFactory factory) throws JMSException {
        this(factory, MantaConnection.DEF_USER, MantaConnection.DEF_PASSWORD);
    }

    /**
     * Creates a connection for the given credentials.
     *
     * <p>When the channel's singleton repository exposes a
     * {@link MantaAuthentication}, the credentials are authenticated and the
     * repository's {@link MantaAuthorization} is remembered for later
     * {@code authorize} calls. The client id is taken from
     * {@code messageChannel.getMessageId()}.
     *
     * @param factory  the factory that owns this connection; must not be null
     * @param userName user name passed to the authentication service
     * @param password password passed to the authentication service
     * @throws JMSException if {@code factory} is null
     */
    MantaConnection(MantaConnectionFactory factory, String userName,
            String password) throws JMSException {
        log = LogFactory.getLog("MantaConnection");
        if (factory == null) {
            throw new JMSException("MNJMS000BA : FAILED TO CREATE CONNECTION. FACTORY IS NULL.");
        }
        this.theFactory = factory;
        this.isStarted = false;
        this.isOpened = true;
        this.mantaConnectionMetaData = new MantaConnectionMetaData();
        this.userName = userName;
        this.password = password;
        this.messageChannel = factory.getChannel();

        // Security is optional: without an authentication service both the
        // session id and the authorizator stay null and authorize() is a no-op.
        MantaAuthentication ma = messageChannel.getSingletonRepository().getMantaAuthentication();
        if (ma != null) {
            this.securitySessionId = ma.authenticate(userName, password);
            this.authorizator = messageChannel.getSingletonRepository().getMantaAuthorization();
        }

        this.clientId = messageChannel.getMessageId();
        this.msgSelectorManager = SelectorManager.getInstance();
        this.messageChannel.getSingletonRepository().getSelectorsManager()
                .setSelector(MantaBusMessageConsts.PAYLOAD_TYPE_JMS, msgSelectorManager);

        this.tempTopics = new Hashtable();
        this.tempQueues = new Hashtable();
        this.createdSessions = new ArrayList();
    }

    /**
     * Creates a new session and registers it with this connection.
     *
     * @param transacted      whether the session is transacted
     * @param acknowledgeMode the acknowledge mode for the session
     * @return the new session
     * @throws JMSException if the connection is closed
     */
    public Session createSession(boolean transacted, int acknowledgeMode)
            throws JMSException {
        checkLegalOperation();

        // NOTE(review): argument order here is (acknowledgeMode, transacted),
        // the reverse of the queue/topic session constructors below - confirm
        // against the MantaSession constructor.
        MantaSession sess = new MantaSession(messageChannel.getMessageId(), this,
                acknowledgeMode, transacted);

        addSession(sess);
        return sess;
    }

    /**
     * Creates a new queue session and registers it with this connection.
     *
     * @param trx             whether the session is transacted
     * @param acknowledgeMode the acknowledge mode for the session
     * @return the new queue session
     * @throws JMSException if the connection is closed
     */
    public QueueSession createQueueSession(boolean trx, int acknowledgeMode) throws JMSException {
        checkLegalOperation();

        MantaQueueSession sess = new MantaQueueSession(messageChannel.getMessageId(), this,
                trx, acknowledgeMode);

        addSession(sess);
        return sess;
    }

    /**
     * Creates a new topic session and registers it with this connection.
     *
     * @param trx             whether the session is transacted
     * @param acknowledgeMode the acknowledge mode for the session
     * @return the new topic session
     * @throws JMSException if the connection is closed
     */
    public TopicSession createTopicSession(boolean trx, int acknowledgeMode) throws JMSException {
        checkLegalOperation();

        MantaTopicSession sess = new MantaTopicSession(messageChannel.getMessageId(), this,
                trx, acknowledgeMode);

        addSession(sess);
        return sess;
    }

    /**
     * Returns the client id of this connection.
     *
     * @return the client id
     * @throws JMSException if the connection is closed
     */
    public String getClientID() throws JMSException {
        checkLegalOperation();
        return clientId;
    }

    /**
     * Sets the client id, but only if none is present yet.
     *
     * <p>The constructor already assigns an id taken from the message
     * channel, so in practice this call is rejected. Should the id be
     * missing, the supplied value is stored.
     *
     * @param clientID the client id to use
     * @throws IllegalStateException if a client id is already present
     * @throws JMSException          if the connection is closed
     */
    public void setClientID(String clientID) throws JMSException {
        checkLegalOperation();
        if (this.getClientID() != null) {
            throw new IllegalStateException("MNJMS00026 : METHOD setClientID() FAILED. SET BY SYSTEM.");
        }
        // Previously the value was dropped on this path, so the setter never set.
        this.clientId = clientID;
    }

    /**
     * Returns the metadata object created for this connection.
     *
     * @return the connection metadata
     * @throws JMSException if the connection is closed
     */
    public ConnectionMetaData getMetaData() throws JMSException {
        checkLegalOperation();
        return mantaConnectionMetaData;
    }

    /**
     * Returns the registered exception listener, or null if there is none.
     *
     * @return the exception listener
     * @throws JMSException if the connection is closed
     */
    public ExceptionListener getExceptionListener() throws JMSException {
        checkLegalOperation();
        return this.abnormalExceptionListener;
    }

    /**
     * Registers the exception listener for this connection.
     *
     * @param listener the listener to notify; may be null to clear it
     * @throws JMSException if the connection is closed
     */
    public void setExceptionListener(ExceptionListener listener)
            throws JMSException {
        checkLegalOperation();
        this.abnormalExceptionListener = listener;
    }

    /**
     * Forwards an exception to the registered listener, if both are non-null.
     *
     * @param jmse the exception to report
     */
    protected void notifyListener(JMSException jmse) {
        if (this.abnormalExceptionListener != null && jmse != null) {
            this.abnormalExceptionListener.onException(jmse);
        }
    }

    /**
     * Starts every session created by this connection. Does nothing when the
     * connection is already started.
     *
     * <p>A session that fails to start is logged and skipped; the connection
     * is marked as started regardless.
     *
     * @throws JMSException if the connection is closed
     */
    public synchronized void start() throws JMSException {
        checkLegalOperation();
        if (isStarted()) {
            return;
        }

        try {
            synchronized (createdSessions) {
                Iterator sessionsToNotify = createdSessions.iterator();
                while (sessionsToNotify.hasNext()) {
                    MantaSession session = (MantaSession) sessionsToNotify.next();
                    try {
                        session.start();
                    } catch (JMSException e) {
                        // Best effort: one failing session must not block the others.
                        log.error("An error occured while starting a session. ", e);
                    }
                }
            }
        } finally {
            isStarted = true;
        }
    }

    /**
     * Stops every session created by this connection. Does nothing when the
     * connection is not started.
     *
     * <p>The connection is marked as stopped even if a session fails to stop.
     *
     * @throws JMSException if the connection is closed or a session fails to stop
     */
    public synchronized void stop() throws JMSException {
        checkLegalOperation();
        if (!isStarted()) {
            return;
        }

        try {
            synchronized (createdSessions) {
                Iterator sessionsToNotify = createdSessions.iterator();
                while (sessionsToNotify.hasNext()) {
                    MantaSession session = (MantaSession) sessionsToNotify.next();
                    session.stop();
                }
            }
        } finally {
            isStarted = false;
        }
    }

    /**
     * Closes the connection: runs {@link #cleanup()}, logs out of the
     * authentication service when one is configured, unregisters from the
     * factory and drops the references held by this object. Closing an
     * already closed connection is a no-op.
     *
     * <p>The connection is marked as closed even when this method fails.
     *
     * @throws JMSException if any step fails; the original failure is
     *                      available through {@code getLinkedException()}
     */
    public synchronized void close() throws JMSException {
        if (!isOpened) {
            return;
        }

        try {
            cleanup();
            MantaAuthentication ma = messageChannel.getSingletonRepository().getMantaAuthentication();
            if (ma != null) {
                ma.logout(securitySessionId);
            }

            this.messageChannel = null;
            this.theFactory.removeConnection(this);
            this.theFactory = null;
            this.tempTopics = null;
            this.tempQueues = null;
        } catch (Exception e) {
            log.error("Connection.close() failed", e);
            throw linkedFailure("MNJMS00027 : FAILED ON CONNECTION close() METHOD. ERROR TEXT : ", e);
        } finally {
            isOpened = false;
        }
    }

    /**
     * Stops the connection, closes all of its sessions and closes every
     * temporary topic and queue registered with it.
     *
     * @throws JMSException if any step fails; the original failure is
     *                      available through {@code getLinkedException()}
     */
    public synchronized void cleanup() throws JMSException {
        try {
            stop();

            synchronized (createdSessions) {
                // Iterate over a snapshot rather than the live list; presumably
                // session.close() calls back into deleteSession() - TODO confirm.
                ArrayList sessionsForIterator = new ArrayList(createdSessions);
                Iterator sessionsToNotify = sessionsForIterator.iterator();
                while (sessionsToNotify.hasNext()) {
                    MantaSession session = (MantaSession) sessionsToNotify.next();
                    session.close();
                }
            }

            Iterator it = tempTopics.values().iterator();
            VirtualTopicManager topicsManager =
                    MantaAgent.getInstance().getSingletonRepository().getVirtualTopicManager();
            while (it.hasNext()) {
                topicsManager.closeTopic(it.next().toString());
            }

            VirtualQueuesManager queuesManager =
                    MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
            it = tempQueues.values().iterator();
            while (it.hasNext()) {
                queuesManager.closeQueue(it.next().toString());
            }
        } catch (Exception e) {
            log.error("An Error occured during connection cleanup.", e);
            throw linkedFailure("MNJMS00028 : FAILED ON CONNECTION cleanup() METHOD. ERROR TEXT : ", e);
        }
    }

    /**
     * Builds the JMSException reported by close() and cleanup(), keeping the
     * original failure attached as the linked exception.
     */
    private static JMSException linkedFailure(String prefix, Exception cause) {
        JMSException failure = new JMSException(prefix + cause.getMessage());
        failure.setLinkedException(cause);
        return failure;
    }

    /**
     * Creates a connection consumer for the given destination after
     * authorizing the operation.
     *
     * @param destination     the destination to consume from
     * @param messageSelector the message selector expression
     * @param sessionPool     the server session pool to use
     * @param maxMessages     maximum number of messages per server session
     * @return the new connection consumer
     * @throws JMSException if the connection is closed or authorization fails
     */
    public ConnectionConsumer createConnectionConsumer(Destination destination,
            String messageSelector, ServerSessionPool sessionPool,
            int maxMessages) throws JMSException {
        checkLegalOperation();

        // NOTE(review): a consumer is authorized with the *PRODUCER* action
        // types - confirm whether consumer action types were intended.
        if (destination instanceof Queue) {
            authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_QUEUE, destination.toString());
        } else {
            authorize(SecurityActionTypes.ACTION_CREATE_PRODUCER_FOR_TOPIC, destination.toString());
        }

        MantaConnectionConsumer corCc = new MantaConnectionConsumer(
                this, destination, messageSelector, sessionPool, maxMessages);
        return corCc;
    }

    /**
     * Acknowledges a message through the message channel.
     *
     * @param msg the message to acknowledge
     * @throws JMSException if the connection is closed
     */
    void ack(MantaBusMessage msg) throws JMSException {
        checkLegalOperation();
        this.messageChannel.ack(msg);
    }

    /**
     * Creates a connection consumer for a topic by delegating to
     * {@link #createConnectionConsumer(Topic, String, ServerSessionPool, int)}.
     *
     * <p>NOTE(review): {@code subscriptionName} is not used by this
     * implementation - confirm whether durable semantics are expected here.
     *
     * @param topic            the topic to consume from
     * @param subscriptionName the durable subscription name (currently unused)
     * @param messageSelector  the message selector expression
     * @param sessionPool      the server session pool to use
     * @param maxMessages      maximum number of messages per server session
     * @return the new connection consumer
     * @throws JMSException if the connection is closed or authorization fails
     */
    public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
            String subscriptionName, String messageSelector,
            ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        checkLegalOperation();
        return createConnectionConsumer(topic, messageSelector, sessionPool,
                maxMessages);
    }

    /**
     * Creates a temporary queue and registers it with this connection. The
     * name is built from the queue prefix, the agent name, the client id and
     * a per-connection counter.
     *
     * @return the new temporary queue
     * @throws JMSException if the connection is closed
     */
    protected TemporaryQueue addTempQueue() throws JMSException {
        checkLegalOperation();
        String name = MantaConnection.TMP_QUEUE_PREFIX + "{"
                + getChannel().getAgentName() + "}" + "[" + this.getClientID()
                + "]" + tempQueuesNum++;
        MantaTemporaryQueue result = new MantaTemporaryQueue(name, this);
        this.tempQueues.put(name, result);
        return result;
    }

    /**
     * Creates a temporary topic and registers it with this connection. The
     * name is built from the topic prefix, the agent name, the client id and
     * a per-connection counter.
     *
     * @return the new temporary topic
     * @throws JMSException if the connection is closed
     */
    protected TemporaryTopic addTempTopic() throws JMSException {
        checkLegalOperation();
        String topicName = MantaConnection.TMP_TOPIC_PREFIX + "{"
                + getChannel().getAgentName() + "}" + "[" + this.getClientID()
                + "]" + tempTopicsNum++;

        MantaTemporaryTopic result = new MantaTemporaryTopic(topicName, this);
        tempTopics.put(topicName, result);
        return result;
    }

    /**
     * Removes a temporary topic from this connection's registry.
     *
     * <p>NOTE(review): unlike {@link #deleteTempQueue(String)} this does not
     * close the topic on the topic manager - confirm that is intended.
     *
     * @param serviceName the name of the temporary topic
     */
    protected void deleteTempTopic(String serviceName) {
        tempTopics.remove(serviceName);
    }

    /**
     * Removes a temporary queue from this connection's registry and closes it
     * on the virtual queues manager.
     *
     * @param serviceName the name of the temporary queue
     * @throws MantaException if closing the queue fails
     */
    protected void deleteTempQueue(String serviceName) throws MantaException {
        tempQueues.remove(serviceName);
        VirtualQueuesManager queuesManager =
                MantaAgent.getInstance().getSingletonRepository().getVirtualQueuesManager();
        queuesManager.closeQueue(serviceName);
    }

    /**
     * @return true if the connection is currently started
     */
    boolean isStarted() {
        return isStarted;
    }

    /**
     * Creates a connection consumer for a queue.
     *
     * @param queue           the queue to consume from
     * @param messageSelector the message selector expression
     * @param sessionPool     the server session pool to use
     * @param maxMessages     maximum number of messages per server session
     * @return the new connection consumer
     * @throws JMSException if the connection is closed or authorization fails
     */
    public ConnectionConsumer createConnectionConsumer(Queue queue,
            String messageSelector, ServerSessionPool sessionPool,
            int maxMessages) throws JMSException {
        checkLegalOperation();
        return createConnectionConsumer((Destination) queue, messageSelector,
                sessionPool, maxMessages);
    }

    /**
     * Creates a connection consumer for a topic.
     *
     * @param topic           the topic to consume from
     * @param messageSelector the message selector expression
     * @param sessionPool     the server session pool to use
     * @param maxMessages     maximum number of messages per server session
     * @return the new connection consumer
     * @throws JMSException if the connection is closed or authorization fails
     */
    public ConnectionConsumer createConnectionConsumer(Topic topic,
            String messageSelector, ServerSessionPool sessionPool,
            int maxMessages) throws JMSException {
        checkLegalOperation();
        return createConnectionConsumer((Destination) topic, messageSelector,
                sessionPool, maxMessages);
    }

    /**
     * Guards operations that require an open connection.
     *
     * @throws JMSException an {@link IllegalStateException} if the connection is closed
     */
    protected void checkLegalOperation() throws JMSException {
        if (!isOpened) {
            throw new IllegalStateException("MNJMS0002D : ILLEGAL OPERATION. CONNECTION IS CLOSED.");
        }
    }

    /**
     * Registers a session with this connection.
     *
     * @param session the session to track
     */
    void addSession(Session session) {
        synchronized (createdSessions) {
            createdSessions.add(session);
        }
    }

    /**
     * Removes a session from this connection's registry.
     *
     * @param session the session to stop tracking
     */
    void deleteSession(Session session) {
        synchronized (createdSessions) {
            createdSessions.remove(session);
        }
    }

    /**
     * Authorizes an action with a parameter; a no-op when no authorizator is
     * configured.
     *
     * @param actionType one of the {@link SecurityActionTypes} constants
     * @param param      the parameter of the action
     * @throws JMSSecurityException if the action is not permitted
     */
    void authorize(int actionType, Object param) throws JMSSecurityException {
        if (authorizator != null) {
            authorizator.authorize(securitySessionId, actionType, param);
        }
    }

    /**
     * Authorizes an action; a no-op when no authorizator is configured.
     *
     * @param actionType one of the {@link SecurityActionTypes} constants
     * @throws JMSSecurityException if the action is not permitted
     */
    void authorize(int actionType) throws JMSSecurityException {
        if (authorizator != null) {
            authorizator.authorize(securitySessionId, actionType);
        }
    }

    /**
     * @return the message channel used by this connection; null after close()
     */
    public MantaAgent getChannel() {
        return messageChannel;
    }

    /**
     * @return the user name this connection was created with
     */
    public String getUserName() {
        return userName;
    }

    /** Listener notified through {@link #notifyListener(JMSException)}. */
    protected ExceptionListener abnormalExceptionListener = null;

    /** Client id, assigned from the message channel in the constructor. */
    protected String clientId;

    protected String userName = null;

    protected String password = null;

    /** Sessions created by this connection; also used as the lock guarding the list. */
    protected ArrayList createdSessions;

    /** Temporary topics keyed by name. */
    protected Hashtable tempTopics;

    /** Temporary queues keyed by name. */
    protected Hashtable tempQueues;

    protected MantaConnectionFactory theFactory = null;

    protected ConnectionMetaData mantaConnectionMetaData = null;

    protected MantaAgent messageChannel = null;

    SelectorManager msgSelectorManager;

    protected int sessionsNumber = 0;

    /** Counter appended to generated temporary topic names. */
    protected int tempTopicsNum = 0;

    /** Counter appended to generated temporary queue names. */
    protected int tempQueuesNum = 0;

    protected boolean isOpened;

    protected boolean isStarted;

    /** Session id returned by the authentication service; null when security is off. */
    protected SessionID securitySessionId;

    /** Authorization service; null when security is off. */
    protected MantaAuthorization authorizator;

    final static String DEF_USER = "unknown";

    final static String DEF_PASSWORD = "unknown";

    public final static String TMP_DESTINATION_PREFIX = "&&TMP";

    public final static String TMP_QUEUE_PREFIX = "&&TMPQ";

    final static String TMP_TOPIC_PREFIX = "&&TMPT";

    final static String JMS_AGENT_NAME = "MANTAJMS";

}