1 7 package org.jboss.remoting.stream; 8 9 import java.io.InputStream ; 10 import java.net.InetAddress ; 11 import java.net.UnknownHostException ; 12 import javax.management.MBeanServer ; 13 import org.jboss.logging.Logger; 14 import org.jboss.remoting.InvocationRequest; 15 import org.jboss.remoting.InvokerLocator; 16 import org.jboss.remoting.ServerInvocationHandler; 17 import org.jboss.remoting.ServerInvoker; 18 import org.jboss.remoting.callback.InvokerCallbackHandler; 19 import org.jboss.remoting.transport.Connector; 20 21 35 public class StreamServer 36 { 37 private InputStream streamSource = null; 38 39 private String transport = "socket"; 40 private String host = "localhost"; 41 private int port = 5405; 42 43 private Connector connector = null; 44 45 private static final Logger log = Logger.getLogger(StreamServer.class); 46 47 public static final String STREAM_TRANSPORT_KEY = "remoting.stream.transport"; 48 public static final String STREAM_HOST_KEY = "remoting.stream.host"; 49 public static final String STREAM_PORT_KEY = "remoting.stream.port"; 50 51 52 59 public StreamServer(InputStream stream) throws Exception 60 { 61 this.streamSource = stream; 62 String locatorURI = getLocatorURI(); 63 setupServer(locatorURI); 64 } 65 66 private String getLocatorURI() 67 { 68 transport = System.getProperty(STREAM_TRANSPORT_KEY, transport); 70 try 71 { 72 host = InetAddress.getLocalHost().getHostName(); 73 } 74 catch(UnknownHostException e) 75 { 76 try 77 { 78 host = InetAddress.getLocalHost().getHostAddress(); 79 } 80 catch(UnknownHostException e1) 81 { 82 log.error("Stream server could not determine local host or address."); 83 } 84 } 85 host = System.getProperty(STREAM_HOST_KEY, host); 86 87 String sPort = System.getProperty(STREAM_PORT_KEY, "" + port); 88 try 89 { 90 port = Integer.parseInt(sPort); 91 } 92 catch(NumberFormatException e) 93 { 94 log.error("Stream server could not convert specified port " + sPort + " to a number."); 95 } 96 97 return transport + "://" + host + ":" + port; 98 } 99 100 106 public String getInvokerLocator() throws Exception 107 { 108 String locator = null; 109 110 if(connector != null) 111 { 112 locator = connector.getInvokerLocator(); 113 } 114 return locator; 115 } 116 117 public void setupServer(String locatorURI) throws Exception 118 { 119 InvokerLocator locator = new InvokerLocator(locatorURI); 120 121 connector = new Connector(); 122 connector.setInvokerLocator(locator.getLocatorURI()); 123 connector.create(); 124 125 ServerInvocationHandler invocationHandler = new Handler(); 126 connector.addInvocationHandler("stream", invocationHandler); 127 128 connector.start(); 129 130 } 131 132 136 public class Handler implements ServerInvocationHandler 137 { 138 public Object invoke(InvocationRequest invocation) throws Throwable 139 { 140 Object obj = invocation.getParameter(); 141 142 if(obj instanceof StreamCallPayload) 144 { 145 StreamCallPayload payload = (StreamCallPayload) obj; 146 String method = payload.getMethod(); 147 148 if(StreamHandler.READ.equals(method)) 149 { 150 int i = streamSource.read(); 151 return new Integer (i); 152 } 153 else if(StreamHandler.AVAILABLE.equals(method)) 154 { 155 int i = streamSource.available(); 156 return new Integer (i); 157 } 158 else if(StreamHandler.CLOSE.equals(method)) 159 { 160 streamSource.close(); 161 } 162 else if(StreamHandler.RESET.equals(method)) 163 { 164 streamSource.reset(); 165 } 166 else if(StreamHandler.MARKSUPPORTED.equals(method)) 167 { 168 boolean b = streamSource.markSupported(); 169 return new Boolean (b); 170 } 171 else if(StreamHandler.MARKREADLIMIT.equals(method)) 172 { 173 Object [] param = payload.getParams(); 174 Integer intr = (Integer ) param[0]; 175 int readLimit = intr.intValue(); 176 streamSource.mark(readLimit); 177 } 178 else if(StreamHandler.SKIP.equals(method)) 179 { 180 Object [] param = payload.getParams(); 181 Long lg = (Long ) param[0]; 182 long n = lg.longValue(); 183 long ret = streamSource.skip(n); 184 return new Long (ret); 185 } 186 else if(StreamHandler.READBYTEARRAY.equals(method)) 187 { 188 Object [] param = payload.getParams(); 189 byte[] byteParam = (byte[]) param[0]; 190 int i = streamSource.read(byteParam); 191 StreamCallPayload ret = new StreamCallPayload(StreamHandler.READBYTEARRAY); 192 ret.setParams(new Object []{byteParam, new Integer (i)}); 193 return ret; 194 } 195 else 196 { 197 throw new Exception ("Unsupported method call - " + method); 198 } 199 } 200 else 201 { 202 log.error("Can not process invocation request because is not of type StreamCallPayload."); 203 throw new Exception ("Invalid payload type. Must be of type StreamCallPayload."); 204 } 205 return null; 206 } 207 208 214 public void addListener(InvokerCallbackHandler callbackHandler) 215 { 216 } 218 219 225 public void removeListener(InvokerCallbackHandler callbackHandler) 226 { 227 } 229 230 235 public void setMBeanServer(MBeanServer server) 236 { 237 } 239 240 245 public void setInvoker(ServerInvoker invoker) 246 { 247 } 249 } 250 251 } | Popular Tags |