1 24 25 package org.objectweb.dream.queue; 26 27 import java.util.Map ; 28 29 import org.objectweb.dream.AbstractComponent; 30 import org.objectweb.dream.InterruptedPushException; 31 import org.objectweb.dream.Push; 32 import org.objectweb.dream.PushException; 33 import org.objectweb.dream.message.Message; 34 import org.objectweb.dream.message.manager.MessageManager; 35 import org.objectweb.dream.util.Error; 36 import org.objectweb.fractal.api.NoSuchInterfaceException; 37 import org.objectweb.fractal.api.control.IllegalBindingException; 38 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 39 40 55 public abstract class AbstractPushIncomingHandlerOverflowImpl 56 extends 57 AbstractComponent implements Push, PushQueueAttributeController 58 { 59 60 protected static final int BLOCK_OVERFLOW_POLICY_ID = 0; 62 protected static final int DROP_QUEUE_MESSAGE_OVERFLOW_POLICY_ID = 1; 63 protected static final int DROP_FIRST_OVERFLOW_POLICY_ID = 2; 64 protected static final int DROP_LAST_OVERFLOW_POLICY_ID = 3; 65 protected static final int DROP_PROCESSED_MESSAGE_OVERFLOW_POLICY_ID = 4; 66 protected static final int EXCEPTION_OVERFLOW_POLICY_ID = 5; 67 68 protected String overflowPolicy; 69 protected short overflowPolicyId; 70 71 protected MessageManager messageManagerItf; 73 protected Buffer bufferItf; 74 protected BufferRemoveFirstLast bufferRemoveFirstLastItf; 75 protected BufferAddFirstLast bufferAddFirstLastItf; 76 77 81 85 public void push(Message message, Map context) throws PushException 86 { 87 try 88 { 89 if (overflowPolicyId != BLOCK_OVERFLOW_POLICY_ID && !canAdd(message)) 90 { 91 switch (overflowPolicyId) 92 { 93 case DROP_QUEUE_MESSAGE_OVERFLOW_POLICY_ID : { 94 messageManagerItf.deleteMessage(bufferItf.remove()); 95 break; 96 } 97 case DROP_FIRST_OVERFLOW_POLICY_ID : { 98 if (bufferRemoveFirstLastItf != null) 99 { 100 messageManagerItf.deleteMessage(bufferRemoveFirstLastItf 101 .removeFirst()); 102 } 103 else 104 { 105 throw new UnsupportedOperationException ( 106 "Unable to remove first element of the queue."); 107 } 108 break; 109 } 110 case DROP_LAST_OVERFLOW_POLICY_ID : { 111 if (bufferRemoveFirstLastItf != null) 112 { 113 messageManagerItf.deleteMessage(bufferRemoveFirstLastItf 114 .removeLast()); 115 } 116 else 117 { 118 throw new UnsupportedOperationException ( 119 "Unable to remove last element of the queue."); 120 } 121 break; 122 } 123 case DROP_PROCESSED_MESSAGE_OVERFLOW_POLICY_ID : { 124 messageManagerItf.deleteMessage(message); 125 break; 126 } 127 case EXCEPTION_OVERFLOW_POLICY_ID : { 128 throw new BufferOverflowException("Overflow in buffer"); 129 } 130 default : { 131 Error.error("Unknown overflow policy" + overflowPolicyId, logger); 132 } 133 } 134 } 135 else 136 { 137 doAdd(message); 138 } 139 } 140 catch (InterruptedException e) 141 { 142 throw new InterruptedPushException("Interrupted while adding a message", 143 e); 144 } 145 } 146 147 151 157 protected abstract boolean canAdd(Message message); 158 159 167 protected abstract void doAdd(Message message) throws InterruptedException ; 168 169 173 176 public String getOverflowPolicy() 177 { 178 return overflowPolicy; 179 } 180 181 184 public synchronized void setOverflowPolicy(String policy) 185 { 186 if (policy.equals(BLOCK_OVERFLOW_POLICY)) 189 { 190 overflowPolicy = BLOCK_OVERFLOW_POLICY; 191 overflowPolicyId = BLOCK_OVERFLOW_POLICY_ID; 192 } 193 else if (policy.equals(DROP_QUEUE_MESSAGE_OVERFLOW_POLICY)) 194 { 195 overflowPolicy = DROP_QUEUE_MESSAGE_OVERFLOW_POLICY; 196 overflowPolicyId = DROP_QUEUE_MESSAGE_OVERFLOW_POLICY_ID; 197 } 198 else if (policy.equals(DROP_FIRST_OVERFLOW_POLICY)) 199 { 200 overflowPolicy = DROP_FIRST_OVERFLOW_POLICY; 201 overflowPolicyId = DROP_FIRST_OVERFLOW_POLICY_ID; 202 } 203 else if (policy.equals(DROP_LAST_OVERFLOW_POLICY)) 204 { 205 overflowPolicy = DROP_LAST_OVERFLOW_POLICY; 206 overflowPolicyId = DROP_LAST_OVERFLOW_POLICY_ID; 207 } 208 else if (policy.equals(DROP_PROCESSED_MESSAGE_OVERFLOW_POLICY)) 209 { 210 overflowPolicy = DROP_PROCESSED_MESSAGE_OVERFLOW_POLICY; 211 overflowPolicyId = DROP_PROCESSED_MESSAGE_OVERFLOW_POLICY_ID; 212 } 213 else if (policy.equals(EXCEPTION_OVERFLOW_POLICY)) 214 { 215 overflowPolicy = EXCEPTION_OVERFLOW_POLICY; 216 overflowPolicyId = EXCEPTION_OVERFLOW_POLICY_ID; 217 } 218 else 219 { 220 throw new IllegalArgumentException ("Unknown overflow policy : " + policy); 221 } 222 } 223 224 228 232 public synchronized void bindFc(String clientItfName, Object serverItf) 233 throws NoSuchInterfaceException, IllegalBindingException, 234 IllegalLifeCycleException 235 { 236 super.bindFc(clientItfName, serverItf); 237 if (clientItfName.equals(MessageManager.ITF_NAME)) 238 { 239 messageManagerItf = (MessageManager) serverItf; 240 } 241 else if (clientItfName.equals(Buffer.ITF_NAME)) 242 { 243 bufferItf = (Buffer) serverItf; 244 } 245 else if (clientItfName.equals(BufferRemoveFirstLast.ITF_NAME)) 246 { 247 bufferRemoveFirstLastItf = (BufferRemoveFirstLast) serverItf; 248 } 249 else if (clientItfName.equals(BufferAddFirstLast.ITF_NAME)) 250 { 251 bufferAddFirstLastItf = (BufferAddFirstLast) serverItf; 252 } 253 } 254 255 258 public String [] listFc() 259 { 260 return new String []{MessageManager.ITF_NAME, Buffer.ITF_NAME, 261 BufferRemoveFirstLast.ITF_NAME, BufferAddFirstLast.ITF_NAME}; 262 } 263 264 } | Popular Tags |