KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > object > applicator > LinkedBlockingQueueApplicator


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright
3  * notice. All rights reserved.
4  */

5 package com.tc.object.applicator;
6
7 import com.tc.exception.TCRuntimeException;
8 import com.tc.logging.TCLogger;
9 import com.tc.logging.TCLogging;
10 import com.tc.object.ClientObjectManager;
11 import com.tc.object.ObjectID;
12 import com.tc.object.SerializationUtil;
13 import com.tc.object.TCObject;
14 import com.tc.object.TraversedReferences;
15 import com.tc.object.bytecode.ByteCodeUtil;
16 import com.tc.object.bytecode.Manageable;
17 import com.tc.object.dna.api.DNA;
18 import com.tc.object.dna.api.DNACursor;
19 import com.tc.object.dna.api.DNAWriter;
20 import com.tc.object.dna.api.LogicalAction;
21 import com.tc.object.dna.api.PhysicalAction;
22 import com.tc.object.dna.impl.DNAEncoding;
23 import com.tc.object.tx.optimistic.OptimisticTransactionManager;
24 import com.tc.object.tx.optimistic.TCObjectClone;
25 import com.tc.util.Assert;
26 import com.tc.util.FieldUtils;
27
28 import java.io.IOException JavaDoc;
29 import java.lang.reflect.Field JavaDoc;
30 import java.lang.reflect.InvocationTargetException JavaDoc;
31 import java.lang.reflect.Method JavaDoc;
32 import java.util.IdentityHashMap JavaDoc;
33 import java.util.Iterator JavaDoc;
34 import java.util.Map JavaDoc;
35 import java.util.Queue JavaDoc;
36 import java.util.concurrent.LinkedBlockingQueue JavaDoc;
37
38 public class LinkedBlockingQueueApplicator extends BaseApplicator {
39   private static final TCLogger logger = TCLogging.getLogger(ListApplicator.class);
40   private static final String JavaDoc LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX = LinkedBlockingQueue JavaDoc.class.getName() + ".";
41   private static final String JavaDoc TAKE_LOCK_FIELD_NAME = "takeLock";
42   private static final String JavaDoc PUT_LOCK_FIELD_NAME = "putLock";
43   private static final String JavaDoc CAPACITY_FIELD_NAME = "capacity";
44   private static final String JavaDoc INIT_METHOD_NAME = "init";
45   private static final String JavaDoc TC_TAKE_METHOD_NAME = ByteCodeUtil.TC_METHOD_PREFIX + "take";
46   private static final String JavaDoc TC_PUT_METHOD_NAME = ByteCodeUtil.TC_METHOD_PREFIX + "put";
47
48   private static final Field JavaDoc TAKE_LOCK_FIELD;
49   private static final Field JavaDoc PUT_LOCK_FIELD;
50   private static final Field JavaDoc CAPACITY_FIELD;
51   private static final Method JavaDoc INIT_METHOD;
52   private static final Method JavaDoc TC_TAKE_METHOD;
53   private static final Method JavaDoc TC_PUT_METHOD;
54
55   static {
56     try {
57       TAKE_LOCK_FIELD = LinkedBlockingQueue JavaDoc.class.getDeclaredField(TAKE_LOCK_FIELD_NAME);
58       TAKE_LOCK_FIELD.setAccessible(true);
59
60       PUT_LOCK_FIELD = LinkedBlockingQueue JavaDoc.class.getDeclaredField(PUT_LOCK_FIELD_NAME);
61       PUT_LOCK_FIELD.setAccessible(true);
62
63       CAPACITY_FIELD = LinkedBlockingQueue JavaDoc.class.getDeclaredField(CAPACITY_FIELD_NAME);
64       CAPACITY_FIELD.setAccessible(true);
65
66       INIT_METHOD = LinkedBlockingQueue JavaDoc.class.getDeclaredMethod(INIT_METHOD_NAME, new Class JavaDoc[0]);
67       INIT_METHOD.setAccessible(true);
68
69       TC_TAKE_METHOD = LinkedBlockingQueue JavaDoc.class.getDeclaredMethod(TC_TAKE_METHOD_NAME, new Class JavaDoc[0]);
70       TC_TAKE_METHOD.setAccessible(true);
71
72       TC_PUT_METHOD = LinkedBlockingQueue JavaDoc.class.getDeclaredMethod(TC_PUT_METHOD_NAME, new Class JavaDoc[] { Object JavaDoc.class });
73       TC_PUT_METHOD.setAccessible(true);
74     } catch (Exception JavaDoc e) {
75       throw new RuntimeException JavaDoc(e);
76     }
77   }
78
79   public LinkedBlockingQueueApplicator(DNAEncoding encoding) {
80     super(encoding);
81   }
82
83   public TraversedReferences getPortableObjects(Object JavaDoc pojo, TraversedReferences addTo) {
84     getPhysicalPortableObjects(pojo, addTo);
85     getLogicalPortableObjects(pojo, addTo);
86     return addTo;
87   }
88
89   private void getLogicalPortableObjects(Object JavaDoc pojo, TraversedReferences addTo) {
90     for (Iterator JavaDoc i = ((Queue JavaDoc) pojo).iterator(); i.hasNext();) {
91       Object JavaDoc o = i.next();
92       filterPortableObject(o, addTo);
93     }
94   }
95
96   private void getPhysicalPortableObjects(Object JavaDoc pojo, TraversedReferences addTo) {
97     try {
98       filterPortableObject(TAKE_LOCK_FIELD.get(pojo), addTo);
99       filterPortableObject(PUT_LOCK_FIELD.get(pojo), addTo);
100       filterPortableObject(CAPACITY_FIELD.get(pojo), addTo);
101     } catch (IllegalAccessException JavaDoc e) {
102       throw new TCRuntimeException(e);
103     }
104   }
105
106   private void filterPortableObject(Object JavaDoc value, TraversedReferences addTo) {
107     if (value != null && isPortableReference(value.getClass())) {
108       addTo.addAnonymousReference(value);
109     }
110   }
111
112   public void hydrate(ClientObjectManager objectManager, TCObject tcObject, DNA dna, Object JavaDoc po) throws IOException JavaDoc,
113       ClassNotFoundException JavaDoc {
114     LinkedBlockingQueue JavaDoc queue = (LinkedBlockingQueue JavaDoc) po;
115     DNACursor cursor = dna.getCursor();
116     boolean hasPhysicalAction = false;
117
118     Object JavaDoc takeLock = null;
119     Object JavaDoc putLock = null;
120     Object JavaDoc capacity = null;
121     while (cursor.next(encoding)) {
122       Object JavaDoc action = cursor.getAction();
123       if (action instanceof LogicalAction) {
124
125         LogicalAction logicalAction = (LogicalAction) action;
126         int method = logicalAction.getMethod();
127         Object JavaDoc[] params = logicalAction.getParameters();
128
129         // Since LinkedBlockingQueue supports partial collection, params is not inspected for containing object ids
130

131         try {
132           apply(queue, method, params);
133         } catch (IndexOutOfBoundsException JavaDoc ioobe) {
134           logger.error("Error applying update to " + po, ioobe);
135         }
136       } else if (action instanceof PhysicalAction) {
137         if (!hasPhysicalAction) {
138           hasPhysicalAction = true;
139         }
140         PhysicalAction physicalAction = (PhysicalAction) action;
141         Assert.eval(physicalAction.isTruePhysical());
142         String JavaDoc fieldName = physicalAction.getFieldName();
143         Object JavaDoc value = physicalAction.getObject();
144
145         if (fieldName.equals(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + TAKE_LOCK_FIELD_NAME)) {
146           takeLock = objectManager.lookupObject((ObjectID) value);
147         } else if (fieldName.equals(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + PUT_LOCK_FIELD_NAME)) {
148           putLock = objectManager.lookupObject((ObjectID) value);
149         } else if (fieldName.equals(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + CAPACITY_FIELD_NAME)) {
150           capacity = value;
151         }
152       }
153     }
154
155     // The setting of these physical field can only happen after the logical actions are
156
// applied.
157
if (!dna.isDelta()) {
158       Assert.assertTrue(hasPhysicalAction);
159       try {
160         FieldUtils.tcSet(po, takeLock, TAKE_LOCK_FIELD);
161         FieldUtils.tcSet(po, putLock, PUT_LOCK_FIELD);
162         FieldUtils.tcSet(po, capacity, CAPACITY_FIELD);
163       } catch (IllegalAccessException JavaDoc e) {
164         throw new TCRuntimeException(e);
165       }
166       invokeInitMethod(po);
167     } else {
168       Assert.assertFalse(hasPhysicalAction);
169     }
170
171   }
172
173   private void invokeInitMethod(Object JavaDoc po) {
174     try {
175       INIT_METHOD.invoke(po, new Object JavaDoc[0]);
176     } catch (InvocationTargetException JavaDoc e) {
177       throw new TCRuntimeException(e);
178     } catch (IllegalAccessException JavaDoc e) {
179       throw new TCRuntimeException(e);
180     }
181   }
182
183   private void apply(LinkedBlockingQueue JavaDoc queue, int method, Object JavaDoc[] params) {
184     switch (method) {
185       case SerializationUtil.PUT:
186         try {
187           TC_PUT_METHOD.invoke(queue, new Object JavaDoc[] { params[0] });
188         } catch (InvocationTargetException JavaDoc e) {
189           throw new TCRuntimeException(e);
190         } catch (IllegalAccessException JavaDoc e) {
191           throw new TCRuntimeException(e);
192         }
193         break;
194       case SerializationUtil.TAKE:
195         try {
196           TC_TAKE_METHOD.invoke(queue, new Object JavaDoc[0]);
197         } catch (InvocationTargetException JavaDoc e) {
198           throw new TCRuntimeException(e);
199         } catch (IllegalAccessException JavaDoc e) {
200           throw new TCRuntimeException(e);
201         }
202         break;
203       case SerializationUtil.REMOVE_FIRST_N:
204         // This is caused by drainTo(), which requires a full lock.
205
int count = ((Integer JavaDoc) params[0]).intValue();
206         for (int i = 0; i < count; i++) {
207           try {
208             TC_TAKE_METHOD.invoke(queue, new Object JavaDoc[0]);
209           } catch (InvocationTargetException JavaDoc e) {
210             throw new TCRuntimeException(e);
211           } catch (IllegalAccessException JavaDoc e) {
212             throw new TCRuntimeException(e);
213           }
214         }
215         break;
216       case SerializationUtil.REMOVE_AT:
217         int index = ((Integer JavaDoc) params[0]).intValue();
218         Assert.assertTrue(queue.size() > index);
219         int j = 0;
220         for (Iterator JavaDoc i = queue.iterator(); i.hasNext();) {
221           i.next();
222           if (j == index) {
223             i.remove();
224             break;
225           }
226           j++;
227         }
228         break;
229       case SerializationUtil.CLEAR:
230         queue.clear();
231         break;
232       default:
233         throw new AssertionError JavaDoc("Invalid method:" + method + " state:" + this);
234     }
235   }
236
237   public void dehydrate(ClientObjectManager objectManager, TCObject tcObject, DNAWriter writer, Object JavaDoc pojo) {
238     dehydrateFields(objectManager, tcObject, writer, pojo);
239     dehydrateMembers(objectManager, tcObject, writer, pojo);
240   }
241
242   private void dehydrateFields(ClientObjectManager objectManager, TCObject tcObject, DNAWriter writer, Object JavaDoc pojo) {
243     try {
244       Object JavaDoc takeLock = TAKE_LOCK_FIELD.get(pojo);
245       takeLock = getDehydratableObject(takeLock, objectManager);
246       writer.addPhysicalAction(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + TAKE_LOCK_FIELD_NAME, takeLock);
247
248       Object JavaDoc putLock = PUT_LOCK_FIELD.get(pojo);
249       putLock = getDehydratableObject(putLock, objectManager);
250       writer.addPhysicalAction(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + PUT_LOCK_FIELD_NAME, putLock);
251
252       Object JavaDoc capacity = CAPACITY_FIELD.get(pojo);
253       capacity = getDehydratableObject(capacity, objectManager);
254       writer.addPhysicalAction(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + CAPACITY_FIELD_NAME, capacity);
255     } catch (IllegalAccessException JavaDoc e) {
256       throw new TCRuntimeException(e);
257     }
258   }
259
260   public void dehydrateMembers(ClientObjectManager objectManager, TCObject tcObject, DNAWriter writer, Object JavaDoc pojo) {
261     Queue JavaDoc queue = (Queue JavaDoc) pojo;
262
263     for (Iterator JavaDoc i = queue.iterator(); i.hasNext();) {
264       Object JavaDoc value = i.next();
265       if (!(value instanceof ObjectID)) {
266         if (!objectManager.isPortableInstance(value)) {
267           continue;
268         }
269         value = getDehydratableObject(value, objectManager);
270       }
271       if (value == null) {
272         continue;
273       }
274       writer.addLogicalAction(SerializationUtil.PUT, new Object JavaDoc[] { value });
275     }
276   }
277
278   public Object JavaDoc getNewInstance(ClientObjectManager objectManager, DNA dna) {
279     throw new UnsupportedOperationException JavaDoc();
280   }
281
282   @SuppressWarnings JavaDoc("unchecked")
283   public Map JavaDoc connectedCopy(Object JavaDoc source, Object JavaDoc dest, Map JavaDoc visited, ClientObjectManager objectManager,
284                            OptimisticTransactionManager txManager) {
285     Map JavaDoc cloned = new IdentityHashMap JavaDoc();
286
287     Manageable sourceManageable = (Manageable) source;
288     Manageable destManaged = (Manageable) dest;
289
290     Queue JavaDoc sourceQueue = (Queue JavaDoc) source;
291     Queue JavaDoc destQueue = (Queue JavaDoc) dest;
292
293     for (Iterator JavaDoc i = sourceQueue.iterator(); i.hasNext();) {
294       Object JavaDoc v = i.next();
295       Object JavaDoc copyValue = null;
296
297       copyValue = createCopyIfNecessary(objectManager, visited, cloned, v);
298       destQueue.add(copyValue);
299     }
300
301     destManaged.__tc_managed(new TCObjectClone(sourceManageable.__tc_managed(), txManager));
302     return cloned;
303   }
304 }
305
Popular Tags