1 10 11 package org.mule.providers.gs.space; 12 13 import net.jini.core.entry.Entry; 14 import net.jini.core.lease.Lease; 15 import net.jini.core.transaction.Transaction; 16 import net.jini.space.JavaSpace; 17 18 import org.mule.config.i18n.Message; 19 import org.mule.config.i18n.Messages; 20 import org.mule.impl.space.AbstractSpace; 21 import org.mule.impl.space.SpaceTransactionException; 22 import org.mule.transaction.TransactionCoordination; 23 import org.mule.transaction.TransactionNotInProgressException; 24 import org.mule.umo.UMOTransaction; 25 import org.mule.umo.space.UMOSpaceException; 26 import org.mule.util.ArrayUtils; 27 28 import com.j_spaces.core.IJSpace; 29 import com.j_spaces.core.client.ExternalEntry; 30 import com.j_spaces.core.client.FinderException; 31 import com.j_spaces.core.client.SpaceFinder; 32 33 import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; 34 import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; 35 36 40 public class GSSpace extends AbstractSpace 41 { 42 private final IJSpace space; 43 private final BlockingQueue queue = new LinkedBlockingQueue(1000); 44 private final long lease; 45 private Entry entryTemplate; 46 private Entry snapshot; 47 48 protected GSSpace(String spaceUrl, boolean enableMonitorEvents) throws FinderException 49 { 50 this(spaceUrl, enableMonitorEvents, Lease.FOREVER); 51 } 52 53 protected GSSpace(String spaceUrl, boolean enableMonitorEvents, long lease) throws FinderException 54 { 55 super(spaceUrl, enableMonitorEvents); 56 this.lease = lease; 57 this.space = (IJSpace)this.findSpace(spaceUrl); 58 } 59 60 protected JavaSpace findSpace(String spaceUrl) throws FinderException 61 { 62 logger.info("Connecting to space: " + spaceUrl); 63 return (JavaSpace)SpaceFinder.find(spaceUrl); 64 } 65 66 public void doPut(Object value) throws UMOSpaceException 67 { 68 doPut(value, lease); 69 } 70 71 public void doPut(Object value, long lease) throws UMOSpaceException 72 { 73 try 74 { 75 Class valueClass = value.getClass(); 76 if (Entry.class.isAssignableFrom(valueClass)) 77 { 78 space.write((Entry)value, getTransaction(), lease); 79 } 80 else if (valueClass.isArray()) 81 { 82 Entry[] entryArr = (Entry[])ArrayUtils.toArrayOfComponentType((Object [])value, Entry.class); 83 space.writeMultiple(entryArr, getTransaction(), lease); 84 } 85 else 86 { 87 space.write(new ExternalEntry(name, new Object []{value}), getTransaction(), lease); 88 } 89 } 90 catch (Exception e) 91 { 92 throw new GSSpaceException(e); 93 } 94 } 95 96 public Object doTake() throws UMOSpaceException 97 { 98 return doTake(Long.MAX_VALUE); 99 } 100 101 public Object doTake(long timeout) throws UMOSpaceException 102 { 103 105 try 106 { 107 if (snapshot == null) 108 { 109 snapshot = space.snapshot(entryTemplate); 110 } 111 112 Object retValue = null; 114 while (retValue == null) 115 { 116 retValue = queue.poll(); 117 if (retValue != null) 118 { 119 continue; 120 } 121 122 Entry[] entries = space.takeMultiple(snapshot, getTransaction(), Integer.MAX_VALUE); 124 if (entries != null && entries.length > 0) 125 { 126 for (int i = 0; i < entries.length; i++) 127 { 128 queue.put(entries[i]); 129 } 130 continue; 131 } 132 133 Object entry = space.take(snapshot, getTransaction(), 5000); 135 if (entry != null) 136 { 137 queue.put(entry); 138 } 139 } 140 return retValue; 141 } 142 catch (Exception e) 143 { 144 try 146 { 147 Thread.sleep(1000); 148 } 149 catch (InterruptedException e1) 150 { 151 e1.printStackTrace(); 152 } 153 throw new GSSpaceException(e); 154 } 155 } 156 157 public Object doTakeNoWait() throws UMOSpaceException 158 { 159 try 160 { 161 return space.takeIfExists(entryTemplate, getTransaction(), 1); 162 } 163 catch (Exception e) 164 { 165 throw new GSSpaceException(e); 166 } 167 } 168 169 protected void doDispose() 170 { 171 } 173 174 public int size() 175 { 176 return -1; 177 } 178 179 public void beginTransaction() throws UMOSpaceException 180 { 181 try 182 { 183 UMOTransaction tx = transactionFactory.beginTransaction(); 184 tx.bindResource(name, space); 185 } 186 catch (org.mule.umo.TransactionException e) 187 { 188 throw new SpaceTransactionException(e); 189 } 190 } 191 192 public void commitTransaction() throws UMOSpaceException 193 { 194 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 195 if (tx == null) 196 { 197 throw new SpaceTransactionException(new TransactionNotInProgressException(new Message( 198 Messages.TX_COMMIT_FAILED))); 199 } 200 try 201 { 202 tx.commit(); 203 } 204 catch (org.mule.umo.TransactionException e) 205 { 206 throw new SpaceTransactionException(e); 207 } 208 } 209 210 public void rollbackTransaction() throws UMOSpaceException 211 { 212 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 213 if (tx == null) 214 { 215 throw new SpaceTransactionException(new TransactionNotInProgressException(new Message( 216 Messages.TX_COMMIT_FAILED))); 217 } 218 try 219 { 220 tx.rollback(); 221 } 222 catch (org.mule.umo.TransactionException e) 223 { 224 throw new SpaceTransactionException(e); 225 } 226 } 227 228 public JavaSpace getJavaSpace() 229 { 230 return space; 231 } 232 233 protected Transaction getTransaction() 234 { 235 UMOTransaction tx = TransactionCoordination.getInstance().getTransaction(); 236 if (tx != null) 237 { 238 return (Transaction)tx.getResource(space); 239 } 240 else 241 { 242 return null; 243 } 244 } 245 246 public Entry getEntryTemplate() 247 { 248 return entryTemplate; 249 } 250 251 public void setEntryTemplate(Entry entryTemplate) 252 { 253 this.entryTemplate = entryTemplate; 254 this.snapshot = null; 255 if (logger.isInfoEnabled()) 256 { 257 logger.info("Space: " + name + " is using receiver template: " + entryTemplate.toString()); 258 } 259 } 260 261 } 262 | Popular Tags |