KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > jofti > cache > adapter > listener > ObjectGridEventListener


1 package com.jofti.cache.adapter.listener;
2
3 import java.util.ArrayList JavaDoc;
4 import java.util.Iterator JavaDoc;
5 import java.util.List JavaDoc;
6 import java.util.Map JavaDoc;
7 import java.util.Properties JavaDoc;
8
9 import org.apache.commons.logging.Log;
10 import org.apache.commons.logging.LogFactory;
11
12 import com.ibm.websphere.objectgrid.ObjectGridException;
13 import com.ibm.websphere.objectgrid.ObjectGridRuntimeException;
14 import com.ibm.websphere.objectgrid.ObjectMap;
15 import com.ibm.websphere.objectgrid.TxID;
16 import com.ibm.websphere.objectgrid.plugins.LogElement;
17 import com.ibm.websphere.objectgrid.plugins.LogSequence;
18 import com.ibm.websphere.objectgrid.plugins.MapEventListener;
19 import com.ibm.websphere.objectgrid.plugins.index.MapIndexInfo;
20 import com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin;
21 import com.jofti.api.Index;
22 import com.jofti.api.IndexQuery;
23 import com.jofti.cache.IBaseAdaptor;
24 import com.jofti.core.GenericIndexFactory;
25 import com.jofti.core.IParsedQuery;
26 import com.jofti.core.InternalIndex;
27 import com.jofti.exception.JoftiException;
28 import com.jofti.util.ObjectProcedureAdapter;
29 import com.jofti.util.OpenHashMap;
30
31 /**
32  * The Class used to provide the connection between the adapter and the Cache for Listener adapters. </p>
33  * The Listener is for ObjectGrid and above. </p>
34  * @author xenephon
35  * @version 1.2
36  * @since 1.2
37  *
38  */

39 public class ObjectGridEventListener implements MapIndexPlugin, MapEventListener {
40
41     IBaseAdaptor base = null;
42     String JavaDoc name;
43     
44     
45      private static Log log = LogFactory
46         .getLog(CoherenceEventListener.class);
47     
48      public ObjectGridEventListener(IBaseAdaptor base, String JavaDoc name) {
49             this.base = base;
50             this.name =name;
51             
52         
53             
54         }
55      
56     /* (non-Javadoc)
57      * @see com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin#doBatchUpdate(com.ibm.websphere.objectgrid.TxID, com.ibm.websphere.objectgrid.plugins.LogSequence)
58      */

59     public void doBatchUpdate(TxID arg0, LogSequence arg1)
60             throws ObjectGridRuntimeException {
61         
62         if (log.isDebugEnabled()){
63             log.debug("transaction id:"+arg0 + " number of entries " +arg1.size());
64         }
65         if (arg1.isDirty() && arg1.size() >0)
66         {
67             try {
68                 base.acquireUpdateLock();
69                 
70     // do deletes first
71
Iterator JavaDoc it = arg1.getAllChanges();
72                 
73                 while(it.hasNext()){
74                     LogElement element = (LogElement)it.next();
75                     
76                     switch(element.getType().getCode()){
77                     case LogElement.CODE_DELETE:
78                     case LogElement.CODE_EVICT:
79                         if (log.isDebugEnabled()){
80                             log.debug("delete or eviction for "+ element.getCacheEntry().getKey());
81                         }
82                         elementRemoved(element.getCacheEntry().getKey(), null);
83                         break;
84                     case LogElement.CODE_INSERT:
85                         if (log.isDebugEnabled()){
86                             log.debug("insert for "+ element.getCacheEntry().getKey());
87                         }
88                         elementAdded(element.getCacheEntry().getKey(),element.getCurrentValue());
89                         break;
90                     case LogElement.CODE_UPDATE:
91                         if (log.isDebugEnabled()){
92                             log.debug("update for "+ element.getCacheEntry().getKey());
93                         }
94                         elementUpdated(element.getCacheEntry().getKey(),element.getCurrentValue());
95                         break;
96                     default:
97                         if (log.isDebugEnabled()){
98                             log.debug("event ignored for "+ element.getCacheEntry().getKey());
99                         }
100                         break;
101                     }
102                 }
103     
104                 
105             } catch (JoftiException e){
106                 log.error("bulk update event: Unable to complete update for tx:"+arg0,e);
107              }finally {
108                 base.releaseUpdateLock();
109             }
110         }
111     
112
113     }
114
115     /* (non-Javadoc)
116      * @see com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin#getIndexProxy(com.ibm.websphere.objectgrid.plugins.index.MapIndexInfo)
117      */

118     public Object JavaDoc getIndexProxy(MapIndexInfo arg0) {
119         
120         return new BaseIndexProxy(arg0);
121     }
122
123     /* (non-Javadoc)
124      * @see com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin#getName()
125      */

126     public String JavaDoc getName() {
127         
128         return name;
129     }
130
131     /* (non-Javadoc)
132      * @see com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin#setAttributeName(java.lang.String)
133      */

134     public void setAttributeName(String JavaDoc arg0) {
135         // NO-OP
136

137     }
138
139     /* (non-Javadoc)
140      * @see com.ibm.websphere.objectgrid.plugins.index.MapIndexPlugin#undoBatchUpdate(com.ibm.websphere.objectgrid.TxID, com.ibm.websphere.objectgrid.plugins.LogSequence)
141      */

142     public void undoBatchUpdate(TxID arg0, LogSequence arg1)
143             throws ObjectGridException {
144         if (log.isDebugEnabled()){
145             log.debug("transaction id:"+arg0 + " number of entries " +arg1.size());
146         }
147         List JavaDoc errors = new ArrayList JavaDoc();
148         if (arg1.isDirty() && arg1.size() >0)
149         {
150             try {
151                 base.acquireUpdateLock();
152                 
153     // do deletes first
154
Iterator JavaDoc it = arg1.getAllChanges();
155                 
156                 while(it.hasNext()){
157                     LogElement element = (LogElement)it.next();
158                     try {
159                         switch(element.getUndoType().getCode()){
160                         case LogElement.CODE_DELETE:
161                             if (log.isDebugEnabled()){
162                                 log.debug("delete undo for "+ element.getCacheEntry().getKey());
163                             }
164                             elementRemoved(element.getCacheEntry().getKey(), null);
165                             break;
166                         case LogElement.CODE_INSERT:
167                             if (log.isDebugEnabled()){
168                                 log.debug("insert undo for "+ element.getCacheEntry().getKey());
169                             }
170                             elementAdded(element.getCacheEntry().getKey(),element.getBeforeImage());
171                             break;
172                         case LogElement.CODE_UPDATE:
173                             if (log.isDebugEnabled()){
174                                 log.debug("update undo for "+ element.getCacheEntry().getKey());
175                             }
176                             elementUpdated(element.getCacheEntry().getKey(),element.getBeforeImage());
177                             break;
178                         default:
179                             if (log.isDebugEnabled()){
180                                 log.debug("event ignored for "+ element.getCacheEntry().getKey());
181                             }
182                             break;
183                         }
184                     } catch (Exception JavaDoc e){
185                         log.error("error undoing change "+element);
186                         errors.add(element.getCacheEntry().getKey());
187                     }
188                 }
189     
190                 
191             } catch (JoftiException e){
192                 log.error("bulk update event: Unable to complete update for tx:"+arg0,e);
193              }finally {
194                 base.releaseUpdateLock();
195             }
196              if (errors.size() >0){
197                  throw new ObjectGridException("error undoing transaction changes for:"+ errors);
198              }
199         }
200     
201
202
203     }
204     /* (non-Javadoc)
205      * @see com.ibm.websphere.objectgrid.plugins.MapEventListener#entryEvicted(java.lang.Object, java.lang.Object)
206      */

207     public void entryEvicted(Object JavaDoc arg0, Object JavaDoc arg1) {
208         
209         try {
210             base.acquireUpdateLock();
211             
212             elementRemoved(arg0,arg1);
213             
214         } catch (JoftiException e){
215             log.error("bulk update event: Unable to complete update for tx:"+arg0,e);
216          }finally {
217             base.releaseUpdateLock();
218         }
219         
220         
221     }
222     /* (non-Javadoc)
223      * @see com.ibm.websphere.objectgrid.plugins.MapEventListener#preloadCompleted(java.lang.Throwable)
224      */

225     public void preloadCompleted(Throwable JavaDoc arg0) {
226         // ??
227

228     }
229     
230     private void elementRemoved(Object JavaDoc key, Object JavaDoc value) throws JoftiException {
231
232         key = base.decorateKey(key);
233
234         synchronized(base.getCacheLock(key))
235         {
236             base.getIndex().removeByKey((Comparable JavaDoc) key);
237         }
238
239         if (log.isDebugEnabled()) {
240             log.debug("Remove event: removed from index " + key);
241         }
242
243     }
244     
245     
246     
247     
248     private void elementAdded(Object JavaDoc key, Object JavaDoc value) throws JoftiException {
249
250         key = base.decorateKey(key);
251
252         InternalIndex index = base.getIndex();
253
254         synchronized(base.getCacheLock(key))
255         {
256             // insert into the index
257
index.insert(key, value);
258         }
259
260         if (log.isDebugEnabled()) {
261             log.debug("Add Event: entry added to index " + key + " value: "
262                     + value);
263         }
264
265     }
266     
267     private void elementUpdated(Object JavaDoc key, Object JavaDoc value) throws JoftiException {
268
269         key = base.decorateKey(key);
270
271         InternalIndex index = base.getIndex();
272
273         synchronized(base.getCacheLock(key))
274         {
275             // remove all entries first
276
index.removeByKey(key);
277             // insert into the index
278
index.insert(key, value);
279         }
280
281         if (log.isDebugEnabled()) {
282             log.debug("Add Event: entry added to index " + key + " value: "
283                     + value);
284         }
285
286     }
287
288     /**
289      * <p>
290      * The proxy provides the implementation for the object returned from the getIndexProxy method.
291      * The proxy is repsonsible for mediating the queries with the ObjectGridAdapter and dealing with
292      * modification of the committed result set with the current transaction scoped results.
293      * </p>
294      * <p>
295      * A new query run on the proxy results in a temporary index being built on those entries in the transaction
296      * that have been inserted or updated.
297      * </p>
298      * <p>
299      * The general sequence is to:
300      * <ul>
301      * <li>Run the query against the committed index
302      * <li>remove the entries that have been marked as remove in this transaction
303      * <li> build a temporary index on the inserted and updated values
304      * <li>query the temp index
305      * <li>merge the results
306      * <li>apply any of the paging/maxresults logic
307      * </ul>
308      *
309      * @author steve
310      *
311      */

312     class BaseIndexProxy implements Index{
313
314         MapIndexInfo info =null;
315         
316         BaseIndexProxy(MapIndexInfo info){
317             this.info = info;
318         }
319         
320         /* (non-Javadoc)
321          * @see com.jofti.api.Index#addQuery(java.lang.String, com.jofti.api.IndexQuery)
322          */

323         public IndexQuery addQuery(String JavaDoc name, IndexQuery query) throws JoftiException {
324             
325             return ((Index)base).addQuery(name, query);
326         }
327
328         /* (non-Javadoc)
329          * @see com.jofti.api.Index#getQuery(java.lang.String)
330          */

331         public IndexQuery getQuery(String JavaDoc name) {
332             return ((Index)base).getQuery(name);
333         }
334
335         /* (non-Javadoc)
336          * @see com.jofti.api.Index#query(com.jofti.api.IndexQuery)
337          */

338         public Map JavaDoc query(IndexQuery query) throws JoftiException {
339             
340             
341             if (info ==null){
342                 return ((Index)base).query(query);
343             }
344             // decorate the results here
345
List JavaDoc txChanges = info.getTransactionChanges(true);
346             
347             // loop through and build temp index
348
InternalIndex localIndex = GenericIndexFactory.getInstance().createIndex(base.getIndex().getClass().getName(),
349                     base.getIndex().getIntrospector(), new Properties JavaDoc(), "local");
350             
351             
352             final ObjectMap objectMap = info.getMap();
353             // query temp index
354
final Map JavaDoc res = ((Index)base).query(query);
355             
356             
357             
358             for (int i=0;i<txChanges.size();i++){
359                 LogElement element = (LogElement)txChanges.get(i);
360                 if (element.getType().getCode() == LogElement.CODE_DELETE ||
361                         element.getType().getCode() == LogElement.CODE_UPDATE){
362                     res.remove(element.getCacheEntry().getKey());
363                 }
364                 if (element.getType().getCode() == LogElement.CODE_INSERT ||
365                         element.getType().getCode() == LogElement.CODE_UPDATE){
366                     localIndex.insert(element.getCacheEntry().getKey(), element.getCurrentValue());
367                 }
368             }
369             // merge results
370
IParsedQuery parsedQuery = (IParsedQuery)base.processQuery(query, base.getIndex().getParserManager());
371             
372             OpenHashMap localRes = (OpenHashMap)localIndex.query(parsedQuery);
373             
374             // merge results
375
localRes.forEachPair(new ObjectProcedureAdapter() {
376                 
377                 public boolean apply(Object JavaDoc key, Object JavaDoc value) throws RuntimeException JavaDoc{
378                     try {
379                         res.put(key, objectMap.get(key));
380                     } catch(ObjectGridException e){
381                         log.error("Failure retrieving value in transaction:"+key,e);
382                     }
383                     return true;
384                 }
385             });
386             if (parsedQuery.getMaxResults() >0 || parsedQuery.getFirstResult() >0){
387                 return base.limitResults(res, parsedQuery.getFirstResult(), parsedQuery.getMaxResults() );
388             }else{
389                 return res;
390             }
391         }
392         
393     }
394
395 }
396
Popular Tags