1 5 package com.tc.object.applicator; 6 7 import com.tc.exception.TCRuntimeException; 8 import com.tc.logging.TCLogger; 9 import com.tc.logging.TCLogging; 10 import com.tc.object.ClientObjectManager; 11 import com.tc.object.ObjectID; 12 import com.tc.object.SerializationUtil; 13 import com.tc.object.TCObject; 14 import com.tc.object.TraversedReferences; 15 import com.tc.object.bytecode.ByteCodeUtil; 16 import com.tc.object.bytecode.Manageable; 17 import com.tc.object.dna.api.DNA; 18 import com.tc.object.dna.api.DNACursor; 19 import com.tc.object.dna.api.DNAWriter; 20 import com.tc.object.dna.api.LogicalAction; 21 import com.tc.object.dna.api.PhysicalAction; 22 import com.tc.object.dna.impl.DNAEncoding; 23 import com.tc.object.tx.optimistic.OptimisticTransactionManager; 24 import com.tc.object.tx.optimistic.TCObjectClone; 25 import com.tc.util.Assert; 26 import com.tc.util.FieldUtils; 27 28 import java.io.IOException ; 29 import java.lang.reflect.Field ; 30 import java.lang.reflect.InvocationTargetException ; 31 import java.lang.reflect.Method ; 32 import java.util.IdentityHashMap ; 33 import java.util.Iterator ; 34 import java.util.Map ; 35 import java.util.Queue ; 36 import java.util.concurrent.LinkedBlockingQueue ; 37 38 public class LinkedBlockingQueueApplicator extends BaseApplicator { 39 private static final TCLogger logger = TCLogging.getLogger(ListApplicator.class); 40 private static final String LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX = LinkedBlockingQueue .class.getName() + "."; 41 private static final String TAKE_LOCK_FIELD_NAME = "takeLock"; 42 private static final String PUT_LOCK_FIELD_NAME = "putLock"; 43 private static final String CAPACITY_FIELD_NAME = "capacity"; 44 private static final String INIT_METHOD_NAME = "init"; 45 private static final String TC_TAKE_METHOD_NAME = ByteCodeUtil.TC_METHOD_PREFIX + "take"; 46 private static final String TC_PUT_METHOD_NAME = ByteCodeUtil.TC_METHOD_PREFIX + "put"; 47 48 private static final Field TAKE_LOCK_FIELD; 49 private static final Field PUT_LOCK_FIELD; 50 private static final Field CAPACITY_FIELD; 51 private static final Method INIT_METHOD; 52 private static final Method TC_TAKE_METHOD; 53 private static final Method TC_PUT_METHOD; 54 55 static { 56 try { 57 TAKE_LOCK_FIELD = LinkedBlockingQueue .class.getDeclaredField(TAKE_LOCK_FIELD_NAME); 58 TAKE_LOCK_FIELD.setAccessible(true); 59 60 PUT_LOCK_FIELD = LinkedBlockingQueue .class.getDeclaredField(PUT_LOCK_FIELD_NAME); 61 PUT_LOCK_FIELD.setAccessible(true); 62 63 CAPACITY_FIELD = LinkedBlockingQueue .class.getDeclaredField(CAPACITY_FIELD_NAME); 64 CAPACITY_FIELD.setAccessible(true); 65 66 INIT_METHOD = LinkedBlockingQueue .class.getDeclaredMethod(INIT_METHOD_NAME, new Class [0]); 67 INIT_METHOD.setAccessible(true); 68 69 TC_TAKE_METHOD = LinkedBlockingQueue .class.getDeclaredMethod(TC_TAKE_METHOD_NAME, new Class [0]); 70 TC_TAKE_METHOD.setAccessible(true); 71 72 TC_PUT_METHOD = LinkedBlockingQueue .class.getDeclaredMethod(TC_PUT_METHOD_NAME, new Class [] { Object .class }); 73 TC_PUT_METHOD.setAccessible(true); 74 } catch (Exception e) { 75 throw new RuntimeException (e); 76 } 77 } 78 79 public LinkedBlockingQueueApplicator(DNAEncoding encoding) { 80 super(encoding); 81 } 82 83 public TraversedReferences getPortableObjects(Object pojo, TraversedReferences addTo) { 84 getPhysicalPortableObjects(pojo, addTo); 85 getLogicalPortableObjects(pojo, addTo); 86 return addTo; 87 } 88 89 private void getLogicalPortableObjects(Object pojo, TraversedReferences addTo) { 90 for (Iterator i = ((Queue ) pojo).iterator(); i.hasNext();) { 91 Object o = i.next(); 92 filterPortableObject(o, addTo); 93 } 94 } 95 96 private void getPhysicalPortableObjects(Object pojo, TraversedReferences addTo) { 97 try { 98 filterPortableObject(TAKE_LOCK_FIELD.get(pojo), addTo); 99 filterPortableObject(PUT_LOCK_FIELD.get(pojo), addTo); 100 filterPortableObject(CAPACITY_FIELD.get(pojo), addTo); 101 } catch (IllegalAccessException e) { 102 throw new TCRuntimeException(e); 103 } 104 } 105 106 private void filterPortableObject(Object value, TraversedReferences addTo) { 107 if (value != null && isPortableReference(value.getClass())) { 108 addTo.addAnonymousReference(value); 109 } 110 } 111 112 public void hydrate(ClientObjectManager objectManager, TCObject tcObject, DNA dna, Object po) throws IOException , 113 ClassNotFoundException { 114 LinkedBlockingQueue queue = (LinkedBlockingQueue ) po; 115 DNACursor cursor = dna.getCursor(); 116 boolean hasPhysicalAction = false; 117 118 Object takeLock = null; 119 Object putLock = null; 120 Object capacity = null; 121 while (cursor.next(encoding)) { 122 Object action = cursor.getAction(); 123 if (action instanceof LogicalAction) { 124 125 LogicalAction logicalAction = (LogicalAction) action; 126 int method = logicalAction.getMethod(); 127 Object [] params = logicalAction.getParameters(); 128 129 131 try { 132 apply(queue, method, params); 133 } catch (IndexOutOfBoundsException ioobe) { 134 logger.error("Error applying update to " + po, ioobe); 135 } 136 } else if (action instanceof PhysicalAction) { 137 if (!hasPhysicalAction) { 138 hasPhysicalAction = true; 139 } 140 PhysicalAction physicalAction = (PhysicalAction) action; 141 Assert.eval(physicalAction.isTruePhysical()); 142 String fieldName = physicalAction.getFieldName(); 143 Object value = physicalAction.getObject(); 144 145 if (fieldName.equals(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + TAKE_LOCK_FIELD_NAME)) { 146 takeLock = objectManager.lookupObject((ObjectID) value); 147 } else if (fieldName.equals(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + PUT_LOCK_FIELD_NAME)) { 148 putLock = objectManager.lookupObject((ObjectID) value); 149 } else if (fieldName.equals(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + CAPACITY_FIELD_NAME)) { 150 capacity = value; 151 } 152 } 153 } 154 155 if (!dna.isDelta()) { 158 Assert.assertTrue(hasPhysicalAction); 159 try { 160 FieldUtils.tcSet(po, takeLock, TAKE_LOCK_FIELD); 161 FieldUtils.tcSet(po, putLock, PUT_LOCK_FIELD); 162 FieldUtils.tcSet(po, capacity, CAPACITY_FIELD); 163 } catch (IllegalAccessException e) { 164 throw new TCRuntimeException(e); 165 } 166 invokeInitMethod(po); 167 } else { 168 Assert.assertFalse(hasPhysicalAction); 169 } 170 171 } 172 173 private void invokeInitMethod(Object po) { 174 try { 175 INIT_METHOD.invoke(po, new Object [0]); 176 } catch (InvocationTargetException e) { 177 throw new TCRuntimeException(e); 178 } catch (IllegalAccessException e) { 179 throw new TCRuntimeException(e); 180 } 181 } 182 183 private void apply(LinkedBlockingQueue queue, int method, Object [] params) { 184 switch (method) { 185 case SerializationUtil.PUT: 186 try { 187 TC_PUT_METHOD.invoke(queue, new Object [] { params[0] }); 188 } catch (InvocationTargetException e) { 189 throw new TCRuntimeException(e); 190 } catch (IllegalAccessException e) { 191 throw new TCRuntimeException(e); 192 } 193 break; 194 case SerializationUtil.TAKE: 195 try { 196 TC_TAKE_METHOD.invoke(queue, new Object [0]); 197 } catch (InvocationTargetException e) { 198 throw new TCRuntimeException(e); 199 } catch (IllegalAccessException e) { 200 throw new TCRuntimeException(e); 201 } 202 break; 203 case SerializationUtil.REMOVE_FIRST_N: 204 int count = ((Integer ) params[0]).intValue(); 206 for (int i = 0; i < count; i++) { 207 try { 208 TC_TAKE_METHOD.invoke(queue, new Object [0]); 209 } catch (InvocationTargetException e) { 210 throw new TCRuntimeException(e); 211 } catch (IllegalAccessException e) { 212 throw new TCRuntimeException(e); 213 } 214 } 215 break; 216 case SerializationUtil.REMOVE_AT: 217 int index = ((Integer ) params[0]).intValue(); 218 Assert.assertTrue(queue.size() > index); 219 int j = 0; 220 for (Iterator i = queue.iterator(); i.hasNext();) { 221 i.next(); 222 if (j == index) { 223 i.remove(); 224 break; 225 } 226 j++; 227 } 228 break; 229 case SerializationUtil.CLEAR: 230 queue.clear(); 231 break; 232 default: 233 throw new AssertionError ("Invalid method:" + method + " state:" + this); 234 } 235 } 236 237 public void dehydrate(ClientObjectManager objectManager, TCObject tcObject, DNAWriter writer, Object pojo) { 238 dehydrateFields(objectManager, tcObject, writer, pojo); 239 dehydrateMembers(objectManager, tcObject, writer, pojo); 240 } 241 242 private void dehydrateFields(ClientObjectManager objectManager, TCObject tcObject, DNAWriter writer, Object pojo) { 243 try { 244 Object takeLock = TAKE_LOCK_FIELD.get(pojo); 245 takeLock = getDehydratableObject(takeLock, objectManager); 246 writer.addPhysicalAction(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + TAKE_LOCK_FIELD_NAME, takeLock); 247 248 Object putLock = PUT_LOCK_FIELD.get(pojo); 249 putLock = getDehydratableObject(putLock, objectManager); 250 writer.addPhysicalAction(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + PUT_LOCK_FIELD_NAME, putLock); 251 252 Object capacity = CAPACITY_FIELD.get(pojo); 253 capacity = getDehydratableObject(capacity, objectManager); 254 writer.addPhysicalAction(LINKED_BLOCKING_QUEUE_FIELD_NAME_PREFIX + CAPACITY_FIELD_NAME, capacity); 255 } catch (IllegalAccessException e) { 256 throw new TCRuntimeException(e); 257 } 258 } 259 260 public void dehydrateMembers(ClientObjectManager objectManager, TCObject tcObject, DNAWriter writer, Object pojo) { 261 Queue queue = (Queue ) pojo; 262 263 for (Iterator i = queue.iterator(); i.hasNext();) { 264 Object value = i.next(); 265 if (!(value instanceof ObjectID)) { 266 if (!objectManager.isPortableInstance(value)) { 267 continue; 268 } 269 value = getDehydratableObject(value, objectManager); 270 } 271 if (value == null) { 272 continue; 273 } 274 writer.addLogicalAction(SerializationUtil.PUT, new Object [] { value }); 275 } 276 } 277 278 public Object getNewInstance(ClientObjectManager objectManager, DNA dna) { 279 throw new UnsupportedOperationException (); 280 } 281 282 @SuppressWarnings ("unchecked") 283 public Map connectedCopy(Object source, Object dest, Map visited, ClientObjectManager objectManager, 284 OptimisticTransactionManager txManager) { 285 Map cloned = new IdentityHashMap (); 286 287 Manageable sourceManageable = (Manageable) source; 288 Manageable destManaged = (Manageable) dest; 289 290 Queue sourceQueue = (Queue ) source; 291 Queue destQueue = (Queue ) dest; 292 293 for (Iterator i = sourceQueue.iterator(); i.hasNext();) { 294 Object v = i.next(); 295 Object copyValue = null; 296 297 copyValue = createCopyIfNecessary(objectManager, visited, cloned, v); 298 destQueue.add(copyValue); 299 } 300 301 destManaged.__tc_managed(new TCObjectClone(sourceManageable.__tc_managed(), txManager)); 302 return cloned; 303 } 304 } 305 | Popular Tags |