1 22 package org.jboss.mq.server; 23 24 import java.util.ArrayList ; 25 26 import javax.jms.JMSException ; 27 28 import org.jboss.mq.SpyDestination; 29 import org.jboss.mq.SpyJMSException; 30 import org.jboss.mq.SpyMessage; 31 import org.jboss.mq.Subscription; 32 import org.jboss.mq.pm.Tx; 33 34 45 public class JMSQueue extends JMSDestination 46 { 47 48 public BasicQueue queue; 49 50 public JMSQueue(SpyDestination dest, ClientConsumer temporary, JMSDestinationManager server, BasicQueueParameters parameters) throws JMSException 51 { 52 super(dest, temporary, server, parameters); 53 54 if (temporaryDestination == null) 56 { 57 Throwable error = null; 58 for (int i = 0; i <= parameters.recoveryRetries; ++i) 59 { 60 queue = new PersistentQueue(server, dest, parameters); 62 63 try 64 { 65 server.getPersistenceManager().restoreQueue(this, dest); 67 68 break; 70 } 71 catch (Throwable t) 72 { 73 if (i < parameters.recoveryRetries) 74 cat.warn("Error restoring queue " + queue + " retries=" + i + " of " + parameters.recoveryRetries, t); 75 else 76 error = t; 77 try 78 { 79 queue.stop(); 80 } 81 catch (Throwable ignored) 82 { 83 cat.trace("Ignored error stopping queue " + queue, ignored); 84 } 85 finally 86 { 87 queue = null; 88 } 89 } 90 } 91 92 if (error != null) 93 SpyJMSException.rethrowAsJMSException("Unable to recover queue " + dest + " retries=" + parameters.recoveryRetries, error); 94 } 95 else 96 { 97 queue = new BasicQueue(server, destination.toString(), parameters); 99 } 100 101 queue.createMessageCounter(dest.getName(), null, false, false, parameters.messageCounterHistoryDayLimit); 103 } 104 105 public void addSubscriber(Subscription sub) throws JMSException 106 { 107 queue.addSubscriber(sub); 108 } 109 110 public void removeSubscriber(Subscription sub) 111 { 112 queue.removeSubscriber(sub); 113 } 114 115 public void nackMessages(Subscription sub) 116 { 117 queue.nackMessages(sub); 118 } 119 120 public void addReceiver(Subscription sub) throws JMSException 121 { 122 queue.addReceiver(sub); 123 } 124 125 public void removeReceiver(Subscription sub) 126 { 127 queue.removeReceiver(sub); 128 } 129 130 public void restoreMessage(MessageReference messageRef) 131 { 132 try 133 { 134 SpyMessage spyMessage = messageRef.getMessage(); 135 updateNextMessageId(spyMessage); 136 messageRef.queue = queue; 137 queue.restoreMessage(messageRef); 138 } 139 catch (JMSException e) 140 { 141 cat.error("Could not restore message:", e); 142 } 143 } 144 145 public void restoreMessage(SpyMessage message, Tx txid, int type) 146 { 147 try 148 { 149 updateNextMessageId(message); 150 MessageReference messageRef = server.getMessageCache().add(message, queue, MessageReference.STORED); 151 queue.restoreMessage(messageRef, txid, type); 152 } 153 catch (JMSException e) 154 { 155 cat.error("Could not restore message:", e); 156 } 157 } 158 159 public SpyMessage[] browse(String selector) throws JMSException 160 { 161 return queue.browse(selector); 162 } 163 164 public String toString() 165 { 166 return "JMSDestination:" + destination; 167 } 168 169 public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription sub, org.jboss.mq.pm.Tx txId) 170 throws JMSException 171 { 172 queue.acknowledge(req, txId); 173 } 174 175 public void addMessage(SpyMessage mes, org.jboss.mq.pm.Tx txId) throws JMSException 176 { 177 mes.header.messageId = nextMessageId(); 179 MessageReference message = server.getMessageCache().add(mes, queue, MessageReference.NOT_STORED); 180 queue.addMessage(message, txId); 181 } 182 183 public org.jboss.mq.SpyMessage receive(org.jboss.mq.Subscription sub, boolean wait) throws javax.jms.JMSException 184 { 185 return queue.receive(sub, wait); 186 } 187 188 191 public boolean isInUse() 192 { 193 return queue.isInUse(); 194 } 195 196 199 public void close() throws JMSException 200 { 201 queue.stop(); 202 server.getPersistenceManager().closeQueue(this, getSpyDestination()); 203 } 204 205 208 public void removeAllMessages() throws JMSException 209 { 210 queue.removeAllMessages(); 211 } 212 213 218 public MessageCounter[] getMessageCounter() 219 { 220 ArrayList array = new ArrayList (); 221 222 MessageCounter counter = queue.getMessageCounter(); 223 224 if (counter != null) 225 array.add(counter); 226 227 return (MessageCounter[]) array.toArray(new MessageCounter[0]); 228 } 229 } 230 | Popular Tags |