KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > broker > CompositeDestinationBroker


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq.broker;
19
20 import org.apache.activemq.command.ActiveMQDestination;
21 import org.apache.activemq.command.Message;
22 import org.apache.activemq.command.ProducerInfo;
23
24 /**
25  * This broker filter handles composite destinations.
26  *
27  * If a broker operation is invoked using a composite destination, this filter
28  * repeats the operation using each destination of the composite.
29  *
30  * HRC: I think this filter is dangerous to use to with the consumer operations. Multiple
31  * Subscription objects will be associated with a single JMS consumer each having a
32  * different idea of what the current pre-fetch dispatch size is.
33  *
34  * If this is used, then the client has to expect many more messages to be dispatched
35  * than the pre-fetch setting allows.
36  *
37  * @version $Revision: 1.8 $
38  */

39 public class CompositeDestinationBroker extends BrokerFilter {
40     
41     public CompositeDestinationBroker(Broker next) {
42         super(next);
43     }
44
45     /**
46      * A producer may register to send to multiple destinations via a composite destination.
47      */

48     public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception JavaDoc {
49         // The destination may be null.
50
ActiveMQDestination destination = info.getDestination();
51         if( destination!=null && destination.isComposite() ) {
52             ActiveMQDestination[] destinations = destination.getCompositeDestinations();
53             for (int i = 0; i < destinations.length; i++) {
54                 ProducerInfo copy = info.copy();
55                 copy.setDestination(destinations[i]);
56                 next.addProducer(context, copy);
57             }
58         } else {
59             next.addProducer(context, info);
60         }
61     }
62     
63     /**
64      * A producer may de-register from sending to multiple destinations via a composite destination.
65      */

66     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception JavaDoc {
67         // The destination may be null.
68
ActiveMQDestination destination = info.getDestination();
69         if( destination!=null && destination.isComposite() ) {
70             ActiveMQDestination[] destinations = destination.getCompositeDestinations();
71             for (int i = 0; i < destinations.length; i++) {
72                 ProducerInfo copy = info.copy();
73                 copy.setDestination(destinations[i]);
74                 next.removeProducer(context, copy);
75             }
76         } else {
77             next.removeProducer(context, info);
78         }
79     }
80     
81     /**
82      *
83      */

84     public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception JavaDoc {
85         ActiveMQDestination destination = message.getDestination();
86         if( destination.isComposite() ) {
87             ActiveMQDestination[] destinations = destination.getCompositeDestinations();
88             for (int i = 0; i < destinations.length; i++) {
89                 if( i!=0 ) {
90                     message = message.copy();
91                 }
92                 message.setOriginalDestination(destination);
93                 message.setDestination(destinations[i]);
94                 next.send(producerExchange, message);
95             }
96         } else {
97             next.send(producerExchange, message);
98         }
99     }
100     
101 }
102
Popular Tags