package queues;

import java.util.Collections;

import org.objectweb.dream.AbstractComponent;
import org.objectweb.dream.Pull;
import org.objectweb.dream.control.activity.Util;
import org.objectweb.dream.control.activity.task.AbstractTask;
import org.objectweb.dream.control.activity.task.Task;
import org.objectweb.dream.message.Message;
import org.objectweb.dream.message.manager.MessageManager;
import org.objectweb.dream.queue.SequenceNumberChunk;
import org.objectweb.fractal.api.Component;
import org.objectweb.fractal.api.NoSuchInterfaceException;
import org.objectweb.fractal.api.control.IllegalBindingException;
import org.objectweb.fractal.api.control.IllegalLifeCycleException;
import org.objectweb.util.monolog.api.BasicLevel;

/**
 * Component that repeatedly pulls messages from its <code>in-pull</code>
 * client interface, logs the sequence number and text carried by each message
 * and then hands the message back to the message manager for deletion.
 * <p>
 * Two {@link GeneratorTask} instances are registered before the component is
 * first started; both pull from the same input interface.
 */
public class PullGeneratorImpl extends AbstractComponent
{

  /** Pause, in milliseconds, between two successful executions of a task. */
  private static final long PULL_PERIOD_MS = 1000;

  /** Input interface messages are pulled from; set by {@link #bindFc}. */
  Pull           inPullItf;

  /** Message manager used to delete pulled messages; set by {@link #bindFc}. */
  MessageManager messageManagerItf;

  /**
   * Not read or written anywhere in this class. Kept because it is
   * package-visible and may be referenced from elsewhere in the package.
   */
  Task           tasks;

  /**
   * Registers the two pull tasks of this component.
   *
   * @param componentItf the component interface the tasks are added to.
   * @throws IllegalLifeCycleException if a task cannot be added; the
   *           underlying failure is attached as the cause.
   */
  protected void beforeFirstStart(Component componentItf)
      throws IllegalLifeCycleException
  {
    try
    {
      Util.addTask(componentItf, new GeneratorTask("Pull generator task1"),
          Collections.EMPTY_MAP);
      Util.addTask(componentItf, new GeneratorTask("Pull generator task2"),
          Collections.EMPTY_MAP);
      logger.log(BasicLevel.DEBUG, "tasks added");
    }
    catch (Exception e)
    {
      // IllegalLifeCycleException only offers a message constructor, so the
      // root cause is attached explicitly instead of being silently dropped.
      IllegalLifeCycleException failure = new IllegalLifeCycleException(
          "Can't add task");
      failure.initCause(e);
      throw failure;
    }
  }

  /**
   * Binds a client interface. The binding is first delegated to the
   * superclass, then the server interface is cached in the matching field
   * when the name is one of the two interfaces used by this component.
   *
   * @param clientItfName name of the client interface to bind.
   * @param serverItf the server interface to bind it to.
   * @throws NoSuchInterfaceException propagated from the superclass.
   * @throws IllegalBindingException propagated from the superclass.
   * @throws IllegalLifeCycleException propagated from the superclass.
   */
  public synchronized void bindFc(String clientItfName, Object serverItf)
      throws NoSuchInterfaceException, IllegalBindingException,
      IllegalLifeCycleException
  {
    super.bindFc(clientItfName, serverItf);
    if (clientItfName.equals(Pull.IN_PULL_ITF_NAME))
    {
      inPullItf = (Pull) serverItf;
    }
    else if (clientItfName.equals(MessageManager.ITF_NAME))
    {
      messageManagerItf = (MessageManager) serverItf;
    }
  }

  /**
   * Lists the client interfaces of this component.
   *
   * @return the names of the input pull interface and of the message manager
   *         interface, in that order.
   */
  public String[] listFc()
  {
    return new String[]{Pull.IN_PULL_ITF_NAME, MessageManager.ITF_NAME};
  }

  /**
   * Task that pulls one message per execution from the enclosing component's
   * input interface and logs its content.
   */
  private class GeneratorTask extends AbstractTask
  {

    /**
     * Creates a task.
     *
     * @param message string passed to the {@link AbstractTask} constructor
     *          (presumably the task name; confirm against AbstractTask).
     */
    public GeneratorTask(String message)
    {
      super(message);
    }

    /**
     * Pulls one message, logs its sequence number and text, deletes it, then
     * sleeps for {@link PullGeneratorImpl#PULL_PERIOD_MS} milliseconds.
     *
     * @param hints not used by this task.
     * @return <code>EXECUTE_AGAIN</code> after a normal iteration (including
     *         when a <code>null</code> message was pulled), or
     *         <code>STOP_EXECUTING</code> if pulling, reading or deleting the
     *         message failed.
     * @throws InterruptedException if the thread is interrupted while
     *           sleeping between two iterations.
     */
    public Object execute(Object hints) throws InterruptedException
    {
      try
      {
        Message msg = inPullItf.pull(null);
        if (msg == null)
        {
          logger.log(BasicLevel.INFO, "Pulled null message");
        }
        else
        {
          try
          {
            HelloWorldChunk chunk = (HelloWorldChunk) msg
                .getChunk(HelloWorldChunk.DEFAULT_NAME);
            SequenceNumberChunk snchunk = (SequenceNumberChunk) msg
                .getChunk(SequenceNumberChunk.DEFAULT_NAME);
            logger.log(BasicLevel.INFO, "Sequence number = "
                + snchunk.getSequenceNumber() + " msg = " + chunk.getMessage());
          }
          finally
          {
            // Always release the pulled message, even when reading its chunks
            // fails (e.g. a chunk is missing or has an unexpected type);
            // otherwise the message would never be returned to the manager.
            messageManagerItf.deleteMessage(msg);
          }
        }
      }
      catch (Exception e)
      {
        // Report through the component logger rather than on stderr, so the
        // failure ends up with the rest of the component's output.
        logger.log(BasicLevel.ERROR, "Pull generator task failed; stopping", e);
        return STOP_EXECUTING;
      }
      Thread.sleep(PULL_PERIOD_MS);
      return EXECUTE_AGAIN;
    }
  }
}