package org.mule.impl.space;

import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;

import org.mule.umo.space.UMOSpaceException;
import org.mule.util.queue.Queue;
import org.mule.util.queue.QueueConfiguration;
import org.mule.util.queue.QueueManager;
import org.mule.util.queue.QueuePersistenceStrategy;
import org.mule.util.queue.QueueSession;
import org.mule.util.queue.TransactionalQueueManager;
import org.mule.util.xa.ResourceManagerException;
import org.mule.util.xa.ResourceManagerSystemException;

import java.util.Iterator;
import java.util.List;

/**
 * A space whose items are stored in a {@link Queue} obtained from a
 * {@link QueueSession} of a {@link QueueManager}.
 * <p>
 * A space either creates and owns its queue manager (the constructors taking a
 * {@link QueuePersistenceStrategy}) or borrows one supplied by the caller. Only an
 * owned queue manager is stopped when the space is disposed.
 * <p>
 * Child spaces created through {@link #createChild(String)} share the queue manager
 * and session of the space that created them and are disposed together with it.
 */
public class DefaultSpace extends AbstractSpace
{

    /** The queue that backs this space. */
    private Queue space;

    /** The session used to obtain the queue and to demarcate transactions. */
    private QueueSession session;

    private QueueManager queueManager;

    /** <code>true</code> when this space created its queue manager and must stop it on dispose. */
    private boolean localQueueManager = false;

    /** Child spaces created through {@link #createChild(String)}. */
    private final List children = new CopyOnWriteArrayList();

    /** Name of the parent space; <code>null</code> unless this space was created as a child. */
    private String parentName;

    /**
     * Creates a space with its own queue manager and with monitor events enabled.
     *
     * @param name the name of the space, also used as the queue name
     * @param ps the persistence strategy handed to the queue manager
     * @param capacity the capacity passed to the queue configuration
     * @throws ResourceManagerSystemException if the queue manager cannot be started
     */
    DefaultSpace(String name, QueuePersistenceStrategy ps, int capacity)
        throws ResourceManagerSystemException
    {
        this(name, ps, true, capacity);
    }

    /**
     * Creates a space with its own {@link TransactionalQueueManager}, which is started
     * here and stopped again when the space is disposed.
     *
     * @param name the name of the space, also used as the queue name
     * @param ps the persistence strategy handed to the queue manager
     * @param enableMonitorEvents passed through to the superclass constructor
     * @param capacity the capacity passed to the queue configuration
     * @throws ResourceManagerSystemException if the queue manager cannot be started
     */
    public DefaultSpace(String name, QueuePersistenceStrategy ps, boolean enableMonitorEvents, int capacity)
        throws ResourceManagerSystemException
    {
        super(name, enableMonitorEvents);
        queueManager = new TransactionalQueueManager();
        localQueueManager = true;
        queueManager.setPersistenceStrategy(ps);
        // The second configuration argument is the negation of ps.isTransient().
        queueManager.setQueueConfiguration(name, new QueueConfiguration(capacity, !ps.isTransient()));
        session = queueManager.getQueueSession();
        space = session.getQueue(name);
        // The manager is started only after the session and queue have been obtained;
        // this ordering is kept exactly as it was.
        queueManager.start();
    }

    /**
     * Creates a space on top of a queue manager supplied by the caller. The manager is
     * neither started nor stopped by this space.
     *
     * @param name the name of the space, also used as the queue name
     * @param qm the queue manager to obtain the session and queue from
     * @param enableMonitorEvents passed through to the superclass constructor
     */
    public DefaultSpace(String name, QueueManager qm, boolean enableMonitorEvents)
    {
        super(name, enableMonitorEvents);
        queueManager = qm;
        session = queueManager.getQueueSession();
        space = session.getQueue(name);
    }

    /**
     * Creates a child space named <code>parentName + "." + name</code> that reuses the
     * queue manager and session of its parent.
     *
     * @param name the simple name of the child
     * @param parentName the name of the parent space
     * @param queueManager the parent's queue manager
     * @param session the parent's queue session
     * @param enableMonitorEvents passed through to the superclass constructor
     */
    protected DefaultSpace(String name,
                           String parentName,
                           QueueManager queueManager,
                           QueueSession session,
                           boolean enableMonitorEvents)
    {
        super(parentName + "." + name, enableMonitorEvents);
        this.queueManager = queueManager;
        this.session = session;
        // NOTE(review): the queue is looked up by the parent's name, not by the child's
        // qualified name, so the child appears to share the parent's queue - confirm
        // that this is intended.
        space = session.getQueue(parentName);
        localQueueManager = false;
        this.parentName = parentName;
    }

    /**
     * Puts an item into the backing queue.
     * <p>
     * If the calling thread is interrupted, its interrupt status is restored and a
     * warning is logged; no exception is thrown.
     *
     * @param item the item to store
     * @throws UMOSpaceException declared by the contract; not thrown by this implementation
     */
    public void doPut(Object item) throws UMOSpaceException
    {
        try
        {
            space.put(item);
        }
        catch (InterruptedException e)
        {
            // Restore the flag so that callers can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while putting an item into space '" + name
                        + "'; the item may not have been stored");
        }
    }

    /**
     * Puts an item into the backing queue.
     * <p>
     * The <code>lease</code> argument is currently ignored: the item is stored exactly
     * as by {@link #doPut(Object)}.
     *
     * @param item the item to store
     * @param lease not used by this implementation
     * @throws UMOSpaceException declared by the contract; not thrown by this implementation
     */
    public void doPut(Object item, long lease) throws UMOSpaceException
    {
        try
        {
            space.put(item);
        }
        catch (InterruptedException e)
        {
            // Restore the flag so that callers can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while putting an item into space '" + name
                        + "'; the item may not have been stored");
        }
    }

    /**
     * Takes an item from the backing queue using {@link Queue#take()}.
     *
     * @return the item taken, or <code>null</code> if the calling thread was interrupted
     *         (its interrupt status is restored in that case)
     * @throws UMOSpaceException declared by the contract; not thrown by this implementation
     */
    public Object doTake() throws UMOSpaceException
    {
        try
        {
            return space.take();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Takes an item from the backing queue using {@link Queue#poll(long)}.
     *
     * @param timeout the timeout handed to the queue's <code>poll</code> method
     * @return the result of the poll, or <code>null</code> if the calling thread was
     *         interrupted (its interrupt status is restored in that case)
     * @throws UMOSpaceException declared by the contract; not thrown by this implementation
     */
    public Object doTake(long timeout) throws UMOSpaceException
    {
        try
        {
            return space.poll(timeout);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Takes an item from the backing queue by polling with a timeout of <code>1L</code>.
     *
     * @return the result of the poll, or <code>null</code> if the calling thread was
     *         interrupted (its interrupt status is restored in that case)
     * @throws UMOSpaceException declared by the contract; not thrown by this implementation
     */
    public Object doTakeNoWait() throws UMOSpaceException
    {
        try
        {
            // NOTE(review): polls with 1 (presumably one millisecond) rather than 0 -
            // confirm how Queue.poll treats a zero timeout before changing this.
            return space.poll(1L);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Disposes all child spaces, then stops the queue manager if this space created it.
     * A failure to stop the queue manager is logged as a warning and not rethrown.
     */
    protected void doDispose()
    {
        for (Iterator iterator = children.iterator(); iterator.hasNext();)
        {
            DefaultSpace childSpace = (DefaultSpace)iterator.next();
            childSpace.dispose();
        }
        children.clear();
        // A borrowed queue manager is left running.
        if (localQueueManager)
        {
            try
            {
                queueManager.stop();
            }
            catch (ResourceManagerSystemException e)
            {
                logger.warn(e);
            }
        }
    }

    /**
     * @return the size reported by the backing queue
     */
    public int size()
    {
        return space.size();
    }

    /**
     * Begins a transaction on the queue session.
     *
     * @throws UMOSpaceException a {@link SpaceTransactionException} wrapping the
     *             underlying {@link ResourceManagerException}
     */
    public void beginTransaction() throws UMOSpaceException
    {
        try
        {
            session.begin();
        }
        catch (ResourceManagerException e)
        {
            throw new SpaceTransactionException(e);
        }
    }

    /**
     * Commits the transaction on the queue session.
     *
     * @throws UMOSpaceException a {@link SpaceTransactionException} wrapping the
     *             underlying {@link ResourceManagerException}
     */
    public void commitTransaction() throws UMOSpaceException
    {
        try
        {
            session.commit();
        }
        catch (ResourceManagerException e)
        {
            throw new SpaceTransactionException(e);
        }
    }

    /**
     * Rolls back the transaction on the queue session.
     *
     * @throws UMOSpaceException a {@link SpaceTransactionException} wrapping the
     *             underlying {@link ResourceManagerException}
     */
    public void rollbackTransaction() throws UMOSpaceException
    {
        try
        {
            session.rollback();
        }
        catch (ResourceManagerException e)
        {
            throw new SpaceTransactionException(e);
        }
    }

    /**
     * Creates a child space that shares this space's queue manager and session and
     * registers it so that it is disposed together with this space.
     * <p>
     * The parent name handed to the child is this space's own name when this space has
     * no parent, otherwise this space's parent name. A child created from a child is
     * therefore qualified with the same parent name as its creator rather than being
     * nested one level deeper.
     *
     * @param name the simple name of the child space
     * @return the newly created child space
     */
    public DefaultSpace createChild(String name)
    {
        DefaultSpace child = new DefaultSpace(name, (parentName == null ? this.name : parentName),
            queueManager, session, enableMonitorEvents);
        logger.info("created child space: " + child.getName());
        children.add(child);
        return child;
    }

    /**
     * @return the name of the parent space, or <code>null</code> if this space was not
     *         created as a child
     */
    public String getParentName()
    {
        return parentName;
    }
}