1 22 package org.jboss.mq.server; 23 24 import javax.jms.JMSException ; 25 26 import org.jboss.logging.Logger; 27 import org.jboss.mq.SpyDestination; 28 import org.jboss.mq.SpyMessage; 29 import org.jboss.mq.Subscription; 30 import org.jboss.mq.pm.Tx; 31 32 43 public abstract class JMSDestination 44 { 45 SpyDestination destination; 47 ClientConsumer temporaryDestination; 49 JMSDestinationManager server; 51 52 long nextMessageIdCounter = 0; 54 Object nextMessageIdLock = new Object (); 55 56 static long nextSharedMessageIdCounter = 0; 57 static Object nextSharedMessageIdLock = new Object (); 58 59 60 public BasicQueueParameters parameters; 61 62 static Logger cat = Logger.getLogger(JMSDestination.class); 63 64 JMSDestination(SpyDestination dest, ClientConsumer temporary, JMSDestinationManager server, BasicQueueParameters parameters) throws JMSException 65 { 66 destination = dest; 67 temporaryDestination = temporary; 68 this.server = server; 69 this.parameters = parameters; 70 } 71 72 public SpyDestination getSpyDestination() 73 { 74 return destination; 75 } 76 77 public abstract void addSubscriber(Subscription sub) throws JMSException ; 78 79 public abstract void removeSubscriber(Subscription sub) throws JMSException ; 80 81 public abstract void nackMessages(Subscription sub) throws JMSException ; 82 83 public abstract SpyMessage receive(Subscription sub, boolean wait) throws JMSException ; 84 85 public abstract void addReceiver(Subscription sub) throws JMSException ; 86 87 public abstract void removeReceiver(Subscription sub); 88 89 public abstract void restoreMessage(MessageReference message); 90 91 public void restoreMessage(SpyMessage message) 92 { 93 restoreMessage(message, null, Tx.UNKNOWN); 94 } 95 96 103 public abstract void restoreMessage(SpyMessage message, Tx tx, int type); 104 105 public abstract boolean isInUse(); 106 107 public abstract void close() throws JMSException ; 108 public abstract void removeAllMessages() throws JMSException ; 109 110 116 public abstract void acknowledge( 117 org.jboss.mq.AcknowledgementRequest req, 118 org.jboss.mq.Subscription sub, 119 org.jboss.mq.pm.Tx txId) 120 throws javax.jms.JMSException ; 121 122 127 public abstract void addMessage(org.jboss.mq.SpyMessage mes, org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException ; 128 129 public abstract MessageCounter[] getMessageCounter(); 130 131 protected static long nextSharedMessageId() 132 { 133 synchronized (nextSharedMessageIdLock) 134 { 135 return nextSharedMessageIdCounter++; 136 } 137 } 138 139 protected static void updateSharedNextMessageId(SpyMessage message) 140 { 141 synchronized (nextSharedMessageIdLock) 142 { 143 nextSharedMessageIdCounter = Math.max(nextSharedMessageIdCounter, message.header.messageId+1); 144 } 145 } 146 147 protected long nextMessageId() 148 { 149 if (parameters.lateClone) 150 return nextSharedMessageId(); 151 152 synchronized (nextMessageIdLock) 153 { 154 return nextMessageIdCounter++; 155 } 156 } 157 158 protected void updateNextMessageId(SpyMessage message) 159 { 160 if (parameters.lateClone) 161 { 162 updateSharedNextMessageId(message); 163 return; 164 } 165 166 synchronized (nextMessageIdLock) 167 { 168 nextMessageIdCounter = Math.max(nextMessageIdCounter, message.header.messageId+1); 169 } 170 } 171 } 172 | Popular Tags |