KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > mule > providers > stream > StreamMessageReceiver


/*
 * $Id: StreamMessageReceiver.java 3937 2006-11-20 16:04:25Z lajos $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
 *
 * The software in this package is published under the terms of the MuleSource MPL
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */

10
11 package org.mule.providers.stream;
12
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.PrintStream;

import org.apache.commons.lang.SystemUtils;
import org.mule.impl.MuleMessage;
import org.mule.providers.PollingMessageReceiver;
import org.mule.umo.UMOComponent;
import org.mule.umo.UMOMessage;
import org.mule.umo.endpoint.UMOEndpoint;
import org.mule.umo.lifecycle.InitialisationException;
import org.mule.umo.provider.UMOConnector;
24
25 /**
26  * <code>StreamMessageReceiver</code> is a listener for events from Mule components
27  * which then simply passes the events on to the target components.
28  */

29 public class StreamMessageReceiver extends PollingMessageReceiver
30 {
31     public static final int DEFAULT_BUFFER_SIZE = 4096;
32
33     private int bufferSize = DEFAULT_BUFFER_SIZE;
34     private InputStream JavaDoc inputStream;
35     private StreamConnector connector;
36
37     public StreamMessageReceiver(UMOConnector connector,
38                                  UMOComponent component,
39                                  UMOEndpoint endpoint,
40                                  Long JavaDoc checkFrequency) throws InitialisationException
41     {
42         super(connector, component, endpoint, checkFrequency);
43
44         this.connector = (StreamConnector)connector;
45         String JavaDoc streamName = endpoint.getEndpointURI().getAddress();
46         if (StreamConnector.STREAM_SYSTEM_IN.equalsIgnoreCase(streamName))
47         {
48             inputStream = System.in;
49         }
50         else
51         {
52             inputStream = this.connector.getInputStream();
53         }
54
55         // apply connector-specific properties
56
if (connector instanceof SystemStreamConnector)
57         {
58             SystemStreamConnector ssc = (SystemStreamConnector)connector;
59
60             String JavaDoc promptMessage = (String JavaDoc)endpoint.getProperties().get("promptMessage");
61             if (promptMessage != null)
62             {
63                 ssc.setPromptMessage(promptMessage);
64             }
65         }
66     }
67
68     public void doConnect() throws Exception JavaDoc
69     {
70         if (connector instanceof SystemStreamConnector)
71         {
72             SystemStreamConnector ssc = (SystemStreamConnector)connector;
73             DelayedMessageWriter writer = new DelayedMessageWriter(ssc);
74             writer.start();
75         }
76     }
77
78     public void doDisconnect() throws Exception JavaDoc
79     {
80         // noop
81
}
82
83     /*
84      * (non-Javadoc)
85      *
86      * @see org.mule.util.timer.TimeEventListener#timeExpired(org.mule.util.timer.TimeEvent)
87      */

88     public void poll()
89     {
90         try
91         {
92             byte[] inputBuffer = new byte[bufferSize];
93             int len = inputStream.read(inputBuffer);
94
95             if (len == -1)
96             {
97                 return;
98             }
99
100             StringBuffer JavaDoc fullBuffer = new StringBuffer JavaDoc(bufferSize);
101             while (len > 0)
102             {
103                 fullBuffer.append(new String JavaDoc(inputBuffer, 0, len));
104                 len = 0; // mark as read
105
if (inputStream.available() > 0)
106                 {
107                     len = inputStream.read(inputBuffer);
108                 }
109             }
110
111             // remove any trailing CR/LF
112
String JavaDoc finalMessageString;
113             int noCRLFLength = fullBuffer.length() - SystemUtils.LINE_SEPARATOR.length();
114             if (fullBuffer.indexOf(SystemUtils.LINE_SEPARATOR, noCRLFLength) != -1)
115             {
116                 finalMessageString = fullBuffer.substring(0, noCRLFLength);
117             }
118             else
119             {
120                 finalMessageString = fullBuffer.toString();
121             }
122
123             UMOMessage umoMessage = new MuleMessage(connector.getMessageAdapter(finalMessageString));
124             routeMessage(umoMessage, endpoint.isSynchronous());
125
126             doConnect();
127         }
128         catch (Exception JavaDoc e)
129         {
130             handleException(e);
131         }
132     }
133
134     public InputStream JavaDoc getInputStream()
135     {
136         return inputStream;
137     }
138
139     public void setInputStream(InputStream JavaDoc inputStream)
140     {
141         this.inputStream = inputStream;
142     }
143
144     public int getBufferSize()
145     {
146         return bufferSize;
147     }
148
149     public void setBufferSize(int bufferSize)
150     {
151         this.bufferSize = bufferSize;
152     }
153
154     private class DelayedMessageWriter extends Thread JavaDoc
155     {
156         private long delay = 0;
157         private SystemStreamConnector ssc;
158
159         public DelayedMessageWriter(SystemStreamConnector ssc)
160         {
161             this.delay = ssc.getMessageDelayTime();
162             this.ssc = ssc;
163         }
164
165         public void run()
166         {
167             if (delay > 0)
168             {
169                 try
170                 {
171                     // Allow all other console message to be printed out first
172
sleep(delay);
173                 }
174                 catch (InterruptedException JavaDoc e1)
175                 {
176                     // ignore
177
}
178             }
179             ((PrintStream JavaDoc)ssc.getOutputStream()).println();
180             ((PrintStream JavaDoc)ssc.getOutputStream()).print(ssc.getPromptMessage());
181         }
182     }
183 }
184
Popular Tags