1 4 package com.tc.objectserver.handler; 5 6 import com.tc.async.api.AbstractEventHandler; 7 import com.tc.async.api.ConfigurationContext; 8 import com.tc.async.api.EventContext; 9 import com.tc.async.api.Sink; 10 import com.tc.io.TCByteBufferOutputStream; 11 import com.tc.logging.TCLogger; 12 import com.tc.net.protocol.tcm.MessageChannel; 13 import com.tc.net.protocol.tcm.TCMessageType; 14 import com.tc.object.dna.impl.ObjectStringSerializer; 15 import com.tc.object.msg.RequestManagedObjectResponseMessage; 16 import com.tc.object.net.DSOChannelManager; 17 import com.tc.object.net.NoSuchChannelException; 18 import com.tc.objectserver.api.ObjectManager; 19 import com.tc.objectserver.context.ManagedObjectRequestContext; 20 import com.tc.objectserver.core.api.ManagedObject; 21 import com.tc.objectserver.core.api.ServerConfigurationContext; 22 import com.tc.objectserver.l1.api.ClientStateManager; 23 import com.tc.util.sequence.Sequence; 24 import com.tc.util.sequence.SimpleSequence; 25 26 import gnu.trove.THashSet; 27 28 import java.util.Collection ; 29 import java.util.HashSet ; 30 import java.util.Iterator ; 31 import java.util.LinkedList ; 32 import java.util.Set ; 33 34 37 public class RespondToObjectRequestHandler extends AbstractEventHandler { 38 39 private static final int MAX_OBJECTS_TO_LOOKUP = 50; 41 42 private DSOChannelManager channelManager; 43 private ObjectManager objectManager; 44 private ClientStateManager stateManager; 45 private TCLogger logger; 46 private Sequence batchIDSequence = new SimpleSequence(); 47 private Sink managedObjectRequestSink; 48 49 public void handleEvent(EventContext context) { 50 long batchID = batchIDSequence.next(); 51 ManagedObjectRequestContext morc = (ManagedObjectRequestContext) context; 52 Collection objs = morc.getObjects(); 53 LinkedList objectsInOrder = new LinkedList (); 54 55 createNewLookupRequestsIfNecessary(morc); 57 58 Collection requestedObjectIDs = morc.getLookupIDs(); 59 Set ids = new HashSet(Math.max((int) (objs.size() / .75f) + 1, 16)); 60 for (Iterator i = objs.iterator(); i.hasNext();) { 61 ManagedObject mo = (ManagedObject) i.next(); 62 ids.add(mo.getID()); 63 if (requestedObjectIDs.contains(mo.getID())) { 64 objectsInOrder.addLast(mo); 65 } else { 66 objectsInOrder.addFirst(mo); 67 } 68 } 69 70 try { 71 MessageChannel channel = channelManager.getActiveChannel(morc.getChannelID()); 72 73 Set newIds = stateManager.addReferences(morc.getChannelID(), ids); 75 int sendCount = 0; 76 int batches = 0; 77 ObjectStringSerializer serializer = new ObjectStringSerializer(); 78 TCByteBufferOutputStream out = new TCByteBufferOutputStream(); 79 for (Iterator i = objectsInOrder.iterator(); i.hasNext();) { 80 81 ManagedObject m = (ManagedObject) i.next(); 82 i.remove(); 83 if (newIds.contains(m.getID())) { 88 m.toDNA(out, serializer); 89 sendCount++; 90 } else if (morc.getLookupIDs().contains(m.getID())) { 91 } 93 objectManager.releaseReadOnly(m); 94 95 if (sendCount > 1000 || (sendCount > 0 && !i.hasNext())) { 96 batches++; 97 RequestManagedObjectResponseMessage responseMessage = (RequestManagedObjectResponseMessage) channel 98 .createMessage(TCMessageType.REQUEST_MANAGED_OBJECT_RESPONSE_MESSAGE); 99 responseMessage.initialize(out.toArray(), sendCount, serializer, batchID, i.hasNext() ? 0 : batches); 100 responseMessage.send(); 101 if (i.hasNext()) { 102 sendCount = 0; 103 serializer = new ObjectStringSerializer(); 104 out = new TCByteBufferOutputStream(); 105 } 106 } 107 } 108 } catch (NoSuchChannelException e) { 109 logger.info("Not sending response because channel is disconnected: " + morc.getChannelID() 110 + ". Releasing all checked-out objects..."); 111 for (Iterator i = objectsInOrder.iterator(); i.hasNext();) { 112 objectManager.releaseReadOnly((ManagedObject) i.next()); 113 } 114 return; 115 } 116 } 117 118 private void createNewLookupRequestsIfNecessary(ManagedObjectRequestContext morc) { 119 int maxRequestDepth = morc.getMaxRequestDepth(); 120 Set oids = morc.getLookupPendingObjectIDs(); 121 if (oids.isEmpty()) { return; } 122 if (logger.isDebugEnabled()) { 123 logger.debug("Creating Server initiated requests for : " + morc.getChannelID() + " org request Id length = " 124 + morc.getLookupIDs().size() + " Reachable object(s) to be looked up length = " 125 + oids.size()); 126 } 127 if (oids.size() <= MAX_OBJECTS_TO_LOOKUP) { 128 this.managedObjectRequestSink.add(new ManagedObjectRequestContext(morc.getChannelID(), morc.getRequestID(), oids, 129 -1, morc.getSink())); 130 } else { 131 Set split = new HashSet(MAX_OBJECTS_TO_LOOKUP); 133 for (Iterator i = oids.iterator(); i.hasNext();) { 134 split.add(i.next()); 135 if (split.size() >= MAX_OBJECTS_TO_LOOKUP) { 136 this.managedObjectRequestSink.add(new ManagedObjectRequestContext(morc.getChannelID(), morc.getRequestID(), 137 split, -1, morc.getSink())); 138 if (i.hasNext()) split = new THashSet(maxRequestDepth); 139 } 140 } 141 } 142 } 143 144 public void initialize(ConfigurationContext context) { 145 super.initialize(context); 146 ServerConfigurationContext oscc = (ServerConfigurationContext) context; 147 this.channelManager = oscc.getChannelManager(); 148 this.objectManager = oscc.getObjectManager(); 149 this.logger = oscc.getLogger(getClass()); 150 this.stateManager = oscc.getClientStateManager(); 151 this.managedObjectRequestSink = oscc.getStage(ServerConfigurationContext.MANAGED_OBJECT_REQUEST_STAGE).getSink(); 152 } 153 } 154 | Popular Tags |