1 24 25 package queues; 26 27 import java.util.Collections ; 28 29 import org.objectweb.dream.AbstractComponent; 30 import org.objectweb.dream.Push; 31 import org.objectweb.dream.control.activity.Util; 32 import org.objectweb.dream.control.activity.task.AbstractTask; 33 import org.objectweb.dream.message.ChunkAlreadyExistsException; 34 import org.objectweb.dream.message.Message; 35 import org.objectweb.dream.message.MessageType; 36 import org.objectweb.dream.message.MessageTypeImpl; 37 import org.objectweb.dream.message.manager.MessageManager; 38 import org.objectweb.dream.queue.SequenceNumberChunk; 39 import org.objectweb.fractal.api.Component; 40 import org.objectweb.fractal.api.NoSuchInterfaceException; 41 import org.objectweb.fractal.api.control.IllegalBindingException; 42 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 43 import org.objectweb.util.monolog.api.BasicLevel; 44 45 49 public class PushGeneratorImpl extends AbstractComponent 50 { 51 52 Push outPushItf; 53 MessageManager messageManagerItf; 54 MessageType msgType; 55 int[] sns = {0, 1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 56 12, 13, 14, 15, 2, 3, 4, 5, 6 }; 57 protected static final Object LOCK = new Object (); 58 int counter = 0; 59 60 63 public PushGeneratorImpl() 64 { 65 try 66 { 67 msgType = new MessageTypeImpl(HelloWorldChunk.DEFAULT_NAME, 68 HelloWorldChunk.TYPE, SequenceNumberChunk.DEFAULT_NAME, 69 SequenceNumberChunk.TYPE); 70 } 71 catch (ChunkAlreadyExistsException e) 72 { 73 } 75 } 76 77 81 84 protected void beforeFirstStart(Component componentItf) 85 throws IllegalLifeCycleException 86 { 87 try 88 { 89 Util.addTask(componentItf, new GeneratorTask("Push generator task1"), 90 Collections.EMPTY_MAP); 91 Util.addTask(componentItf, new GeneratorTask("Push generator task2"), 92 Collections.EMPTY_MAP); 93 logger.log(BasicLevel.DEBUG, "tasks added"); 94 } 95 catch (Exception e) 96 { 97 throw new IllegalLifeCycleException("Can't add task"); 98 } 99 } 100 101 105 109 public synchronized void bindFc(String clientItfName, Object serverItf) 110 throws NoSuchInterfaceException, IllegalBindingException, 111 IllegalLifeCycleException 112 { 113 super.bindFc(clientItfName, serverItf); 114 if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME)) 115 { 116 outPushItf = (Push) serverItf; 117 } 118 else if (clientItfName.equals(MessageManager.ITF_NAME)) 119 { 120 messageManagerItf = (MessageManager) serverItf; 121 } 122 } 123 124 127 public String [] listFc() 128 { 129 return new String []{Push.OUT_PUSH_ITF_NAME, MessageManager.ITF_NAME}; 130 } 131 132 136 private class GeneratorTask extends AbstractTask 137 { 138 139 private String message; 140 141 144 public GeneratorTask(String message) 145 { 146 super(message); 147 this.message = message; 148 } 149 150 153 public Object execute(Object hints) throws InterruptedException  154 { 155 try 156 { 157 Message msg = messageManagerItf.createMessage(msgType); 158 HelloWorldChunk chunk = (HelloWorldChunk) msg 159 .getChunk(HelloWorldChunk.DEFAULT_NAME); 160 SequenceNumberChunk snchunk = (SequenceNumberChunk) msg 161 .getChunk(SequenceNumberChunk.DEFAULT_NAME); 162 synchronized (LOCK) 163 { 164 if (counter < 20) 165 { 166 chunk.setMessage("Message " + message); 168 snchunk.setSequenceNumber(sns[counter]); 169 counter++; 170 outPushItf.push(msg, null); 171 } 172 else 173 { 174 chunk.setMessage("Message " + message); 175 snchunk.setSequenceNumber(counter++); 176 outPushItf.push(msg, null); 177 } 178 } 179 } 180 catch (Exception e) 181 { 182 e.printStackTrace(); 183 return STOP_EXECUTING; 184 } 185 Thread.sleep(1000); 186 return EXECUTE_AGAIN; 187 } 188 } 189 } | Popular Tags |