1 24 25 package org.objectweb.dream.queue; 26 27 import java.util.Collections ; 28 29 import org.objectweb.dream.AbstractComponent; 30 import org.objectweb.dream.Push; 31 import org.objectweb.dream.PushException; 32 import org.objectweb.dream.control.activity.Util; 33 import org.objectweb.dream.control.activity.task.AbstractTask; 34 import org.objectweb.dream.control.activity.task.Task; 35 import org.objectweb.dream.message.Message; 36 import org.objectweb.dream.message.manager.MessageManager; 37 import org.objectweb.fractal.api.Component; 38 import org.objectweb.fractal.api.NoSuchInterfaceException; 39 import org.objectweb.fractal.api.control.IllegalBindingException; 40 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 41 import org.objectweb.util.monolog.api.BasicLevel; 42 43 46 public class PushOutgoingHandlerActiveImpl extends AbstractComponent 47 48 { 49 50 protected MessageManager messageManagerItf; 52 protected Push outPushItf; 53 protected Buffer bufferItf; 54 protected BufferRemoveFirstLast bufferRemoveFirstLastItf; 55 56 protected Task queueTask = new PushPushQueueTask(); 58 59 63 66 protected void beforeFirstStart(Component componentItf) 67 throws IllegalLifeCycleException 68 { 69 try 70 { 71 Util.addTask(componentItf, queueTask, Collections.EMPTY_MAP); 72 } 73 catch (Exception e) 74 { 75 throw new IllegalLifeCycleException("Can't add task"); 76 } 77 } 78 79 class PushPushQueueTask extends AbstractTask 80 { 81 82 85 public PushPushQueueTask() 86 { 87 super("PushPushQueueTask"); 88 } 89 90 93 public Object execute(Object hints) throws InterruptedException 94 { 95 Message msg = bufferItf.remove(); 96 try 97 { 98 outPushItf.push(msg, null); 99 } 100 catch (PushException e) 101 { 102 handlePushException(msg, e); 103 } 104 return EXECUTE_AGAIN; 105 } 106 } 107 108 112 119 protected void handlePushException(Message message, PushException exception) 120 { 121 logger.log(BasicLevel.WARN, "A Push exception has been cached", exception); 122 messageManagerItf.deleteMessage(message); 123 } 124 125 129 133 public synchronized void bindFc(String clientItfName, Object serverItf) 134 throws NoSuchInterfaceException, IllegalBindingException, 135 IllegalLifeCycleException 136 { 137 super.bindFc(clientItfName, serverItf); 138 if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME)) 139 { 140 outPushItf = (Push) serverItf; 141 } 142 else if (clientItfName.equals(Buffer.ITF_NAME)) 143 { 144 bufferItf = (Buffer) serverItf; 145 } 146 else if (clientItfName.equals(BufferRemoveFirstLast.ITF_NAME)) 147 { 148 bufferRemoveFirstLastItf = (BufferRemoveFirstLast) serverItf; 149 } 150 } 151 152 155 public String [] listFc() 156 { 157 return new String []{Push.OUT_PUSH_ITF_NAME, Buffer.ITF_NAME, 158 BufferRemoveFirstLast.ITF_NAME, MessageManager.ITF_NAME}; 159 } 160 161 } | Popular Tags |