KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > enterprise > jbi > serviceengine > comm > MessageAcceptor


1 /*
2  * The contents of this file are subject to the terms
3  * of the Common Development and Distribution License
4  * (the License). You may not use this file except in
5  * compliance with the License.
6  *
7  * You can obtain a copy of the license at
8  * https://glassfish.dev.java.net/public/CDDLv1.0.html or
9  * glassfish/bootstrap/legal/CDDLv1.0.txt.
10  * See the License for the specific language governing
11  * permissions and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL
14  * Header Notice in each file and include the License file
15  * at glassfish/bootstrap/legal/CDDLv1.0.txt.
16  * If applicable, add the following below the CDDL Header,
17  * with the fields enclosed by brackets [] replaced by
18  * you own identifying information:
19  * "Portions Copyrighted [year] [name of copyright owner]"
20  *
21  * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
22  */

23 package com.sun.enterprise.jbi.serviceengine.comm;
24
25 import com.sun.enterprise.jbi.serviceengine.core.JavaEEServiceEngineContext;
26 import java.util.HashMap JavaDoc;
27 import java.util.logging.Level JavaDoc;
28
29 import javax.jbi.messaging.MessageExchange;
30 import javax.jbi.messaging.MessagingException;
31 import com.sun.enterprise.jbi.serviceengine.work.OneWork;
32
33 /**
34  * Acceptor object that continuously receives messages from
35  * NMR. There is one MessageAcceptor object per WorkManager.
36  *
37  * MessageAcceptor also keeps a house keeping datastructure
38  * that holds information about all threads that are waiting
39  * for reply from on NMR for their two-way message exchange.
40  *
41  * @authod Binod PG
42  */

43 public class MessageAcceptor extends OneWork {
44     
45     private HashMap JavaDoc<String JavaDoc,MessageReceiver> receivers = new HashMap JavaDoc();
46     private boolean released = false;
47     
48     /**
49      * Start the acceptor thread. Note that execute() inturn call
50      * doWork method, where bulk of logic is present.
51      */

52     public void startAccepting() {
53         execute();
54     }
55     
56     /**
57      * Add a MessageReceiver object that waits for a reply from
58      * from NMR on their 2-way message exchange.
59      *
60      * @param receiver MessageReceiver instance.
61      */

62     public void register(MessageReceiver receiver) {
63         String JavaDoc id = receiver.getMessageExchange().getExchangeId();
64         logger.log(Level.FINER, "Adding recever for " + id);
65         synchronized (receivers) {
66             receivers.put(id, receiver);
67         }
68     }
69     
70     /**
71      * Release the thread from accepting. This method doesnt interrupt
72      * the thread. It is just a soft release applicable only from the
73      * next iteration of acceptor thread.
74      */

75     public void release() {
76         released = true;
77     }
78     
79     /**
80      * Actual work happens in this method. DeliveryChannel available
81      * from the super class. If there is any MessageReceiver waiting
82      * for this message, then the MessageExchange is made avalable to that
83      * MessageReceiver. If no MessageReceiver is interested in this MEP,
84      * then a new MessageProcessor will process this message.
85      * In the latter case, the message is for a 109 webservice deployed
86      * in appserver.
87      */

88     public void doWork() {
89         while (true) {
90             try {
91                 
92                 MessageExchange me = getDeliveryChannel().accept();
93                 if (released) {
94                     break;
95                 }
96                 
97                 if(me != null) {
98                     String JavaDoc id = me.getExchangeId();
99                     
100                     // The full block is not synchronized since,
101
// 1. Id will always be unique
102
// 2. Service engine will register the receiver
103
// before sending and hence we wont miss any.
104
if (receivers.containsKey(id)) {
105                         synchronized(receivers) {
106                             MessageReceiver receiver = receivers.remove(id);
107                             receiver.setMessageExchange(me);
108                             if (logger.isLoggable(Level.FINE)) {
109                                 logger.log(Level.FINE,
110                                         "Releasing MessageReceiver:" + id + ",MEP :" + me);
111                             }
112                             receiver.release();
113                         }
114                     } else {
115                         MessageProcessor processor =
116                                 JavaEEServiceEngineContext.getInstance().
117                                 getBridge().getMessageProcessor(me);
118                         processor.setUseCurrentThread(false);
119                         processor.setMessageExchange(me);
120                         if (logger.isLoggable(Level.FINE)) {
121                             logger.log(Level.FINE,
122                                     "Spawning MessageProcessorfor MEP :" + me);
123                         }
124                         processor.process();
125                     }
126                 }
127             } catch (MessagingException ie) {
128                 //Someone has interrupted the acceptor. Gracefully comeout.
129
logger.log(Level.FINE, "Stopping the acceptor thread");
130                 break;
131             }
132         }
133     }
134 }
135
136
Popular Tags