1 package com.jofti.cache.adapter.listener; 2 3 import java.util.ArrayList ; 4 import java.util.Iterator ; 5 import java.util.List ; 6 import java.util.Map ; 7 import java.util.Properties ; 8 9 import org.apache.commons.logging.Log; 10 import org.apache.commons.logging.LogFactory; 11 12 import com.ibm.websphere.objectgrid.ObjectGridException; 13 import com.ibm.websphere.objectgrid.ObjectGridRuntimeException; 14 import com.ibm.websphere.objectgrid.ObjectMap; 15 import com.ibm.websphere.objectgrid.TxID; 16 import com.ibm.websphere.objectgrid.plugins.LogElement; 17 import com.ibm.websphere.objectgrid.plugins.LogSequence; 18 import com.ibm.websphere.objectgrid.plugins.MapEventListener; 19 import com.ibm.websphere.objectgrid.plugins.index.MapIndexInfo; 20 import com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin; 21 import com.jofti.api.Index; 22 import com.jofti.api.IndexQuery; 23 import com.jofti.cache.IBaseAdaptor; 24 import com.jofti.core.GenericIndexFactory; 25 import com.jofti.core.IParsedQuery; 26 import com.jofti.core.InternalIndex; 27 import com.jofti.exception.JoftiException; 28 import com.jofti.util.ObjectProcedureAdapter; 29 import com.jofti.util.OpenHashMap; 30 31 39 public class ObjectGridEventListener implements MapIndexPlugin, MapEventListener { 40 41 IBaseAdaptor base = null; 42 String name; 43 44 45 private static Log log = LogFactory 46 .getLog(CoherenceEventListener.class); 47 48 public ObjectGridEventListener(IBaseAdaptor base, String name) { 49 this.base = base; 50 this.name =name; 51 52 53 54 } 55 56 59 public void doBatchUpdate(TxID arg0, LogSequence arg1) 60 throws ObjectGridRuntimeException { 61 62 if (log.isDebugEnabled()){ 63 log.debug("transaction id:"+arg0 + " number of entries " +arg1.size()); 64 } 65 if (arg1.isDirty() && arg1.size() >0) 66 { 67 try { 68 base.acquireUpdateLock(); 69 70 Iterator it = arg1.getAllChanges(); 72 73 while(it.hasNext()){ 74 LogElement element = (LogElement)it.next(); 75 76 switch(element.getType().getCode()){ 77 case LogElement.CODE_DELETE: 78 case LogElement.CODE_EVICT: 79 if (log.isDebugEnabled()){ 80 log.debug("delete or eviction for "+ element.getCacheEntry().getKey()); 81 } 82 elementRemoved(element.getCacheEntry().getKey(), null); 83 break; 84 case LogElement.CODE_INSERT: 85 if (log.isDebugEnabled()){ 86 log.debug("insert for "+ element.getCacheEntry().getKey()); 87 } 88 elementAdded(element.getCacheEntry().getKey(),element.getCurrentValue()); 89 break; 90 case LogElement.CODE_UPDATE: 91 if (log.isDebugEnabled()){ 92 log.debug("update for "+ element.getCacheEntry().getKey()); 93 } 94 elementUpdated(element.getCacheEntry().getKey(),element.getCurrentValue()); 95 break; 96 default: 97 if (log.isDebugEnabled()){ 98 log.debug("event ignored for "+ element.getCacheEntry().getKey()); 99 } 100 break; 101 } 102 } 103 104 105 } catch (JoftiException e){ 106 log.error("bulk update event: Unable to complete update for tx:"+arg0,e); 107 }finally { 108 base.releaseUpdateLock(); 109 } 110 } 111 112 113 } 114 115 118 public Object getIndexProxy(MapIndexInfo arg0) { 119 120 return new BaseIndexProxy(arg0); 121 } 122 123 126 public String getName() { 127 128 return name; 129 } 130 131 134 public void setAttributeName(String arg0) { 135 137 } 138 139 142 public void undoBatchUpdate(TxID arg0, LogSequence arg1) 143 throws ObjectGridException { 144 if (log.isDebugEnabled()){ 145 log.debug("transaction id:"+arg0 + " number of entries " +arg1.size()); 146 } 147 List errors = new ArrayList (); 148 if (arg1.isDirty() && arg1.size() >0) 149 { 150 try { 151 base.acquireUpdateLock(); 152 153 Iterator it = arg1.getAllChanges(); 155 156 while(it.hasNext()){ 157 LogElement element = (LogElement)it.next(); 158 try { 159 switch(element.getUndoType().getCode()){ 160 case LogElement.CODE_DELETE: 161 if (log.isDebugEnabled()){ 162 log.debug("delete undo for "+ element.getCacheEntry().getKey()); 163 } 164 elementRemoved(element.getCacheEntry().getKey(), null); 165 break; 166 case LogElement.CODE_INSERT: 167 if (log.isDebugEnabled()){ 168 log.debug("insert undo for "+ element.getCacheEntry().getKey()); 169 } 170 elementAdded(element.getCacheEntry().getKey(),element.getBeforeImage()); 171 break; 172 case LogElement.CODE_UPDATE: 173 if (log.isDebugEnabled()){ 174 log.debug("update undo for "+ element.getCacheEntry().getKey()); 175 } 176 elementUpdated(element.getCacheEntry().getKey(),element.getBeforeImage()); 177 break; 178 default: 179 if (log.isDebugEnabled()){ 180 log.debug("event ignored for "+ element.getCacheEntry().getKey()); 181 } 182 break; 183 } 184 } catch (Exception e){ 185 log.error("error undoing change "+element); 186 errors.add(element.getCacheEntry().getKey()); 187 } 188 } 189 190 191 } catch (JoftiException e){ 192 log.error("bulk update event: Unable to complete update for tx:"+arg0,e); 193 }finally { 194 base.releaseUpdateLock(); 195 } 196 if (errors.size() >0){ 197 throw new ObjectGridException("error undoing transaction changes for:"+ errors); 198 } 199 } 200 201 202 203 } 204 207 public void entryEvicted(Object arg0, Object arg1) { 208 209 try { 210 base.acquireUpdateLock(); 211 212 elementRemoved(arg0,arg1); 213 214 } catch (JoftiException e){ 215 log.error("bulk update event: Unable to complete update for tx:"+arg0,e); 216 }finally { 217 base.releaseUpdateLock(); 218 } 219 220 221 } 222 225 public void preloadCompleted(Throwable arg0) { 226 228 } 229 230 private void elementRemoved(Object key, Object value) throws JoftiException { 231 232 key = base.decorateKey(key); 233 234 synchronized(base.getCacheLock(key)) 235 { 236 base.getIndex().removeByKey((Comparable ) key); 237 } 238 239 if (log.isDebugEnabled()) { 240 log.debug("Remove event: removed from index " + key); 241 } 242 243 } 244 245 246 247 248 private void elementAdded(Object key, Object value) throws JoftiException { 249 250 key = base.decorateKey(key); 251 252 InternalIndex index = base.getIndex(); 253 254 synchronized(base.getCacheLock(key)) 255 { 256 index.insert(key, value); 258 } 259 260 if (log.isDebugEnabled()) { 261 log.debug("Add Event: entry added to index " + key + " value: " 262 + value); 263 } 264 265 } 266 267 private void elementUpdated(Object key, Object value) throws JoftiException { 268 269 key = base.decorateKey(key); 270 271 InternalIndex index = base.getIndex(); 272 273 synchronized(base.getCacheLock(key)) 274 { 275 index.removeByKey(key); 277 index.insert(key, value); 279 } 280 281 if (log.isDebugEnabled()) { 282 log.debug("Add Event: entry added to index " + key + " value: " 283 + value); 284 } 285 286 } 287 288 312 class BaseIndexProxy implements Index{ 313 314 MapIndexInfo info =null; 315 316 BaseIndexProxy(MapIndexInfo info){ 317 this.info = info; 318 } 319 320 323 public IndexQuery addQuery(String name, IndexQuery query) throws JoftiException { 324 325 return ((Index)base).addQuery(name, query); 326 } 327 328 331 public IndexQuery getQuery(String name) { 332 return ((Index)base).getQuery(name); 333 } 334 335 338 public Map query(IndexQuery query) throws JoftiException { 339 340 341 if (info ==null){ 342 return ((Index)base).query(query); 343 } 344 List txChanges = info.getTransactionChanges(true); 346 347 InternalIndex localIndex = GenericIndexFactory.getInstance().createIndex(base.getIndex().getClass().getName(), 349 base.getIndex().getIntrospector(), new Properties (), "local"); 350 351 352 final ObjectMap objectMap = info.getMap(); 353 final Map res = ((Index)base).query(query); 355 356 357 358 for (int i=0;i<txChanges.size();i++){ 359 LogElement element = (LogElement)txChanges.get(i); 360 if (element.getType().getCode() == LogElement.CODE_DELETE || 361 element.getType().getCode() == LogElement.CODE_UPDATE){ 362 res.remove(element.getCacheEntry().getKey()); 363 } 364 if (element.getType().getCode() == LogElement.CODE_INSERT || 365 element.getType().getCode() == LogElement.CODE_UPDATE){ 366 localIndex.insert(element.getCacheEntry().getKey(), element.getCurrentValue()); 367 } 368 } 369 IParsedQuery parsedQuery = (IParsedQuery)base.processQuery(query, base.getIndex().getParserManager()); 371 372 OpenHashMap localRes = (OpenHashMap)localIndex.query(parsedQuery); 373 374 localRes.forEachPair(new ObjectProcedureAdapter() { 376 377 public boolean apply(Object key, Object value) throws RuntimeException { 378 try { 379 res.put(key, objectMap.get(key)); 380 } catch(ObjectGridException e){ 381 log.error("Failure retrieving value in transaction:"+key,e); 382 } 383 return true; 384 } 385 }); 386 if (parsedQuery.getMaxResults() >0 || parsedQuery.getFirstResult() >0){ 387 return base.limitResults(res, parsedQuery.getFirstResult(), parsedQuery.getMaxResults() ); 388 }else{ 389 return res; 390 } 391 } 392 393 } 394 395 } 396 | Popular Tags |