1 22 package org.jboss.mq.il.http; 23 24 import java.util.ArrayList ; 25 import java.util.HashMap ; 26 import java.util.Iterator ; 27 import java.util.Map ; 28 29 import org.jboss.logging.Logger; 30 31 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 32 33 41 public class HTTPClientILStorageQueue 42 { 43 44 private static Logger log = Logger.getLogger(HTTPClientILStorageQueue.class); 45 private static HTTPClientILStorageQueue instance = null; 46 private Map map = new HashMap (); 47 private Object queueLock = new Object (); 48 49 56 private static long id = 100; private static Object idLock = new Object (); 58 59 private HTTPClientILStorageQueue() 60 { 61 if (log.isTraceEnabled()) 62 { 63 log.trace("created"); 64 } 65 } 66 67 public static synchronized HTTPClientILStorageQueue getInstance() 68 { 69 if (log.isTraceEnabled()) 70 { 71 log.trace("getInstance()"); 72 } 73 if (instance == null) 74 { 75 instance = new HTTPClientILStorageQueue(); 76 } 77 return instance; 78 } 79 80 public void put(HTTPILRequest request, String clientIlId) throws InterruptedException 81 { 82 if (log.isTraceEnabled()) 83 { 84 log.trace("put(HTTPILRequest " + request.toString() + ", String " + clientIlId + ")"); 85 } 86 if (clientIlId == null) 87 { 88 log.warn("A request was put in a storage queue for a null ClientIl."); 89 90 return; 91 } 92 synchronized(this.queueLock) 93 { 94 if (this.map.containsKey(clientIlId)) 95 { 96 if (log.isDebugEnabled()) 97 { 98 log.debug("ClientIL #" + clientIlId + " has existing storage queue, adding request to it."); 99 } 100 LinkedQueue queue = (LinkedQueue)this.map.get(clientIlId); 101 queue.put(request); 102 } 103 else 104 { 105 if (log.isDebugEnabled()) 106 { 107 log.debug("ClientIL #" + clientIlId + " doesn't have a storage queue. Creating one and adding the request."); 108 } 109 LinkedQueue queue = new LinkedQueue(); 110 queue.put(request); 111 this.map.put(clientIlId, queue); 112 } 113 } 114 } 115 116 public HTTPILRequest[] get(String clientIlId, long timeout) 117 { 118 if (log.isTraceEnabled()) 119 { 120 log.trace("get(String " + clientIlId + ")"); 121 } 122 123 if (clientIlId == null) 124 { 125 log.warn("A get was issued with a null clientIL Id."); 126 } 127 128 LinkedQueue queue; 129 synchronized(queueLock) 130 { 131 queue = (LinkedQueue)this.map.get(clientIlId); 132 if (queue == null) 133 { 134 if (log.isDebugEnabled()) 135 { 136 log.debug("ClientIL #" + clientIlId + " doesn't have a storage queue. Creating new one."); 137 } 138 queue = new LinkedQueue(); 139 this.map.put(clientIlId, queue); } 141 } 142 ArrayList messageList = new ArrayList (); 143 try 144 { 145 if (log.isDebugEnabled()) 146 { 147 log.debug("Polling the queue for " + String.valueOf(timeout) + " milliseconds on behalf of clientIL #" + clientIlId + "."); 148 } 149 Object object = queue.poll(timeout); 150 if (object != null) 151 { 152 if (log.isDebugEnabled()) 153 { 154 log.debug("Poll returned a HTTPILRequest, adding it to our list of requests to deliver to clientIL #" + clientIlId + "."); 155 } 156 messageList.add(object); 157 while ((object = queue.poll(0)) != null) 158 { 159 if (log.isDebugEnabled()) 160 { 161 log.debug("We had a request, so we're are going to see if there are any more for us, but we're not going to block this time."); 162 } 163 messageList.add(object); 164 if (log.isDebugEnabled()) 165 { 166 log.debug("Added request."); 167 } 168 } 169 } 170 } 171 catch (InterruptedException exception) 172 { 173 log.debug("An interruptedException was triggered. We'll just deliver what we have to the client and try again next time."); 174 } 175 if (log.isDebugEnabled()) 176 log.debug("Returning " + String.valueOf(messageList.size()) + " requests to clientIL #" + clientIlId + "."); 177 return this.createArrayFromList(messageList); } 179 180 public void purgeEntry(String clientIlId) 181 { 182 if (log.isTraceEnabled()) 183 { 184 log.trace("purgeEntry(String " + clientIlId + ")"); 185 } 186 Object entry; 187 synchronized(this.queueLock) 188 { 189 entry = this.map.remove(clientIlId); 190 } 191 if (entry != null && log.isDebugEnabled()) 192 { 193 log.debug("Purged storage queue entry for ClientIL #" + clientIlId + "."); 194 } 195 196 } 197 198 public String getID() 199 { 200 if (log.isTraceEnabled()) 201 { 202 log.trace("getID()"); 203 } 204 synchronized(idLock) 205 { 206 return String.valueOf(++id); 207 } 208 } 209 210 private HTTPILRequest[] createArrayFromList(ArrayList list) 211 { 212 if (log.isTraceEnabled()) 213 { 214 log.trace("createArrayFromList(ArrayList length=" + String.valueOf(list.size()) + ")"); 215 } 216 HTTPILRequest[] requests = new HTTPILRequest[list.size()]; 217 Iterator itemList = list.iterator(); 218 int i = 0; 219 while (itemList.hasNext()) 220 { 221 requests[i] = (HTTPILRequest)itemList.next(); 222 i++; 223 } 224 return requests; 225 } 226 } 227 | Popular Tags |