KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > tc > net > protocol > tcm > AbstractMessageChannel


1 /*
2  * All content copyright (c) 2003-2006 Terracotta, Inc., except as may otherwise be noted in a separate copyright notice. All rights reserved.
3  */

4 package com.tc.net.protocol.tcm;
5
6 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
7 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
8 import EDU.oswego.cs.dl.util.concurrent.SynchronizedRef;
9
10 import com.tc.async.api.Sink;
11 import com.tc.bytes.TCByteBuffer;
12 import com.tc.logging.TCLogger;
13 import com.tc.net.MaxConnectionsExceededException;
14 import com.tc.net.TCSocketAddress;
15 import com.tc.net.protocol.NetworkLayer;
16 import com.tc.net.protocol.NetworkStackID;
17 import com.tc.net.protocol.TCNetworkMessage;
18 import com.tc.net.protocol.transport.MessageTransport;
19 import com.tc.util.Assert;
20 import com.tc.util.TCTimeoutException;
21
22 import java.io.IOException JavaDoc;
23 import java.net.UnknownHostException JavaDoc;
24 import java.util.Iterator JavaDoc;
25 import java.util.Map JavaDoc;
26 import java.util.Set JavaDoc;
27
28 /**
29  * @author teck
30  */

31 abstract class AbstractMessageChannel implements MessageChannel, MessageChannelInternal {
32   
33   private final Map attachments = new ConcurrentReaderHashMap();
34   private final Object JavaDoc attachmentLock = new Object JavaDoc();
35   private final Set listeners = new CopyOnWriteArraySet();
36   private final ChannelStatus status = new ChannelStatus();
37   private final SynchronizedRef remoteAddr = new SynchronizedRef(null);
38   private final SynchronizedRef localAddr = new SynchronizedRef(null);
39   private final TCMessageFactory msgFactory;
40   private final TCMessageRouter router;
41   private final TCMessageParser parser;
42   private final TCLogger logger;
43
44   protected NetworkLayer sendLayer;
45
46   AbstractMessageChannel(TCMessageRouter router, TCLogger logger, TCMessageFactory msgFactory) {
47     this.router = router;
48     this.logger = logger;
49     this.msgFactory = msgFactory;
50     this.parser = new TCMessageParser(this.msgFactory);
51   }
52
53   public void addAttachment(String JavaDoc key, Object JavaDoc value, boolean replace) {
54     synchronized (attachmentLock) {
55       boolean exists = attachments.containsKey(key);
56       if (replace || !exists) {
57         attachments.put(key, value);
58       }
59     }
60   }
61
62   public Object JavaDoc removeAttachment(String JavaDoc key) {
63     return this.attachments.remove(key);
64   }
65
66   public Object JavaDoc getAttachment(String JavaDoc key) {
67     return this.attachments.get(key);
68   }
69
70   public boolean isOpen() {
71     return this.status.isOpen();
72   }
73
74   public boolean isClosed() {
75     return this.status.isClosed();
76   }
77
78   public void addListener(ChannelEventListener listener) {
79     if (listener == null) { return; }
80
81     listeners.add(listener);
82   }
83
84   public TCMessage createMessage(TCMessageType type) {
85     TCMessage rv = this.msgFactory.createMessage(this, type);
86     // TODO: set default channel specific information in the TC message header
87

88     return rv;
89   }
90
91   public void routeMessageType(TCMessageType messageType, TCMessageSink dest) {
92     router.routeMessageType(messageType, dest);
93   }
94
95   public void unrouteMessageType(TCMessageType messageType) {
96     router.unrouteMessageType(messageType);
97   }
98
99   public abstract NetworkStackID open() throws MaxConnectionsExceededException, TCTimeoutException, UnknownHostException JavaDoc, IOException JavaDoc;
100
101   /**
102    * Routes a TCMessage to a sink. The hydrate sink will do the hydrate() work
103    */

104   public void routeMessageType(TCMessageType messageType, Sink destSink, Sink hydrateSink) {
105     routeMessageType(messageType, new TCMessageSinkToSedaSink(destSink, hydrateSink));
106   }
107
108   public void close() {
109     synchronized (status) {
110       if (!status.isClosed()) {
111         Assert.assertNotNull(this.sendLayer);
112         this.sendLayer.close();
113       }
114       status.close();
115     }
116   }
117
118   public final boolean isConnected() {
119     return this.sendLayer != null && this.sendLayer.isConnected();
120   }
121
122   public final void setSendLayer(NetworkLayer layer) {
123     this.sendLayer = layer;
124   }
125
126   public final void setReceiveLayer(NetworkLayer layer) {
127     throw new UnsupportedOperationException JavaDoc();
128   }
129
130   public final void send(final TCNetworkMessage message) {
131     if (logger.isDebugEnabled()) {
132       final Runnable JavaDoc logMsg = new Runnable JavaDoc() {
133         public void run() {
134           logger.debug("Message Sent: " + message.toString());
135         }
136       };
137
138       final Runnable JavaDoc existingCallback = message.getSentCallback();
139       final Runnable JavaDoc newCallback;
140
141       if (existingCallback != null) {
142         newCallback = new Runnable JavaDoc() {
143           public void run() {
144             try {
145               existingCallback.run();
146             } catch (Exception JavaDoc e) {
147               logger.error(e);
148             } finally {
149               logMsg.run();
150             }
151           }
152         };
153       } else {
154         newCallback = logMsg;
155       }
156
157       message.setSentCallback(newCallback);
158     }
159
160     this.sendLayer.send(message);
161   }
162
163   public final void receive(TCByteBuffer[] msgData) {
164     this.router.putMessage(parser.parseMessage(this, msgData));
165   }
166
167   protected final ChannelStatus getStatus() {
168     return status;
169   }
170
171   public void notifyTransportDisconnected(MessageTransport transport) {
172     this.remoteAddr.set(null);
173     this.localAddr.set(null);
174     fireTransportDisconnectedEvent();
175   }
176
177   protected void fireTransportDisconnectedEvent() {
178     fireEvent(new ChannelEventImpl(ChannelEventType.TRANSPORT_DISCONNECTED_EVENT, AbstractMessageChannel.this));
179   }
180
181   public void notifyTransportConnected(MessageTransport transport) {
182     this.remoteAddr.set(transport.getRemoteAddress());
183     this.localAddr.set(transport.getLocalAddress());
184     fireEvent(new ChannelEventImpl(ChannelEventType.TRANSPORT_CONNECTED_EVENT, AbstractMessageChannel.this));
185   }
186
187   public void notifyTransportConnectAttempt(MessageTransport transport) {
188     return;
189   }
190
191   public void notifyTransportClosed(MessageTransport transport) {
192     // yeah, we know. We closed it.
193
return;
194   }
195
196   public TCSocketAddress getLocalAddress() {
197     return (TCSocketAddress) this.localAddr.get();
198   }
199
200   public TCSocketAddress getRemoteAddress() {
201     return (TCSocketAddress) this.remoteAddr.get();
202   }
203
204   private void fireEvent(ChannelEventImpl event) {
205     for (Iterator JavaDoc i = listeners.iterator(); i.hasNext();) {
206       ((ChannelEventListener) i.next()).notifyChannelEvent(event);
207     }
208   }
209
210   class ChannelStatus {
211     private ChannelState state;
212
213     public ChannelStatus() {
214       this.state = ChannelState.CLOSED;
215     }
216
217     // this method non-public on purpose. Only the channel should change it's own status
218
synchronized void close() {
219       changeState(ChannelState.CLOSED);
220       fireEvent(new ChannelEventImpl(ChannelEventType.CHANNEL_CLOSED_EVENT, AbstractMessageChannel.this));
221     }
222
223     // this method non-public on purpose. Only the channel should change it's own status
224
synchronized void open() {
225       changeState(ChannelState.OPEN);
226       fireEvent(new ChannelEventImpl(ChannelEventType.CHANNEL_OPENED_EVENT, AbstractMessageChannel.this));
227     }
228
229     synchronized boolean isOpen() {
230       return ChannelState.OPEN.equals(state);
231     }
232
233     synchronized boolean isClosed() {
234       return ChannelState.CLOSED.equals(state);
235     }
236
237     private synchronized void changeState(ChannelState newState) {
238       state = newState;
239     }
240   }
241
242   private static class ChannelState {
243     private static final int STATE_OPEN = 1;
244     private static final int STATE_CLOSED = 2;
245
246     static final ChannelState OPEN = new ChannelState(STATE_OPEN);
247     static final ChannelState CLOSED = new ChannelState(STATE_CLOSED);
248
249     private final int state;
250
251     private ChannelState(int state) {
252       this.state = state;
253     }
254
255     public String JavaDoc toString() {
256       switch (state) {
257         case STATE_OPEN:
258           return "OPEN";
259         case STATE_CLOSED:
260           return "CLOSED";
261         default:
262           return "UNKNOWN";
263       }
264     }
265   }
266 }
Popular Tags