1 24 25 package org.objectweb.dream.channel; 26 27 import java.io.IOException ; 28 import java.util.Map ; 29 30 import org.objectweb.dream.AbstractComponent; 31 import org.objectweb.dream.IOPushException; 32 import org.objectweb.dream.Push; 33 import org.objectweb.dream.PushException; 34 import org.objectweb.dream.message.Message; 35 import org.objectweb.dream.message.codec.MessageCodec; 36 import org.objectweb.dream.message.manager.MessageManager; 37 import org.objectweb.fractal.api.NoSuchInterfaceException; 38 import org.objectweb.fractal.api.control.IllegalBindingException; 39 import org.objectweb.fractal.api.control.IllegalLifeCycleException; 40 import org.objectweb.util.monolog.api.BasicLevel; 41 42 50 public class GenericPushChannelOutImpl extends AbstractComponent 51 implements 52 Push 53 { 54 55 59 public static final String WAIT_ACK_OPT_ITF_NAME = "wait-ack-opt"; 60 61 65 protected MessageManager messageManagerItf; 66 protected MessageCodec messageCodecItf; 67 protected SocketManager socketManagerItf; 68 protected WaitByte waitAckOptItf; 69 70 74 77 public void push(Message message, Map context) throws PushException 78 { 79 if (logger.isLoggable(BasicLevel.DEBUG)) 80 { 81 logger.log(BasicLevel.DEBUG, "try to send message -> " + message); 82 } 83 SocketState socketState = null; 84 try 85 { 86 socketState = socketManagerItf.getSocket(message); 87 } 88 catch (InterruptedException e1) 89 { 90 throw new PushException(e1); 91 } 92 catch (IOException e) 93 { 94 throw new IOPushException(e); 95 } 96 try 97 { 98 logger.log(BasicLevel.DEBUG, "write message"); 99 messageCodecItf.encode(socketState, message); 100 101 if (waitAckOptItf != null) 102 { 103 int b = waitAckOptItf.waitByte(socketState.getInput()); 104 if (b != 0) 105 { 106 socketManagerItf.releaseSocket(socketState, false); 107 throw new PushException("Receive nack for message."); 108 } 109 } 110 } 111 catch (IOException e) 112 { 113 socketManagerItf.releaseSocket(socketState, true); 114 throw new IOPushException(e); 115 } 116 117 socketManagerItf.releaseSocket(socketState, false); 118 messageManagerItf.deleteMessage(message); 119 } 120 121 125 128 public String [] listFc() 129 { 130 return new String []{MessageManager.ITF_NAME, MessageCodec.ITF_NAME, 131 SocketManager.ITF_NAME, WAIT_ACK_OPT_ITF_NAME}; 132 } 133 134 138 public synchronized void bindFc(String clientItfName, Object serverItf) 139 throws NoSuchInterfaceException, IllegalBindingException, 140 IllegalLifeCycleException 141 { 142 super.bindFc(clientItfName, serverItf); 143 if (clientItfName.equals(MessageManager.ITF_NAME)) 144 { 145 messageManagerItf = (MessageManager) serverItf; 146 } 147 else if (clientItfName.equals(MessageCodec.ITF_NAME)) 148 { 149 messageCodecItf = (MessageCodec) serverItf; 150 } 151 else if (clientItfName.equals(SocketManager.ITF_NAME)) 152 { 153 socketManagerItf = (SocketManager) serverItf; 154 } 155 else if (clientItfName.equals(WAIT_ACK_OPT_ITF_NAME)) 156 { 157 waitAckOptItf = (WaitByte) serverItf; 158 } 159 } 160 161 164 public void unbindFc(String clientItfName) throws NoSuchInterfaceException, 165 IllegalBindingException, IllegalLifeCycleException 166 { 167 super.unbindFc(clientItfName); 168 if (clientItfName.equals(WAIT_ACK_OPT_ITF_NAME)) 169 { 170 waitAckOptItf = null; 171 } 172 } 173 } | Popular Tags |