package org.mule.util.queue;

import org.mule.util.xa.AbstractXAResourceManager;
import org.mule.util.xa.DefaultXASession;

import java.io.IOException;

/**
 * A {@link QueueSession} that hands out {@link Queue} views over the queues
 * owned by a {@link TransactionalQueueManager}.
 * <p>
 * Every queue operation checks the session's <code>localContext</code> (not
 * declared in this class; presumably inherited from {@link DefaultXASession}).
 * When it is non-null the operation is delegated to that context, cast to
 * <code>TransactionalQueueManager.QueueTransactionContext</code>. When it is
 * null the operation is applied directly to the underlying {@link QueueInfo}
 * and to the manager's store.
 */
class TransactionalQueueSession extends DefaultXASession implements QueueSession
{

    /** Manager that resolves queues by name and stores/loads/removes queued items. */
    protected TransactionalQueueManager queueManager;

    /**
     * Creates a session bound to the given resource manager and queue manager.
     *
     * @param resourceManager passed through to the {@link DefaultXASession} constructor
     * @param queueManager manager used to look up queues and to persist items
     */
    public TransactionalQueueSession(AbstractXAResourceManager resourceManager,
                                     TransactionalQueueManager queueManager)
    {
        super(resourceManager);
        this.queueManager = queueManager;
    }

    /**
     * Returns a new {@link Queue} view over the queue the manager holds under
     * the given name. A fresh wrapper object is created on every call.
     *
     * @param name name handed to {@link TransactionalQueueManager#getQueue}
     * @return a queue view bound to this session
     */
    public Queue getQueue(String name)
    {
        QueueInfo queue = queueManager.getQueue(name);
        return new QueueImpl(queue);
    }

    /**
     * {@link Queue} implementation bound to the enclosing session. It is a
     * non-static inner class on purpose: it reads the session's
     * <code>localContext</code> and <code>queueManager</code>.
     */
    protected class QueueImpl implements Queue
    {

        /** The underlying queue descriptor all operations act on. */
        protected QueueInfo queue;

        /**
         * @param queue the underlying queue this view operates on
         */
        public QueueImpl(QueueInfo queue)
        {
            this.queue = queue;
        }

        /**
         * Adds an item by calling {@link #offer(Object, long)} with a timeout
         * of <code>Long.MAX_VALUE</code>. The boolean result of the offer is
         * discarded.
         *
         * @param item the object to enqueue
         * @throws InterruptedException propagated from {@link #offer(Object, long)}
         */
        public void put(Object item) throws InterruptedException
        {
            offer(item, Long.MAX_VALUE);
        }

        /**
         * Tries to add an item to the queue.
         * <p>
         * Without a local context the item is first stored through the queue
         * manager, which returns an id; that id (not the item itself) is then
         * offered to the underlying queue. If the offer is rejected or the
         * thread is interrupted, the stored entry is removed again so that no
         * orphan is left behind in the store.
         *
         * @param item the object to enqueue
         * @param timeout timeout passed to the underlying offer (unit not visible here)
         * @return <code>true</code> if the item was accepted, <code>false</code> otherwise
         * @throws InterruptedException if the underlying offer is interrupted
         * @throws RuntimeException wrapping any {@link IOException} raised by the store
         */
        public boolean offer(Object item, long timeout) throws InterruptedException
        {
            if (localContext != null)
            {
                return ((TransactionalQueueManager.QueueTransactionContext)localContext).offer(queue, item,
                    timeout);
            }
            else
            {
                try
                {
                    Object id = queueManager.doStore(queue, item);
                    try
                    {
                        // NOTE(review): the meaning of the second argument (0) is defined
                        // by QueueInfo.offer, which is not visible here.
                        if (!queue.offer(id, 0, timeout))
                        {
                            // Roll back the store using the id returned by doStore, the
                            // same key poll() uses for removal (previously the item
                            // itself was passed here).
                            queueManager.doRemove(queue, id);
                            return false;
                        }
                        else
                        {
                            return true;
                        }
                    }
                    catch (InterruptedException e)
                    {
                        // Same rollback as above before propagating the interruption.
                        queueManager.doRemove(queue, id);
                        throw e;
                    }
                }
                catch (IOException e)
                {
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * Retrieves and removes the next item by calling {@link #poll(long)}
         * with a timeout of <code>Long.MAX_VALUE</code>.
         *
         * @return the item, or <code>null</code> if {@link #poll(long)} returned none
         * @throws InterruptedException propagated from {@link #poll(long)}
         */
        public Object take() throws InterruptedException
        {
            return poll(Long.MAX_VALUE);
        }

        /**
         * Retrieves and removes the next item.
         * <p>
         * Without a local context the next id is polled from the underlying
         * queue, the matching item is loaded from the store, and the stored
         * entry is then removed.
         *
         * @param timeout timeout passed to the underlying poll (unit not visible here)
         * @return the item, or <code>null</code> if the underlying poll returned no id
         * @throws InterruptedException if the underlying poll is interrupted
         * @throws RuntimeException wrapping any {@link IOException} raised by the store
         */
        public Object poll(long timeout) throws InterruptedException
        {
            try
            {
                if (localContext != null)
                {
                    return ((TransactionalQueueManager.QueueTransactionContext)localContext).poll(queue,
                        timeout);
                }
                else
                {
                    Object id = queue.poll(timeout);
                    if (id != null)
                    {
                        Object item = queueManager.doLoad(queue, id);
                        queueManager.doRemove(queue, id);
                        return item;
                    }
                    return null;
                }
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
        }

        /**
         * Retrieves the next item without removing it.
         * <p>
         * Without a local context the id at the head of the underlying queue
         * is looked up and the matching item is loaded from the store. The
         * stored entry is deliberately left in place: the id is still queued,
         * so a later {@link #poll(long)} must be able to load it.
         *
         * @return the item, or <code>null</code> if the underlying peek returned no id
         * @throws InterruptedException declared by the {@link Queue} contract
         * @throws RuntimeException wrapping any {@link IOException} raised by the store
         */
        public Object peek() throws InterruptedException
        {
            try
            {
                if (localContext != null)
                {
                    return ((TransactionalQueueManager.QueueTransactionContext)localContext).peek(queue);
                }
                else
                {
                    Object id = queue.peek();
                    if (id != null)
                    {
                        // Load only; removing here would make peek destructive.
                        return queueManager.doLoad(queue, id);
                    }
                    return null;
                }
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
        }

        /**
         * Returns the number of queued entries: the size reported by the local
         * context when one is present, otherwise the size of the underlying
         * queue's <code>list</code>.
         *
         * @return the current queue size
         */
        public int size()
        {
            if (localContext != null)
            {
                return ((TransactionalQueueManager.QueueTransactionContext)localContext).size(queue);
            }
            else
            {
                return queue.list.size();
            }
        }

    }
}