package org.mule.providers.stream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;

import org.apache.commons.lang.SystemUtils;
import org.mule.impl.MuleMessage;
import org.mule.providers.PollingMessageReceiver;
import org.mule.umo.UMOComponent;
import org.mule.umo.UMOMessage;
import org.mule.umo.endpoint.UMOEndpoint;
import org.mule.umo.lifecycle.InitialisationException;
import org.mule.umo.provider.UMOConnector;

/**
 * Polling receiver that reads whatever bytes are currently available on an
 * input stream and routes them as a single message.
 * <p>
 * The stream is <code>System.in</code> when the endpoint address equals
 * {@link StreamConnector#STREAM_SYSTEM_IN} (case-insensitively); otherwise it
 * is the stream supplied by the connector. When the connector is a
 * {@link SystemStreamConnector}, a prompt is written to its output stream on
 * connect and again after every routed message.
 */
public class StreamMessageReceiver extends PollingMessageReceiver
{
    public static final int DEFAULT_BUFFER_SIZE = 4096;

    private int bufferSize = DEFAULT_BUFFER_SIZE;
    private InputStream inputStream;
    private StreamConnector connector;

    /**
     * Creates the receiver and selects the input stream to poll.
     *
     * @param connector      the owning connector; cast to {@link StreamConnector}
     * @param component      the component passed through to the superclass
     * @param endpoint       the endpoint whose address selects the stream and whose
     *                       optional <code>"promptMessage"</code> property overrides
     *                       the prompt of a {@link SystemStreamConnector}
     * @param checkFrequency the polling frequency passed through to the superclass
     * @throws InitialisationException if the superclass constructor fails
     */
    public StreamMessageReceiver(UMOConnector connector,
                                 UMOComponent component,
                                 UMOEndpoint endpoint,
                                 Long checkFrequency) throws InitialisationException
    {
        super(connector, component, endpoint, checkFrequency);

        this.connector = (StreamConnector)connector;
        String streamName = endpoint.getEndpointURI().getAddress();
        if (StreamConnector.STREAM_SYSTEM_IN.equalsIgnoreCase(streamName))
        {
            inputStream = System.in;
        }
        else
        {
            inputStream = this.connector.getInputStream();
        }

        // An endpoint-level "promptMessage" property overrides the connector's prompt.
        if (connector instanceof SystemStreamConnector)
        {
            SystemStreamConnector ssc = (SystemStreamConnector)connector;

            String promptMessage = (String)endpoint.getProperties().get("promptMessage");
            if (promptMessage != null)
            {
                ssc.setPromptMessage(promptMessage);
            }
        }
    }

    /**
     * Starts a background writer that prints the prompt (after the connector's
     * configured delay) when the connector is a {@link SystemStreamConnector}.
     * Does nothing for other connector types.
     */
    public void doConnect() throws Exception
    {
        if (connector instanceof SystemStreamConnector)
        {
            SystemStreamConnector ssc = (SystemStreamConnector)connector;
            DelayedMessageWriter writer = new DelayedMessageWriter(ssc);
            writer.start();
        }
    }

    /**
     * No-op: this receiver holds nothing that needs releasing on disconnect.
     */
    public void doDisconnect() throws Exception
    {
        // nothing to do
    }

    /**
     * Reads the currently available input, strips one trailing platform line
     * separator if present, routes the text as a message and re-issues the
     * prompt via {@link #doConnect()}. Returns silently at end of stream. Any
     * exception is passed to <code>handleException</code>.
     */
    public void poll()
    {
        try
        {
            byte[] rawMessage = readAvailableBytes();
            if (rawMessage == null)
            {
                // end of stream: nothing to route
                return;
            }

            // Decode once, over the complete byte sequence, so a multi-byte character
            // that straddles two buffer reads is not corrupted. The platform default
            // charset is kept, as before.
            String finalMessageString = stripTrailingLineSeparator(new String(rawMessage));

            UMOMessage umoMessage = new MuleMessage(connector.getMessageAdapter(finalMessageString));
            routeMessage(umoMessage, endpoint.isSynchronous());

            // show the prompt again for the next message
            doConnect();
        }
        catch (Exception e)
        {
            handleException(e);
        }
    }

    /**
     * Performs one read on the stream and then keeps reading for as long as
     * the stream reports more bytes available.
     *
     * @return the bytes read, or <code>null</code> if the first read hit end of stream
     * @throws IOException if reading from the stream fails
     */
    private byte[] readAvailableBytes() throws IOException
    {
        byte[] chunk = new byte[bufferSize];
        int len = inputStream.read(chunk);
        if (len == -1)
        {
            return null;
        }

        ByteArrayOutputStream collected = new ByteArrayOutputStream(bufferSize);
        while (len > 0)
        {
            collected.write(chunk, 0, len);
            // Only read again when data is already waiting; otherwise leave the loop.
            len = 0;
            if (inputStream.available() > 0)
            {
                len = inputStream.read(chunk);
            }
        }
        return collected.toByteArray();
    }

    /**
     * Removes a single trailing {@link SystemUtils#LINE_SEPARATOR} from the text.
     *
     * @param text the text to trim
     * @return the text without its trailing line separator, or the text unchanged
     *         if it does not end with one
     */
    private static String stripTrailingLineSeparator(String text)
    {
        String separator = SystemUtils.LINE_SEPARATOR;
        if (text.endsWith(separator))
        {
            return text.substring(0, text.length() - separator.length());
        }
        return text;
    }

    public InputStream getInputStream()
    {
        return inputStream;
    }

    public void setInputStream(InputStream inputStream)
    {
        this.inputStream = inputStream;
    }

    public int getBufferSize()
    {
        return bufferSize;
    }

    public void setBufferSize(int bufferSize)
    {
        this.bufferSize = bufferSize;
    }

    /**
     * Thread that waits for the connector's message delay and then prints an
     * empty line followed by the prompt to the connector's output stream.
     */
    private static class DelayedMessageWriter extends Thread
    {
        private long delay = 0;
        private SystemStreamConnector ssc;

        public DelayedMessageWriter(SystemStreamConnector ssc)
        {
            this.delay = ssc.getMessageDelayTime();
            this.ssc = ssc;
        }

        public void run()
        {
            if (delay > 0)
            {
                try
                {
                    sleep(delay);
                }
                catch (InterruptedException e1)
                {
                    // Keep the interrupt visible to whoever inspects this thread;
                    // the prompt is still written below, as it always was.
                    Thread.currentThread().interrupt();
                }
            }
            PrintStream out = (PrintStream)ssc.getOutputStream();
            out.println();
            out.print(ssc.getPromptMessage());
        }
    }
}