1 21 22 package org.jacorb.notification.queue; 23 24 import org.jacorb.notification.interfaces.Message; 25 26 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock; 27 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock; 28 29 33 public class RWLockEventQueueDecorator implements MessageQueueAdapter 34 { 35 38 private final ReadWriteLock delegateLock_ = new WriterPreferenceReadWriteLock(); 39 40 43 private MessageQueueAdapter delegate_; 44 45 48 public RWLockEventQueueDecorator(MessageQueueAdapter initialDelegate) 49 throws InterruptedException 50 { 51 super(); 52 53 delegateLock_.writeLock().acquire(); 54 55 try 56 { 57 delegate_ = initialDelegate; 58 } finally 59 { 60 delegateLock_.writeLock().release(); 61 } 62 } 63 64 public void replaceDelegate(MessageQueueAdapter newDelegate) throws InterruptedException 65 { 66 delegateLock_.writeLock().acquire(); 67 68 try 69 { 70 if (delegate_.hasPendingMessages()) 71 { 72 Message[] _allMessages = delegate_.getAllMessages(); 73 for (int x = 0; x < _allMessages.length; ++x) 74 { 75 newDelegate.enqeue(_allMessages[x]); 76 } 77 } 78 delegate_ = newDelegate; 79 } finally 80 { 81 delegateLock_.writeLock().release(); 82 } 83 } 84 85 public void enqeue(Message message) throws InterruptedException 86 { 87 delegateLock_.readLock().acquire(); 88 89 try 90 { 91 delegate_.enqeue(message); 92 } finally 93 { 94 delegateLock_.readLock().release(); 95 } 96 } 97 98 public boolean hasPendingMessages() throws InterruptedException 99 { 100 delegateLock_.readLock().acquire(); 101 102 try 103 { 104 return delegate_.hasPendingMessages(); 105 } finally 106 { 107 delegateLock_.readLock().release(); 108 } 109 } 110 111 public int getPendingMessagesCount() throws InterruptedException 112 { 113 delegateLock_.readLock().acquire(); 114 115 try 116 { 117 return delegate_.getPendingMessagesCount(); 118 } finally 119 { 120 delegateLock_.readLock().release(); 121 } 122 } 123 124 public Message getMessageBlocking() throws InterruptedException 125 { 126 delegateLock_.readLock().acquire(); 127 128 try 129 { 130 return delegate_.getMessageBlocking(); 131 } finally 132 { 133 delegateLock_.readLock().release(); 134 } 135 } 136 137 public Message getMessageNoBlock() throws InterruptedException 138 { 139 delegateLock_.readLock().acquire(); 140 141 try 142 { 143 return delegate_.getMessageNoBlock(); 144 } finally 145 { 146 delegateLock_.readLock().release(); 147 } 148 } 149 150 public Message[] getAllMessages() throws InterruptedException 151 { 152 delegateLock_.readLock().acquire(); 153 154 try 155 { 156 return delegate_.getAllMessages(); 157 } finally 158 { 159 delegateLock_.readLock().release(); 160 } 161 } 162 163 public Message[] getUpToMessages(int max) throws InterruptedException 164 { 165 delegateLock_.readLock().acquire(); 166 167 try 168 { 169 return delegate_.getUpToMessages(max); 170 } finally 171 { 172 delegateLock_.readLock().release(); 173 } 174 } 175 176 public Message[] getAtLeastMessages(int min) throws InterruptedException 177 { 178 delegateLock_.readLock().acquire(); 179 180 try 181 { 182 return delegate_.getAtLeastMessages(min); 183 } finally 184 { 185 delegateLock_.readLock().release(); 186 } 187 } 188 189 public void clear() 190 { 191 try 192 { 193 delegateLock_.writeLock().acquire(); 194 } catch (InterruptedException e) 195 { 196 } 198 199 try 200 { 201 delegate_.clear(); 202 } finally 203 { 204 delegateLock_.writeLock().release(); 205 } 206 } 207 208 public String toString() 209 { 210 return delegate_.toString(); 211 } 212 } | Popular Tags |