KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > apache > activemq > AdvisoryConsumer


1 /**
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements. See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */

18 package org.apache.activemq;
19
20 import javax.jms.JMSException JavaDoc;
21
22 import org.apache.activemq.advisory.AdvisorySupport;
23 import org.apache.activemq.command.ActiveMQDestination;
24 import org.apache.activemq.command.ConsumerId;
25 import org.apache.activemq.command.ConsumerInfo;
26 import org.apache.activemq.command.DataStructure;
27 import org.apache.activemq.command.DestinationInfo;
28 import org.apache.activemq.command.MessageAck;
29 import org.apache.activemq.command.MessageDispatch;
30
31 public class AdvisoryConsumer implements ActiveMQDispatcher {
32
33     private final ActiveMQConnection connection;
34     private ConsumerInfo info;
35     private boolean closed;
36     int deliveredCounter;
37
38     public AdvisoryConsumer(ActiveMQConnection connection, ConsumerId consumerId) throws JMSException JavaDoc {
39         this.connection = connection;
40         info = new ConsumerInfo(consumerId);
41         info.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
42         info.setPrefetchSize(1000);
43         info.setNoLocal(true);
44         
45         this.connection.addDispatcher(info.getConsumerId(), this);
46         this.connection.syncSendPacket(this.info);
47     }
48
49     public void dispose() {
50         if (!closed) {
51             this.connection.removeDispatcher(info.getConsumerId());
52             closed = true;
53         }
54     }
55
56     public void dispatch(MessageDispatch md) {
57         
58         // Auto ack messages when we reach 75% of the prefetch
59
deliveredCounter++;
60         if( deliveredCounter > (0.75 * info.getPrefetchSize()) ) {
61             try {
62                 MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredCounter);
63                 connection.asyncSendPacket(ack);
64                 deliveredCounter = 0;
65             } catch (JMSException JavaDoc e) {
66                 connection.onAsyncException(e);
67             }
68         }
69         
70         DataStructure o = md.getMessage().getDataStructure();
71         if( o!=null && o.getClass() == DestinationInfo.class ) {
72             processDestinationInfo((DestinationInfo) o);
73         } else {
74             connection.onAsyncException(new JMSException JavaDoc("Unexpected message was dispatched to the AdvisoryConsumer: "+md));
75         }
76         
77     }
78
79     private void processDestinationInfo(DestinationInfo dinfo) {
80         ActiveMQDestination dest = dinfo.getDestination();
81         if( dinfo.getOperationType() == DestinationInfo.ADD_OPERATION_TYPE ) {
82             connection.activeTempDestinations.put(dest,dest);
83         } else if( dinfo.getOperationType() == DestinationInfo.REMOVE_OPERATION_TYPE ) {
84             connection.activeTempDestinations.remove(dest);
85         }
86     }
87
88 }
89
Popular Tags