1 24 25 package utobcast.basic; 26 27 import java.util.Collections ; 28 import java.util.Map ; 29 30 import org.objectweb.dream.AbstractComponent; 31 import org.objectweb.dream.Push; 32 import org.objectweb.dream.PushException; 33 import org.objectweb.dream.control.activity.Util; 34 import org.objectweb.dream.control.activity.task.AbstractTask; 35 import org.objectweb.dream.control.activity.task.Task; 36 import org.objectweb.dream.message.Message; 37 import org.objectweb.dream.message.MessageType; 38 import org.objectweb.dream.message.MessageTypeImpl; 39 import org.objectweb.dream.message.manager.MessageManager; 40 import org.objectweb.fractal.api.Component; 41 import org.objectweb.fractal.api.NoSuchInterfaceException; 42 import org.objectweb.fractal.api.control.IllegalBindingException; 43 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 44 import org.objectweb.util.monolog.api.BasicLevel; 45 46 49 public class ProducerConsumerImpl extends AbstractComponent implements Push 50 { 51 52 Push outPushItf; 53 MessageManager messageManagerItf; 54 MessageType msgType = new MessageTypeImpl(TestChunk.DEFAULT_NAME, 55 TestChunk.TYPE); 56 57 Task tasks; 58 59 63 66 protected void beforeFirstStart(Component componentItf) 67 throws IllegalLifeCycleException 68 { 69 try 70 { 71 Util.addTask(componentItf, new ProducerTask("Producer task"), 72 Collections.EMPTY_MAP); 73 logger.log(BasicLevel.DEBUG, "task added"); 74 } 75 catch (Exception e) 76 { 77 throw new IllegalLifeCycleException("Can't add task"); 78 } 79 } 80 81 85 89 public void push(Message message, Map context) throws PushException 90 { 91 logger.log(BasicLevel.INFO, "Received message " + message); 92 messageManagerItf.deleteMessage(message); 93 } 94 95 99 103 public synchronized void bindFc(String clientItfName, Object serverItf) 104 throws NoSuchInterfaceException, IllegalBindingException, 105 IllegalLifeCycleException 106 { 107 super.bindFc(clientItfName, serverItf); 108 if (clientItfName.equals(Push.OUT_PUSH_ITF_NAME)) 109 { 110 outPushItf = (Push) serverItf; 111 } 112 else if (clientItfName.equals(MessageManager.ITF_NAME)) 113 { 114 messageManagerItf = (MessageManager) serverItf; 115 } 116 } 117 118 121 public String [] listFc() 122 { 123 return new String []{Push.OUT_PUSH_ITF_NAME, MessageManager.ITF_NAME}; 124 } 125 126 private class ProducerTask extends AbstractTask 127 { 128 129 private String message = "Producer"; 130 131 134 public ProducerTask(String name) 135 { 136 super(name); 137 } 138 139 142 public Object execute(Object hints) throws InterruptedException 143 { 144 try 145 { 146 Message msg = messageManagerItf.createMessage(msgType); 147 TestChunk chunk = (TestChunk) msg.getChunk(TestChunk.DEFAULT_NAME); 148 chunk.setMessage(message); 149 outPushItf.push(msg, null); 150 } 151 catch (Exception e) 152 { 153 e.printStackTrace(); 154 return STOP_EXECUTING; 155 } 156 Thread.sleep(1000); 157 return EXECUTE_AGAIN; 158 } 159 } 160 161 } | Popular Tags |