KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > servicemix > jbi > nmr > SubscriptionManager


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.servicemix.jbi.nmr;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.components.util.ComponentSupport;
import org.apache.servicemix.jbi.framework.Registry;
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;

import javax.jbi.JBIException;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;

import java.util.Iterator;
import java.util.List;
import java.util.Set;

39 /**
40  * Handles publish/subscribe style messaging in the NMR.
41  *
42  *
43  * @version $Revision: 426415 $
44  */

45 public class SubscriptionManager extends ComponentSupport implements MessageExchangeListener {
46     
47     private Registry registry;
48     private String JavaDoc flowName;
49     private static Log log = LogFactory.getLog(SubscriptionManager.class);
50     
51     // SM-229: Avoid StackOverflowException
52
private static final String JavaDoc FROM_SUBSCRIPTION_MANAGER = "org.apache.servicemix.jbi.nmr.from_subman";
53     
54     /**
55      * Initialize the SubscriptionManager
56      * @param broker
57      * @throws JBIException
58      */

59     public void init(Broker broker, Registry registry) throws JBIException {
60         this.registry = registry;
61         broker.getContainer().activateComponent(this, "#SubscriptionManager#");
62     }
63
64     /**
65      * Dispatches the given exchange to all matching subscribers
66      * @param exchange
67      * @return true if dispatched to a matching subscriber(s)
68      *
69      * @throws JBIException
70      */

71     protected boolean dispatchToSubscribers(MessageExchangeImpl exchange) throws JBIException {
72         Boolean JavaDoc source = (Boolean JavaDoc) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
73         if (source == null || !source.booleanValue()) {
74             List JavaDoc list = registry.getMatchingSubscriptionEndpoints(exchange);
75             if (list != null) {
76                 for (int i = 0; i < list.size(); i++) {
77                     InternalEndpoint endpoint = (InternalEndpoint)list.get(i);
78                     dispatchToSubscriber(exchange, endpoint);
79                 }
80             }
81             return list != null && !list.isEmpty();
82         } else {
83             return false;
84         }
85     }
86
87     /**
88      * Dispatches the given message exchange to the given endpoint
89      * @param exchange
90      * @param endpoint
91      * @throws JBIException
92      */

93     protected void dispatchToSubscriber(MessageExchangeImpl exchange, InternalEndpoint endpoint) throws JBIException {
94         if (log.isDebugEnabled() && endpoint != null) {
95             log.debug("Subscription Endpoint: "+endpoint.getEndpointName());
96         }
97         // SM-229: Avoid StackOverflowException
98
Boolean JavaDoc source = (Boolean JavaDoc) exchange.getProperty(FROM_SUBSCRIPTION_MANAGER);
99         if (source == null || !source.booleanValue()) {
100             DeliveryChannel channel = getDeliveryChannel();
101             InOnly me = channel.createExchangeFactory().createInOnlyExchange();
102             // SM-229: Avoid StackOverflowException
103
me.setProperty(FROM_SUBSCRIPTION_MANAGER,Boolean.TRUE);
104             NormalizedMessage in = me.createMessage();
105             getMessageTransformer().transform(me, exchange.getInMessage(), in);
106             me.setInMessage(in);
107             me.setEndpoint(endpoint);
108             Set JavaDoc names = exchange.getPropertyNames();
109             for (Iterator JavaDoc iter = names.iterator(); iter.hasNext();) {
110                 String JavaDoc name = (String JavaDoc) iter.next();
111                 me.setProperty(name, exchange.getProperty(name));
112             }
113             if (Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC))) {
114                 channel.sendSync(me);
115             } else {
116                 channel.send(me);
117             }
118         }
119     }
120
121     public String JavaDoc getFlowName() {
122         return flowName;
123     }
124
125     public void setFlowName(String JavaDoc flowName) {
126         this.flowName = flowName;
127     }
128
129     public void onMessageExchange(MessageExchange exchange) throws MessagingException {
130         // We should only receive done exchanges from subscribers
131
// but we need that so that they can be dequeued
132
}
133    
134 }
135
Popular Tags