1 22 package org.jboss.mq.pm.none; 23 import javax.jms.JMSException ; 24 import javax.management.ObjectName ; 25 26 import org.jboss.mq.SpyDestination; 27 import org.jboss.mq.SpyMessage; 28 import org.jboss.mq.pm.CacheStore; 29 import org.jboss.mq.pm.Tx; 30 import org.jboss.mq.pm.TxManager; 31 import org.jboss.mq.server.JMSDestination; 32 import org.jboss.mq.server.MessageCache; 33 import org.jboss.mq.server.MessageReference; 34 import org.jboss.system.ServiceMBeanSupport; 35 36 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap; 37 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; 38 39 51 public class PersistenceManager 52 extends ServiceMBeanSupport 53 implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager, CacheStore 54 { 55 57 59 60 SynchronizedLong nextTransactionid = new SynchronizedLong(0l); 61 62 63 ObjectName delegateName; 64 65 66 org.jboss.mq.pm.PersistenceManager delegate; 67 68 69 TxManager txManager; 70 71 72 ConcurrentHashMap cache = new ConcurrentHashMap(); 73 74 76 78 84 public ObjectName getDelegatePM() 85 { 86 return delegateName; 87 } 88 89 95 public void setDelegatePM(ObjectName delegateName) 96 { 97 this.delegateName = delegateName; 98 } 99 100 102 public void add(MessageReference message, Tx txId) throws JMSException 103 { 104 if (delegate != null && message.inMemory() == false) 105 delegate.add(message, txId); 106 } 107 108 public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException 109 { 110 if (delegate != null) 111 delegate.closeQueue(jmsDest, dest); 112 } 113 114 public void commitPersistentTx(Tx txId) throws JMSException 115 { 116 if (delegate != null) 117 delegate.commitPersistentTx(txId); 118 } 119 120 public Tx createPersistentTx() throws JMSException 121 { 122 if (delegate != null) 123 return delegate.createPersistentTx(); 124 125 Tx tx = new Tx(nextTransactionid.increment()); 126 return tx; 127 } 128 129 public MessageCache getMessageCacheInstance() 130 { 131 if (delegate != null) 132 return delegate.getMessageCacheInstance(); 133 134 throw new UnsupportedOperationException ("This is now set on the destination manager"); 135 } 136 137 public TxManager getTxManager() 138 { 139 return txManager; 140 } 141 142 public void remove(MessageReference message, Tx txId) throws JMSException 143 { 144 if (delegate != null && message.inMemory() == false) 145 delegate.remove(message, txId); 146 } 147 148 public void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException 149 { 150 if (delegate != null) 151 delegate.restoreQueue(jmsDest, dest); 152 } 153 154 public void rollbackPersistentTx(Tx txId) throws JMSException 155 { 156 if (delegate != null) 157 delegate.rollbackPersistentTx(txId); 158 } 159 160 public void update(MessageReference message, Tx txId) throws JMSException 161 { 162 if (delegate != null && message.inMemory() == false) 163 delegate.update(message, txId); 164 } 165 166 168 public Object getInstance() 169 { 170 return this; 171 } 172 173 176 public ObjectName getMessageCache() 177 { 178 if (delegateName != null) 179 { 180 try 181 { 182 return (ObjectName ) server.getAttribute(delegateName, "MessageCache"); 183 } 184 catch (Exception e) 185 { 186 log.trace("Unable to retrieve message cache from delegate", e); 187 } 188 } 189 throw new UnsupportedOperationException ("This is now set on the destination manager"); 190 } 191 192 195 public void setMessageCache(ObjectName messageCache) 196 { 197 throw new UnsupportedOperationException ("This is now set on the destination manager"); 198 } 199 200 202 public SpyMessage loadFromStorage(MessageReference mh) throws JMSException 203 { 204 if (delegate == null || mh.inMemory()) 205 return (SpyMessage) cache.get(mh); 206 else 207 return ((CacheStore) delegate).loadFromStorage(mh); 208 } 209 210 public void removeFromStorage(MessageReference mh) throws JMSException 211 { 212 if (delegate == null || mh.inMemory()) 213 { 214 cache.remove(mh); 215 mh.setStored(MessageReference.NOT_STORED); 216 } 217 else 218 ((CacheStore) delegate).removeFromStorage(mh); 219 220 } 221 222 public void saveToStorage(MessageReference mh, SpyMessage message) throws JMSException 223 { 224 if (delegate == null || mh.inMemory()) 225 { 226 cache.put(mh, message); 227 mh.setStored(MessageReference.STORED); 228 } 229 else 230 ((CacheStore) delegate).saveToStorage(mh, message); 231 } 232 233 235 protected void startService() throws Exception 236 { 237 if (delegateName != null) 239 { 240 delegate = (org.jboss.mq.pm.PersistenceManager) getServer().getAttribute(delegateName, "Instance"); 241 if ((delegate instanceof CacheStore) == false) 242 throw new UnsupportedOperationException ("The delegate persistence manager must also be a CacheStore"); 243 txManager = delegate.getTxManager(); 244 } 245 else 246 { 247 txManager = new TxManager(this); 248 } 249 } 250 251 253 } 255 | Popular Tags |