KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > sun > corba > se > impl > transport > SocketOrChannelConnectionImpl


1 /*
2  * @(#)SocketOrChannelConnectionImpl.java 1.92 07/08/23
3  *
4  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
5  * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
6  */

7
8 package com.sun.corba.se.impl.transport;
9
10 import java.io.IOException JavaDoc;
11 import java.net.InetSocketAddress JavaDoc;
12 import java.net.Socket JavaDoc;
13 import java.nio.ByteBuffer JavaDoc;
14 import java.nio.channels.SelectableChannel JavaDoc;
15 import java.nio.channels.SelectionKey JavaDoc;
16 import java.nio.channels.SocketChannel JavaDoc;
17 import java.security.AccessController JavaDoc;
18 import java.security.PrivilegedAction JavaDoc;
19 import java.util.Collections JavaDoc;
20 import java.util.Hashtable JavaDoc;
21 import java.util.HashMap JavaDoc;
22 import java.util.Map JavaDoc;
23
24 import org.omg.CORBA.COMM_FAILURE JavaDoc;
25 import org.omg.CORBA.CompletionStatus JavaDoc;
26 import org.omg.CORBA.DATA_CONVERSION JavaDoc;
27 import org.omg.CORBA.INTERNAL JavaDoc;
28 import org.omg.CORBA.MARSHAL JavaDoc;
29 import org.omg.CORBA.OBJECT_NOT_EXIST JavaDoc;
30 import org.omg.CORBA.SystemException JavaDoc;
31
32 import com.sun.org.omg.SendingContext.CodeBase;
33
34 import com.sun.corba.se.pept.broker.Broker;
35 import com.sun.corba.se.pept.encoding.InputObject;
36 import com.sun.corba.se.pept.encoding.OutputObject;
37 import com.sun.corba.se.pept.protocol.MessageMediator;
38 import com.sun.corba.se.pept.transport.Acceptor;
39 import com.sun.corba.se.pept.transport.Connection;
40 import com.sun.corba.se.pept.transport.ConnectionCache;
41 import com.sun.corba.se.pept.transport.ContactInfo;
42 import com.sun.corba.se.pept.transport.EventHandler;
43 import com.sun.corba.se.pept.transport.InboundConnectionCache;
44 import com.sun.corba.se.pept.transport.OutboundConnectionCache;
45 import com.sun.corba.se.pept.transport.ResponseWaitingRoom;
46 import com.sun.corba.se.pept.transport.Selector;
47
48 import com.sun.corba.se.spi.ior.IOR;
49 import com.sun.corba.se.spi.ior.iiop.GIOPVersion;
50 import com.sun.corba.se.spi.logging.CORBALogDomains;
51 import com.sun.corba.se.spi.orb.ORB ;
52 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
53 import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
54 import com.sun.corba.se.spi.orbutil.threadpool.Work;
55 import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
56 import com.sun.corba.se.spi.transport.CorbaContactInfo;
57 import com.sun.corba.se.spi.transport.CorbaConnection;
58 import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
59 import com.sun.corba.se.spi.transport.ReadTimeouts;
60
61 import com.sun.corba.se.impl.encoding.CachedCodeBase;
62 import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;
63 import com.sun.corba.se.impl.encoding.CDROutputObject;
64 import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;
65 import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;
66 import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;
67 import com.sun.corba.se.impl.logging.ORBUtilSystemException;
68 import com.sun.corba.se.impl.orbutil.ORBConstants;
69 import com.sun.corba.se.impl.orbutil.ORBUtility;
70 import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
71 import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
72 import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;
73
74 /**
75  * @author Harold Carr
76  */

77 public class SocketOrChannelConnectionImpl
78     extends
79     EventHandlerBase
80     implements
81         CorbaConnection,
82     Work
83 {
84     public static boolean dprintWriteLocks = false;
85
86     //
87
// New transport.
88
//
89

90     protected long enqueueTime;
91
92     protected SocketChannel JavaDoc socketChannel;
93     public SocketChannel JavaDoc getSocketChannel()
94     {
95     return socketChannel;
96     }
97
98     // REVISIT:
99
// protected for test: genericRPCMSGFramework.IIOPConnection constructor.
100
protected CorbaContactInfo contactInfo;
101     protected Acceptor acceptor;
102     protected ConnectionCache connectionCache;
103
104     //
105
// From iiop.Connection.java
106
//
107

108     protected Socket JavaDoc socket; // The socket used for this connection.
109
protected long timeStamp = 0;
110     protected boolean isServer = false;
111
112     // Start at some value other than zero since this is a magic
113
// value in some protocols.
114
protected int requestId = 5;
115     protected CorbaResponseWaitingRoom responseWaitingRoom;
116     protected int state;
117     protected java.lang.Object JavaDoc stateEvent = new java.lang.Object JavaDoc();
118     protected java.lang.Object JavaDoc writeEvent = new java.lang.Object JavaDoc();
119     protected boolean writeLocked;
120     protected int serverRequestCount = 0;
121     
122     // Server request map: used on the server side of Connection
123
// Maps request ID to IIOPInputStream.
124
Map JavaDoc serverRequestMap = null;
125
126     // This is a flag associated per connection telling us if the
127
// initial set of sending contexts were sent to the receiver
128
// already...
129
protected boolean postInitialContexts = false;
130  
131     // Remote reference to CodeBase server (supplies
132
// FullValueDescription, among other things)
133
protected IOR codeBaseServerIOR;
134
135     // CodeBase cache for this connection. This will cache remote operations,
136
// handle connecting, and ensure we don't do any remote operations until
137
// necessary.
138
protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this);
139
140     protected ORBUtilSystemException wrapper ;
141
142     // transport read timeout values
143
protected ReadTimeouts readTimeouts;
144
145     protected boolean shouldReadGiopHeaderOnly;
146
147     // A message mediator used when shouldReadGiopHeaderOnly is
148
// true to maintain request message state across execution in a
149
// SelectorThread and WorkerThread.
150
protected CorbaMessageMediator partialMessageMediator = null;
151
/**
 * Base constructor shared by all other constructors; also used in the
 * genericRPCMSGFramework test.
 *
 * Stores the ORB, obtains the RPC_TRANSPORT log wrapper, registers this
 * object as its own unit of work, creates the response waiting room and
 * installs the ORB's configured TCP read timeouts.
 *
 * @param orb the ORB this connection belongs to
 */
protected SocketOrChannelConnectionImpl(ORB orb)
{
    this.orb = orb;
    this.wrapper =
        ORBUtilSystemException.get(orb, CORBALogDomains.RPC_TRANSPORT);

    setWork(this);
    this.responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this);
    setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts());
}
163
/**
 * Common constructor for both clients and servers: performs the base
 * initialization and then records the threading options.
 *
 * @param orb                   the ORB this connection belongs to
 * @param useSelectThreadToWait passed to {@code setUseSelectThreadToWait}
 * @param useWorkerThread       passed to {@code setUseWorkerThreadForEvent}
 */
protected SocketOrChannelConnectionImpl(ORB orb,
                                        boolean useSelectThreadToWait,
                                        boolean useWorkerThread)
{
    this(orb);
    this.setUseSelectThreadToWait(useSelectThreadToWait);
    this.setUseWorkerThreadForEvent(useWorkerThread);
}
173
/**
 * Client constructor: opens a socket to {@code hostname:port} through the
 * ORB's socket factory and leaves the connection in the OPENING state.
 *
 * If the socket is channel-backed, the channel is put in non-blocking mode
 * exactly when the select thread is used to wait; otherwise the connection
 * is forced to not use the select thread.
 *
 * Any failure is reported as the {@code connectFailure} system exception.
 * A socket that was already opened when the failure happened is closed
 * before the exception is thrown, so a failed construction does not leak
 * the underlying descriptor.
 *
 * @param orb                   the ORB this connection belongs to
 * @param contactInfo           contact info this connection was created for
 * @param useSelectThreadToWait whether the select thread waits for reads
 * @param useWorkerThread       whether events are handled on a worker thread
 * @param socketType            socket type handed to the socket factory
 * @param hostname              host to connect to
 * @param port                  port to connect to
 */
public SocketOrChannelConnectionImpl(ORB orb,
                                     CorbaContactInfo contactInfo,
                                     boolean useSelectThreadToWait,
                                     boolean useWorkerThread,
                                     String socketType,
                                     String hostname,
                                     int port)
{
    this(orb, useSelectThreadToWait, useWorkerThread);

    this.contactInfo = contactInfo;

    try {
        socket = orb.getORBData().getSocketFactory()
            .createSocket(socketType,
                          new InetSocketAddress(hostname, port));
        socketChannel = socket.getChannel();

        if (socketChannel != null) {
            boolean isBlocking = !useSelectThreadToWait;
            socketChannel.configureBlocking(isBlocking);
        } else {
            // IMPORTANT: non-channel-backed sockets must use
            // dedicated reader threads.
            setUseSelectThreadToWait(false);
        }
        if (orb.transportDebugFlag) {
            dprint(".initialize: connection created: " + socket);
        }
    } catch (Throwable t) {
        // The constructor is about to fail, so nobody else can ever close
        // the socket: release it here. Cleanup problems are deliberately
        // ignored so they cannot mask the original failure.
        if (socket != null) {
            try {
                socket.close();
            } catch (Exception ignored) {
                // best effort only
            }
        }
        throw wrapper.connectFailure(t, socketType, hostname,
                                     Integer.toString(port));
    }
    state = OPENING;
}
210
/**
 * Client-side convenience constructor: same as the full client constructor,
 * with the threading options taken from the ORB's configuration data.
 *
 * @param orb         the ORB this connection belongs to
 * @param contactInfo contact info this connection was created for
 * @param socketType  socket type handed to the socket factory
 * @param hostname    host to connect to
 * @param port        port to connect to
 */
public SocketOrChannelConnectionImpl(ORB orb,
                                     CorbaContactInfo contactInfo,
                                     String socketType,
                                     String hostname,
                                     int port)
{
    this(orb,
         contactInfo,
         orb.getORBData().connectionSocketUseSelectThreadToWait(),
         orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
         socketType,
         hostname,
         port);
}
223
/**
 * Server-side constructor: wraps an already accepted socket and leaves the
 * connection in the ESTABLISHED state.
 *
 * If the socket is channel-backed, the channel is put in non-blocking mode
 * exactly when the select thread is used to wait.
 *
 * @param orb                   the ORB this connection belongs to
 * @param acceptor              the acceptor that accepted {@code socket}
 * @param socket                the accepted socket
 * @param useSelectThreadToWait whether the select thread waits for reads
 * @param useWorkerThread       whether events are handled on a worker thread
 * @throws RuntimeException if the channel's blocking mode cannot be
 *         configured; the {@code IOException} is attached as the cause
 */
public SocketOrChannelConnectionImpl(ORB orb,
                                     Acceptor acceptor,
                                     Socket socket,
                                     boolean useSelectThreadToWait,
                                     boolean useWorkerThread)
{
    this(orb, useSelectThreadToWait, useWorkerThread);

    this.socket = socket;
    socketChannel = socket.getChannel();
    if (socketChannel != null) {
        // REVISIT
        boolean isBlocking = !useSelectThreadToWait;
        try {
            socketChannel.configureBlocking(isBlocking);
        } catch (IOException e) {
            // Carry a message as well as the cause so the failure can be
            // diagnosed from the exception alone.
            throw new RuntimeException(
                "SocketOrChannelConnectionImpl: configureBlocking("
                + isBlocking + ") failed", e);
        }
    }
    this.acceptor = acceptor;

    serverRequestMap = Collections.synchronizedMap(new HashMap());
    isServer = true;

    state = ESTABLISHED;
}
253
/**
 * Server-side convenience constructor. The select-thread and worker-thread
 * options come from the ORB's configuration data, but both are forced to
 * {@code false} when the socket has no associated channel.
 *
 * @param orb      the ORB this connection belongs to
 * @param acceptor the acceptor that accepted {@code socket}
 * @param socket   the accepted socket
 */
public SocketOrChannelConnectionImpl(ORB orb,
                                     Acceptor acceptor,
                                     Socket socket)
{
    this(orb,
         acceptor,
         socket,
         socket.getChannel() != null
             && orb.getORBData().connectionSocketUseSelectThreadToWait(),
         socket.getChannel() != null
             && orb.getORBData().connectionSocketUseWorkerThreadForEvent());
}
267
268     ////////////////////////////////////////////////////
269
//
270
// framework.transport.Connection
271
//
272

273     public boolean shouldRegisterReadEvent()
274     {
275     return true;
276     }
277
278     public boolean shouldRegisterServerReadEvent()
279     {
280     return true;
281     }
282
283     public boolean read()
284     {
285     try {
286         if (orb.transportDebugFlag) {
287         dprint(".read->: " + this);
288         }
289         CorbaMessageMediator messageMediator = readBits();
290         if (messageMediator != null) {
291         // Null can happen when client closes stream
292
// causing purgecalls.
293
return dispatch(messageMediator);
294         }
295         return true;
296     } finally {
297         if (orb.transportDebugFlag) {
298         dprint(".read<-: " + this);
299         }
300     }
301     }
302
303     protected CorbaMessageMediator readBits()
304     {
305     try {
306
307         if (orb.transportDebugFlag) {
308         dprint(".readBits->: " + this);
309         }
310
311         MessageMediator messageMediator;
312         // REVISIT - use common factory base class.
313
if (contactInfo != null) {
314         messageMediator =
315             contactInfo.createMessageMediator(orb, this);
316         } else if (acceptor != null) {
317         messageMediator = acceptor.createMessageMediator(orb, this);
318         } else {
319         throw
320             new RuntimeException JavaDoc("SocketOrChannelConnectionImpl.readBits");
321         }
322         return (CorbaMessageMediator) messageMediator;
323
324     } catch (ThreadDeath JavaDoc td) {
325         if (orb.transportDebugFlag) {
326         dprint(".readBits: " + this + ": ThreadDeath: " + td, td);
327         }
328         try {
329         purgeCalls(wrapper.connectionAbort(td), false, false);
330         } catch (Throwable JavaDoc t) {
331         if (orb.transportDebugFlag) {
332             dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t);
333         }
334         }
335         throw td;
336     } catch (Throwable JavaDoc ex) {
337         if (orb.transportDebugFlag) {
338         dprint(".readBits: " + this + ": Throwable: " + ex, ex);
339         }
340
341         try {
342         if (ex instanceof INTERNAL JavaDoc) {
343             sendMessageError(GIOPVersion.DEFAULT_VERSION);
344         }
345         } catch (IOException JavaDoc e) {
346         if (orb.transportDebugFlag) {
347             dprint(".readBits: " + this +
348                ": sendMessageError: IOException: " + e, e);
349         }
350         }
351         // REVISIT - make sure reader thread is killed.
352
orb.getTransportManager().getSelector(0).unregisterForEvent(this);
353         // Notify anyone waiting.
354
purgeCalls(wrapper.connectionAbort(ex), true, false);
355         // REVISIT
356
//keepRunning = false;
357
// REVISIT - if this is called after purgeCalls then
358
// the state of the socket is ABORT so the writeLock
359
// in close throws an exception. It is ignored but
360
// causes IBM (screen scraping) tests to fail.
361
//close();
362
} finally {
363         if (orb.transportDebugFlag) {
364         dprint(".readBits<-: " + this);
365         }
366     }
367     return null;
368     }
369
370     protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator)
371     {
372     try {
373
374         if (orb.transportDebugFlag) {
375         dprint(".finishReadingBits->: " + this);
376         }
377
378         // REVISIT - use common factory base class.
379
if (contactInfo != null) {
380         messageMediator =
381             contactInfo.finishCreatingMessageMediator(orb, this, messageMediator);
382         } else if (acceptor != null) {
383         messageMediator =
384             acceptor.finishCreatingMessageMediator(orb, this, messageMediator);
385         } else {
386         throw
387             new RuntimeException JavaDoc("SocketOrChannelConnectionImpl.finishReadingBits");
388         }
389         return (CorbaMessageMediator) messageMediator;
390
391     } catch (ThreadDeath JavaDoc td) {
392         if (orb.transportDebugFlag) {
393         dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td);
394         }
395         try {
396         purgeCalls(wrapper.connectionAbort(td), false, false);
397         } catch (Throwable JavaDoc t) {
398         if (orb.transportDebugFlag) {
399             dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t);
400         }
401         }
402         throw td;
403     } catch (Throwable JavaDoc ex) {
404         if (orb.transportDebugFlag) {
405         dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex);
406         }
407
408         try {
409         if (ex instanceof INTERNAL JavaDoc) {
410             sendMessageError(GIOPVersion.DEFAULT_VERSION);
411         }
412         } catch (IOException JavaDoc e) {
413         if (orb.transportDebugFlag) {
414             dprint(".finishReadingBits: " + this +
415                ": sendMessageError: IOException: " + e, e);
416         }
417         }
418         // REVISIT - make sure reader thread is killed.
419
orb.getTransportManager().getSelector(0).unregisterForEvent(this);
420         // Notify anyone waiting.
421
purgeCalls(wrapper.connectionAbort(ex), true, false);
422         // REVISIT
423
//keepRunning = false;
424
// REVISIT - if this is called after purgeCalls then
425
// the state of the socket is ABORT so the writeLock
426
// in close throws an exception. It is ignored but
427
// causes IBM (screen scraping) tests to fail.
428
//close();
429
} finally {
430         if (orb.transportDebugFlag) {
431         dprint(".finishReadingBits<-: " + this);
432         }
433     }
434     return null;
435     }
436
437     protected boolean dispatch(CorbaMessageMediator messageMediator)
438     {
439     try {
440         if (orb.transportDebugFlag) {
441         dprint(".dispatch->: " + this);
442         }
443
444         //
445
// NOTE:
446
//
447
// This call is the transition from the tranport block
448
// to the protocol block.
449
//
450

451         boolean result =
452         messageMediator.getProtocolHandler()
453         .handleRequest(messageMediator);
454
455         return result;
456
457     } catch (ThreadDeath JavaDoc td) {
458         if (orb.transportDebugFlag) {
459         dprint(".dispatch: ThreadDeath", td );
460         }
461         try {
462         purgeCalls(wrapper.connectionAbort(td), false, false);
463         } catch (Throwable JavaDoc t) {
464         if (orb.transportDebugFlag) {
465             dprint(".dispatch: purgeCalls: Throwable", t);
466         }
467         }
468         throw td;
469     } catch (Throwable JavaDoc ex) {
470         if (orb.transportDebugFlag) {
471         dprint(".dispatch: Throwable", ex ) ;
472         }
473
474         try {
475         if (ex instanceof INTERNAL JavaDoc) {
476             sendMessageError(GIOPVersion.DEFAULT_VERSION);
477         }
478         } catch (IOException JavaDoc e) {
479         if (orb.transportDebugFlag) {
480             dprint(".dispatch: sendMessageError: IOException", e);
481         }
482         }
483         purgeCalls(wrapper.connectionAbort(ex), false, false);
484         // REVISIT
485
//keepRunning = false;
486
} finally {
487         if (orb.transportDebugFlag) {
488         dprint(".dispatch<-: " + this);
489         }
490     }
491
492     return true;
493     }
494
495     public boolean shouldUseDirectByteBuffers()
496     {
497     return getSocketChannel() != null;
498     }
499
500     public ByteBuffer JavaDoc read(int size, int offset, int length, long max_wait_time)
501     throws IOException JavaDoc
502     {
503     if (shouldUseDirectByteBuffers()) {
504     
505         ByteBuffer JavaDoc byteBuffer =
506         orb.getByteBufferPool().getByteBuffer(size);
507
508         if (orb.transportDebugFlag) {
509         // print address of ByteBuffer gotten from pool
510
int bbAddress = System.identityHashCode(byteBuffer);
511         StringBuffer JavaDoc sb = new StringBuffer JavaDoc(80);
512         sb.append(".read: got ByteBuffer id (");
513         sb.append(bbAddress).append(") from ByteBufferPool.");
514         String JavaDoc msgStr = sb.toString();
515         dprint(msgStr);
516         }
517         
518         byteBuffer.position(offset);
519         byteBuffer.limit(size);
520         
521         readFully(byteBuffer, length, max_wait_time);
522
523         return byteBuffer;
524     }
525
526     byte[] buf = new byte[size];
527     readFully(getSocket().getInputStream(), buf,
528           offset, length, max_wait_time);
529     ByteBuffer JavaDoc byteBuffer = ByteBuffer.wrap(buf);
530     byteBuffer.limit(size);
531     return byteBuffer;
532     }
533
534     public ByteBuffer JavaDoc read(ByteBuffer JavaDoc byteBuffer, int offset,
535                        int length, long max_wait_time)
536     throws IOException JavaDoc
537     {
538     int size = offset + length;
539     if (shouldUseDirectByteBuffers()) {
540
541         if (! byteBuffer.isDirect()) {
542         throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
543         }
544         if (size > byteBuffer.capacity()) {
545         if (orb.transportDebugFlag) {
546             // print address of ByteBuffer being released
547
int bbAddress = System.identityHashCode(byteBuffer);
548             StringBuffer JavaDoc bbsb = new StringBuffer JavaDoc(80);
549             bbsb.append(".read: releasing ByteBuffer id (")
550             .append(bbAddress).append(") to ByteBufferPool.");
551             String JavaDoc bbmsg = bbsb.toString();
552             dprint(bbmsg);
553         }
554         orb.getByteBufferPool().releaseByteBuffer(byteBuffer);
555         byteBuffer = orb.getByteBufferPool().getByteBuffer(size);
556         }
557         byteBuffer.position(offset);
558         byteBuffer.limit(size);
559         readFully(byteBuffer, length, max_wait_time);
560         byteBuffer.position(0);
561         byteBuffer.limit(size);
562         return byteBuffer;
563     }
564     if (byteBuffer.isDirect()) {
565         throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
566     }
567     byte[] buf = new byte[size];
568     readFully(getSocket().getInputStream(), buf,
569           offset, length, max_wait_time);
570     return ByteBuffer.wrap(buf);
571     }
572
573     public void readFully(ByteBuffer JavaDoc byteBuffer, int size, long max_wait_time)
574     throws IOException JavaDoc
575     {
576         int n = 0;
577     int bytecount = 0;
578     long time_to_wait = readTimeouts.get_initial_time_to_wait();
579     long total_time_in_wait = 0;
580
581     // The reading of data incorporates a strategy to detect a
582
// rogue client. The strategy is implemented as follows. As
583
// long as data is being read, at least 1 byte or more, we
584
// assume we have a well behaved client. If no data is read,
585
// then we sleep for a time to wait, re-calculate a new time to
586
// wait which is lengthier than the previous time spent waiting.
587
// Then, if the total time spent waiting does not exceed a
588
// maximum time we are willing to wait, we attempt another
589
// read. If the maximum amount of time we are willing to
590
// spend waiting for more data is exceeded, we throw an
591
// IOException.
592

593     // NOTE: Reading of GIOP headers are treated with a smaller
594
// maximum time to wait threshold. Based on extensive
595
// performance testing, all GIOP headers are being
596
// read in 1 read access.
597

598     do {
599         bytecount = getSocketChannel().read(byteBuffer);
600
601         if (bytecount < 0) {
602         throw new IOException JavaDoc("End-of-stream");
603         }
604         else if (bytecount == 0) {
605         try {
606             Thread.sleep(time_to_wait);
607             total_time_in_wait += time_to_wait;
608             time_to_wait =
609             (long)(time_to_wait*readTimeouts.get_backoff_factor());
610         }
611         catch (InterruptedException JavaDoc ie) {
612             // ignore exception
613
if (orb.transportDebugFlag) {
614             dprint("readFully(): unexpected exception "
615                 + ie.toString());
616             }
617         }
618         }
619         else {
620         n += bytecount;
621         }
622     }
623     while (n < size && total_time_in_wait < max_wait_time);
624
625     if (n < size && total_time_in_wait >= max_wait_time)
626     {
627         // failed to read entire message
628
throw wrapper.transportReadTimeoutExceeded(new Integer JavaDoc(size),
629                                       new Integer JavaDoc(n), new Long JavaDoc(max_wait_time),
630                       new Long JavaDoc(total_time_in_wait));
631     }
632
633     getConnectionCache().stampTime(this);
634     }
635
636     // To support non-channel connections.
637
public void readFully(java.io.InputStream JavaDoc is, byte[] buf,
638               int offset, int size, long max_wait_time)
639     throws IOException JavaDoc
640     {
641         int n = 0;
642     int bytecount = 0;
643     long time_to_wait = readTimeouts.get_initial_time_to_wait();
644     long total_time_in_wait = 0;
645
646     // The reading of data incorporates a strategy to detect a
647
// rogue client. The strategy is implemented as follows. As
648
// long as data is being read, at least 1 byte or more, we
649
// assume we have a well behaved client. If no data is read,
650
// then we sleep for a time to wait, re-calculate a new time to
651
// wait which is lengthier than the previous time spent waiting.
652
// Then, if the total time spent waiting does not exceed a
653
// maximum time we are willing to wait, we attempt another
654
// read. If the maximum amount of time we are willing to
655
// spend waiting for more data is exceeded, we throw an
656
// IOException.
657

658     // NOTE: Reading of GIOP headers are treated with a smaller
659
// maximum time to wait threshold. Based on extensive
660
// performance testing, all GIOP headers are being
661
// read in 1 read access.
662

663     do {
664         bytecount = is.read(buf, offset + n, size - n);
665         if (bytecount < 0) {
666         throw new IOException JavaDoc("End-of-stream");
667         }
668         else if (bytecount == 0) {
669         try {
670             Thread.sleep(time_to_wait);
671             total_time_in_wait += time_to_wait;
672             time_to_wait =
673             (long)(time_to_wait*readTimeouts.get_backoff_factor());
674         }
675         catch (InterruptedException JavaDoc ie) {
676             // ignore exception
677
if (orb.transportDebugFlag) {
678             dprint("readFully(): unexpected exception "
679                 + ie.toString());
680             }
681         }
682         }
683         else {
684         n += bytecount;
685         }
686     }
687     while (n < size && total_time_in_wait < max_wait_time);
688
689     if (n < size && total_time_in_wait >= max_wait_time)
690     {
691         // failed to read entire message
692
throw wrapper.transportReadTimeoutExceeded(new Integer JavaDoc(size),
693                                       new Integer JavaDoc(n), new Long JavaDoc(max_wait_time),
694                       new Long JavaDoc(total_time_in_wait));
695     }
696
697     getConnectionCache().stampTime(this);
698     }
699
700     public void write(ByteBuffer JavaDoc byteBuffer)
701     throws IOException JavaDoc
702     {
703     if (shouldUseDirectByteBuffers()) {
704         /* NOTE: cannot perform this test. If one ask for a
705            ByteBuffer from the pool which is bigger than the size
706            of ByteBuffers managed by the pool, then the pool will
707            return a HeapByteBuffer.
708         if (byteBuffer.hasArray()) {
709         throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket();
710         }
711         */

712             // IMPORTANT: For non-blocking SocketChannels, there's no guarantee
713
// all bytes are written on first write attempt.
714
do {
715                 getSocketChannel().write(byteBuffer);
716             }
717             while (byteBuffer.hasRemaining());
718
719     } else {
720         if (! byteBuffer.hasArray()) {
721         throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket();
722         }
723         byte[] tmpBuf = byteBuffer.array();
724         getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit());
725         getSocket().getOutputStream().flush();
726     }
727     
728     // TimeStamp connection to indicate it has been used
729
// Note granularity of connection usage is assumed for
730
// now to be that of a IIOP packet.
731
getConnectionCache().stampTime(this);
732     }
733
734     /**
735      * Note:it is possible for this to be called more than once
736      */

737     public synchronized void close()
738     {
739     try {
740         if (orb.transportDebugFlag) {
741         dprint(".close->: " + this);
742         }
743         writeLock();
744
745         // REVISIT It will be good to have a read lock on the reader thread
746
// before we proceed further, to avoid the reader thread (server side)
747
// from processing requests. This avoids the risk that a new request
748
// will be accepted by ReaderThread while the ListenerThread is
749
// attempting to close this connection.
750

751         if (isBusy()) { // we are busy!
752
writeUnlock();
753         if (orb.transportDebugFlag) {
754             dprint(".close: isBusy so no close: " + this);
755         }
756         return;
757         }
758
759         try {
760         try {
761             sendCloseConnection(GIOPVersion.V1_0);
762         } catch (Throwable JavaDoc t) {
763             wrapper.exceptionWhenSendingCloseConnection(t);
764         }
765
766         synchronized ( stateEvent ){
767             state = CLOSE_SENT;
768             stateEvent.notifyAll();
769         }
770
771         // stop the reader without causing it to do purgeCalls
772
//Exception ex = new Exception();
773
//reader.stop(ex); // REVISIT
774

775         // NOTE: !!!!!!
776
// This does writeUnlock().
777
purgeCalls(wrapper.connectionRebind(), false, true);
778
779         } catch (Exception JavaDoc ex) {
780         if (orb.transportDebugFlag) {
781             dprint(".close: exception: " + this, ex);
782         }
783         }
784         try {
785         Selector selector = orb.getTransportManager().getSelector(0);
786         selector.unregisterForEvent(this);
787         if (socketChannel != null) {
788             socketChannel.close();
789         }
790         socket.close();
791         } catch (IOException JavaDoc e) {
792         if (orb.transportDebugFlag) {
793             dprint(".close: " + this, e);
794         }
795         }
796     } finally {
797         if (orb.transportDebugFlag) {
798         dprint(".close<-: " + this);
799         }
800     }
801     }
802
803     public Acceptor getAcceptor()
804     {
805     return acceptor;
806     }
807
808     public ContactInfo getContactInfo()
809     {
810     return contactInfo;
811     }
812
813     public EventHandler getEventHandler()
814     {
815     return this;
816     }
817
818     public OutputObject createOutputObject(MessageMediator messageMediator)
819     {
820     // REVISIT - remove this method from Connection and all it subclasses.
821
throw new RuntimeException JavaDoc("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called.");
822     }
823
824     // This is used by the GIOPOutputObject in order to
825
// throw the correct error when handling code sets.
826
// Can we determine if we are on the server side by
827
// other means? XREVISIT
828
public boolean isServer()
829     {
830         return isServer;
831     }
832
833     public boolean isBusy()
834     {
835         if (serverRequestCount > 0 ||
836         getResponseWaitingRoom().numberRegistered() > 0)
837         {
838             return true;
839     } else {
840             return false;
841     }
842     }
843
844     public long getTimeStamp()
845     {
846     return timeStamp;
847     }
848
849     public void setTimeStamp(long time)
850     {
851     timeStamp = time;
852     }
853
854     public void setState(String JavaDoc stateString)
855     {
856     synchronized (stateEvent) {
857         if (stateString.equals("ESTABLISHED")) {
858         state = ESTABLISHED;
859         stateEvent.notifyAll();
860         } else {
861         // REVISIT: ASSERT
862
}
863     }
864     }
865
866     /**
867      * Sets the writeLock for this connection.
868      * If the writeLock is already set by someone else, block till the
869      * writeLock is released and can set by us.
870      * IMPORTANT: this connection's lock must be acquired before
871      * setting the writeLock and must be unlocked after setting the writeLock.
872      */

873     public void writeLock()
874     {
875       try {
876         if (dprintWriteLocks && orb.transportDebugFlag) {
877         dprint(".writeLock->: " + this);
878     }
879         // Keep looping till we can set the writeLock.
880
while ( true ) {
881         int localState = state;
882         switch ( localState ) {
883         
884         case OPENING:
885         synchronized (stateEvent) {
886             if (state != OPENING) {
887             // somebody has changed 'state' so be careful
888
break;
889             }
890             try {
891             stateEvent.wait();
892             } catch (InterruptedException JavaDoc ie) {
893             if (orb.transportDebugFlag) {
894                 dprint(".writeLock: OPENING InterruptedException: " + this);
895             }
896             }
897         }
898         // Loop back
899
break;
900         
901         case ESTABLISHED:
902         synchronized (writeEvent) {
903             if (!writeLocked) {
904             writeLocked = true;
905             return;
906             }
907         
908             try {
909             // do not stay here too long if state != ESTABLISHED
910
// Bug 4752117
911
while (state == ESTABLISHED && writeLocked) {
912                 writeEvent.wait(100);
913             }
914             } catch (InterruptedException JavaDoc ie) {
915             if (orb.transportDebugFlag) {
916                 dprint(".writeLock: ESTABLISHED InterruptedException: " + this);
917             }
918             }
919         }
920         // Loop back
921
break;
922         
923         //
924
// XXX
925
// Need to distinguish between client and server roles
926
// here probably.
927
//
928
case ABORT:
929         synchronized ( stateEvent ){
930                     if (state != ABORT) {
931                         break;
932                     }
933             throw wrapper.writeErrorSend() ;
934         }
935          
936         case CLOSE_RECVD:
937         // the connection has been closed or closing
938
// ==> throw rebind exception
939
synchronized ( stateEvent ){
940                     if (state != CLOSE_RECVD) {
941                         break;
942                     }
943             throw wrapper.connectionCloseRebind() ;
944         }
945         
946         default:
947         if (orb.transportDebugFlag) {
948             dprint(".writeLock: default: " + this);
949         }
950         // REVISIT
951
throw new RuntimeException JavaDoc(".writeLock: bad state");
952         }
953         }
954       } finally {
955         if (dprintWriteLocks && orb.transportDebugFlag) {
956         dprint(".writeLock<-: " + this);
957     }
958       }
959     }
960
961     public void writeUnlock()
962     {
963     try {
964         if (dprintWriteLocks && orb.transportDebugFlag) {
965         dprint(".writeUnlock->: " + this);
966         }
967         synchronized (writeEvent) {
968         writeLocked = false;
969         writeEvent.notify(); // wake up one guy waiting to write
970
}
971     } finally {
972         if (dprintWriteLocks && orb.transportDebugFlag) {
973         dprint(".writeUnlock<-: " + this);
974         }
975     }
976     }
977
978     // Assumes the caller handles writeLock and writeUnlock
979
public void sendWithoutLock(OutputObject outputObject)
980     {
981         // Don't we need to check for CloseConnection
982
// here? REVISIT
983

984         // XREVISIT - Shouldn't the MessageMediator
985
// be the one to handle writing the data here?
986

987         try {
988
989             // Write the fragment/message
990

991         CDROutputObject cdrOutputObject = (CDROutputObject) outputObject;
992         cdrOutputObject.writeTo(this);
993         // REVISIT - no flush?
994
//socket.getOutputStream().flush();
995

996         } catch (IOException JavaDoc e1) {
997
998             /*
999              * ADDED(Ram J) 10/13/2000 In the event of an IOException, try
1000             * sending a CancelRequest for regular requests / locate requests
1001             */

1002
1003            // Since IIOPOutputStream's msgheader is set only once, and not
1004
// altered during sending multiple fragments, the original
1005
// msgheader will always have the requestId.
1006
// REVISIT This could be optimized to send a CancelRequest only
1007
// if any fragments had been sent already.
1008

1009        /* REVISIT: MOVE TO SUBCONTRACT
1010            Message msg = os.getMessage();
1011            if (msg.getType() == Message.GIOPRequest ||
1012                    msg.getType() == Message.GIOPLocateRequest) {
1013                GIOPVersion requestVersion = msg.getGIOPVersion();
1014                int requestId = MessageBase.getRequestId(msg);
1015                try {
1016                    sendCancelRequest(requestVersion, requestId);
1017                } catch (IOException e2) {
1018                    // most likely an abortive connection closure.
1019                    // ignore, since nothing more can be done.
1020            if (orb.transportDebugFlag) {
1021            
1022                }
1023            }
1024        */

1025
1026            // REVISIT When a send failure happens, purgeCalls() need to be
1027
// called to ensure that the connection is properly removed from
1028
// further usage (ie., cancelling pending requests with COMM_FAILURE
1029
// with an appropriate minor_code CompletionStatus.MAY_BE).
1030

1031            // Relying on the IIOPOutputStream (as noted below) is not
1032
// sufficient as it handles COMM_FAILURE only for the final
1033
// fragment (during invoke processing). Note that COMM_FAILURE could
1034
// happen while sending the initial fragments.
1035
// Also the IIOPOutputStream does not properly close the connection.
1036
// It simply removes the connection from the table. An orderly
1037
// closure is needed (ie., cancel pending requests on the connection
1038
// COMM_FAILURE as well.
1039

1040            // IIOPOutputStream will cleanup the connection info when it
1041
// sees this exception.
1042
SystemException JavaDoc exc = wrapper.writeErrorSend(e1);
1043        purgeCalls(exc, false, true);
1044        throw exc;
1045        }
1046    }
1047
1048    public void registerWaiter(MessageMediator messageMediator)
1049    {
1050        responseWaitingRoom.registerWaiter(messageMediator);
1051    }
1052
1053    public void unregisterWaiter(MessageMediator messageMediator)
1054    {
1055        responseWaitingRoom.unregisterWaiter(messageMediator);
1056    }
1057
1058    public InputObject waitForResponse(MessageMediator messageMediator)
1059    {
1060    return responseWaitingRoom.waitForResponse(messageMediator);
1061    }
1062
1063    public void setConnectionCache(ConnectionCache connectionCache)
1064    {
1065    this.connectionCache = connectionCache;
1066    }
1067
1068    public ConnectionCache getConnectionCache()
1069    {
1070    return connectionCache;
1071    }
1072
1073    ////////////////////////////////////////////////////
1074
//
1075
// EventHandler methods
1076
//
1077

1078    public void setUseSelectThreadToWait(boolean x)
1079    {
1080    useSelectThreadToWait = x;
1081    // REVISIT - Reading of a GIOP header only is information
1082
// that should be passed into the constructor
1083
// from the SocketOrChannelConnection factory.
1084
setReadGiopHeaderOnly(shouldUseSelectThreadToWait());
1085    }
1086
1087    public void handleEvent()
1088    {
1089    if (orb.transportDebugFlag) {
1090        dprint(".handleEvent->: " + this);
1091    }
1092    getSelectionKey().interestOps(getSelectionKey().interestOps() &
1093                      (~ getInterestOps()));
1094
1095    if (shouldUseWorkerThreadForEvent()) {
1096        Throwable JavaDoc throwable = null;
1097        try {
1098        int poolToUse = 0;
1099        if (shouldReadGiopHeaderOnly()) {
1100            partialMessageMediator = readBits();
1101            poolToUse =
1102            partialMessageMediator.getThreadPoolToUse();
1103        }
1104
1105        if (orb.transportDebugFlag) {
1106            dprint(".handleEvent: addWork to pool: " + poolToUse);
1107        }
1108        orb.getThreadPoolManager().getThreadPool(poolToUse)
1109            .getWorkQueue(0).addWork(getWork());
1110        } catch (NoSuchThreadPoolException e) {
1111        throwable = e;
1112        } catch (NoSuchWorkQueueException e) {
1113        throwable = e;
1114        }
1115        // REVISIT: need to close connection.
1116
if (throwable != null) {
1117        if (orb.transportDebugFlag) {
1118            dprint(".handleEvent: " + throwable);
1119        }
1120        INTERNAL JavaDoc i = new INTERNAL JavaDoc("NoSuchThreadPoolException");
1121        i.initCause(throwable);
1122        throw i;
1123        }
1124    } else {
1125        if (orb.transportDebugFlag) {
1126        dprint(".handleEvent: doWork");
1127        }
1128        getWork().doWork();
1129    }
1130    if (orb.transportDebugFlag) {
1131        dprint(".handleEvent<-: " + this);
1132    }
1133    }
1134
1135    public SelectableChannel JavaDoc getChannel()
1136    {
1137    return socketChannel;
1138    }
1139
1140    public int getInterestOps()
1141    {
1142    return SelectionKey.OP_READ;
1143    }
1144
1145    // public Acceptor getAcceptor() - already defined above.
1146

1147    public Connection getConnection()
1148    {
1149    return this;
1150    }
1151
1152    ////////////////////////////////////////////////////
1153
//
1154
// Work methods.
1155
//
1156

1157    public String JavaDoc getName()
1158    {
1159    return this.toString();
1160    }
1161
1162    public void doWork()
1163    {
1164    try {
1165        if (orb.transportDebugFlag) {
1166        dprint(".doWork->: " + this);
1167        }
1168
1169        // IMPORTANT: Sanity checks on SelectionKeys such as
1170
// SelectorKey.isValid() should not be done
1171
// here.
1172
//
1173

1174            if (!shouldReadGiopHeaderOnly()) {
1175                read();
1176            }
1177        else {
1178            // get the partialMessageMediator
1179
// created by SelectorThread
1180
CorbaMessageMediator messageMediator =
1181                                     this.getPartialMessageMediator();
1182
1183            // read remaining info needed in a MessageMediator
1184
messageMediator = finishReadingBits(messageMediator);
1185
1186            if (messageMediator != null) {
1187            // Null can happen when client closes stream
1188
// causing purgecalls.
1189
dispatch(messageMediator);
1190            }
1191            }
1192    } catch (Throwable JavaDoc t) {
1193        if (orb.transportDebugFlag) {
1194        dprint(".doWork: ignoring Throwable: "
1195               + t
1196               + " " + this);
1197        }
1198    } finally {
1199        if (orb.transportDebugFlag) {
1200        dprint(".doWork<-: " + this);
1201        }
1202    }
1203    }
1204
1205    public void setEnqueueTime(long timeInMillis)
1206    {
1207    enqueueTime = timeInMillis;
1208    }
1209
1210    public long getEnqueueTime()
1211    {
1212    return enqueueTime;
1213    }
1214
1215    ////////////////////////////////////////////////////
1216
//
1217
// spi.transport.CorbaConnection.
1218
//
1219

1220    // IMPORTANT: Reader Threads must NOT read Giop header only.
1221
public boolean shouldReadGiopHeaderOnly() {
1222    return shouldReadGiopHeaderOnly;
1223    }
1224
1225    protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) {
1226    shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
1227    }
1228
1229    public ResponseWaitingRoom getResponseWaitingRoom()
1230    {
1231    return responseWaitingRoom;
1232    }
1233
1234    // REVISIT - interface defines isServer but already defined in
1235
// higher interface.
1236

1237    public void serverRequestMapPut(int requestId,
1238                    CorbaMessageMediator messageMediator)
1239    {
1240    serverRequestMap.put(new Integer JavaDoc(requestId), messageMediator);
1241    }
1242
1243    public CorbaMessageMediator serverRequestMapGet(int requestId)
1244    {
1245    return (CorbaMessageMediator)
1246        serverRequestMap.get(new Integer JavaDoc(requestId));
1247    }
1248
1249    public void serverRequestMapRemove(int requestId)
1250    {
1251    serverRequestMap.remove(new Integer JavaDoc(requestId));
1252    }
1253
1254
1255    // REVISIT: this is also defined in:
1256
// com.sun.corba.se.spi.legacy.connection.Connection
1257
public java.net.Socket JavaDoc getSocket()
1258    {
1259    return socket;
1260    }
1261
1262    /** It is possible for a Close Connection to have been
1263     ** sent here, but we will not check for this. A "lazy"
1264     ** Exception will be thrown in the Worker thread after the
1265     ** incoming request has been processed even though the connection
1266     ** is closed before the request is processed. This is o.k because
1267     ** it is a boundary condition. To prevent it we would have to add
1268     ** more locks which would reduce performance in the normal case.
1269     **/

1270    public synchronized void serverRequestProcessingBegins()
1271    {
1272        serverRequestCount++;
1273    }
1274
1275    public synchronized void serverRequestProcessingEnds()
1276    {
1277        serverRequestCount--;
1278    }
1279
1280    //
1281
//
1282
//
1283

1284    public synchronized int getNextRequestId()
1285    {
1286    return requestId++;
1287    }
1288
1289    // Negotiated code sets for char and wchar data
1290
protected CodeSetComponentInfo.CodeSetContext codeSetContext = null;
1291
1292    public ORB getBroker()
1293    {
1294        return orb;
1295    }
1296
1297    public CodeSetComponentInfo.CodeSetContext getCodeSetContext() {
1298        // Needs to be synchronized for the following case when the client
1299
// doesn't send the code set context twice, and we have two threads
1300
// in ServerRequestDispatcher processCodeSetContext.
1301
//
1302
// Thread A checks to see if there is a context, there is none, so
1303
// it calls setCodeSetContext, getting the synch lock.
1304
// Thread B checks to see if there is a context. If we didn't synch,
1305
// it might decide to outlaw wchar/wstring.
1306
if (codeSetContext == null) {
1307            synchronized(this) {
1308                return codeSetContext;
1309            }
1310        }
1311
1312        return codeSetContext;
1313    }
1314
1315    public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) {
1316        // Double check whether or not we need to do this
1317
if (codeSetContext == null) {
1318            
1319            if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null ||
1320                OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) {
1321                // If the client says it's negotiated a code set that
1322
// isn't a fallback and we never said we support, then
1323
// it has a bug.
1324
throw wrapper.badCodesetsFromClient() ;
1325            }
1326
1327            codeSetContext = csc;
1328        }
1329    }
1330
1331    //
1332
// from iiop.IIOPConnection.java
1333
//
1334

1335    // Map request ID to an InputObject.
1336
// This is so the client thread can start unmarshaling
1337
// the reply and remove it from the out_calls map while the
1338
// ReaderThread can still obtain the input stream to give
1339
// new fragments. Only the ReaderThread touches the clientReplyMap,
1340
// so it doesn't incur synchronization overhead.
1341

1342    public MessageMediator clientRequestMapGet(int requestId)
1343    {
1344    return responseWaitingRoom.getMessageMediator(requestId);
1345    }
1346
1347    protected MessageMediator clientReply_1_1;
1348
1349    public void clientReply_1_1_Put(MessageMediator x)
1350    {
1351    clientReply_1_1 = x;
1352    }
1353
1354    public MessageMediator clientReply_1_1_Get()
1355    {
1356    return clientReply_1_1;
1357    }
1358
1359    public void clientReply_1_1_Remove()
1360    {
1361    clientReply_1_1 = null;
1362    }
1363
1364    protected MessageMediator serverRequest_1_1;
1365
1366    public void serverRequest_1_1_Put(MessageMediator x)
1367    {
1368    serverRequest_1_1 = x;
1369    }
1370
1371    public MessageMediator serverRequest_1_1_Get()
1372    {
1373    return serverRequest_1_1;
1374    }
1375
1376    public void serverRequest_1_1_Remove()
1377    {
1378    serverRequest_1_1 = null;
1379    }
1380
1381    protected String JavaDoc getStateString( int state )
1382    {
1383        synchronized ( stateEvent ){
1384            switch (state) {
1385            case OPENING : return "OPENING" ;
1386            case ESTABLISHED : return "ESTABLISHED" ;
1387            case CLOSE_SENT : return "CLOSE_SENT" ;
1388            case CLOSE_RECVD : return "CLOSE_RECVD" ;
1389            case ABORT : return "ABORT" ;
1390            default : return "???" ;
1391            }
1392        }
1393    }
1394    
1395    public synchronized boolean isPostInitialContexts() {
1396        return postInitialContexts;
1397    }
1398
1399    // Can never be unset...
1400
public synchronized void setPostInitialContexts(){
1401        postInitialContexts = true;
1402    }
1403    
1404    /**
1405     * Wake up the outstanding requests on the connection, and hand them
1406     * COMM_FAILURE exception with a given minor code.
1407     *
1408     * Also, delete connection from connection table and
1409     * stop the reader thread.
1410
1411     * Note that this should only ever be called by the Reader thread for
1412     * this connection.
1413     *
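1414     * @param systemException The exception handed to the waiting requests; its minor code determines the state the connection is left in.
1415     * @param die Kill the reader thread (this thread) before exiting.
     * @param lockHeld True if the caller already holds the writeLock, in which case it is not acquired again here.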
1416     */

1417    public void purgeCalls(SystemException JavaDoc systemException,
1418               boolean die, boolean lockHeld)
1419    {
1420    int minor_code = systemException.minor;
1421
1422    try{
1423        if (orb.transportDebugFlag) {
1424        dprint(".purgeCalls->: "
1425               + minor_code + "/" + die + "/" + lockHeld
1426               + " " + this);
1427        }
1428
1429        // If this invocation is a result of ThreadDeath caused
1430
// by a previous execution of this routine, just exit.
1431

1432        synchronized ( stateEvent ){
1433        if ((state == ABORT) || (state == CLOSE_RECVD)) {
1434            if (orb.transportDebugFlag) {
1435            dprint(".purgeCalls: exiting since state is: "
1436                   + getStateString(state)
1437                   + " " + this);
1438            }
1439            return;
1440        }
1441        }
1442
1443        // Grab the writeLock (freeze the calls)
1444
try {
1445        if (!lockHeld) {
1446            writeLock();
1447        }
1448        } catch (SystemException JavaDoc ex) {
1449        if (orb.transportDebugFlag)
1450            dprint(".purgeCalls: SystemException" + ex
1451               + "; continuing " + this);
1452        }
1453
1454        // Mark the state of the connection
1455
// and determine the request status
1456
org.omg.CORBA.CompletionStatus JavaDoc completion_status;
1457        synchronized ( stateEvent ){
1458        if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) {
1459            state = CLOSE_RECVD;
1460            systemException.completed = CompletionStatus.COMPLETED_NO;
1461        } else {
1462            state = ABORT;
1463            systemException.completed = CompletionStatus.COMPLETED_MAYBE;
1464        }
1465        stateEvent.notifyAll();
1466        }
1467    
1468        try {
1469        socket.getInputStream().close();
1470        socket.getOutputStream().close();
1471        socket.close();
1472        } catch (Exception JavaDoc ex) {
1473        if (orb.transportDebugFlag) {
1474            dprint(".purgeCalls: Exception closing socket: " + ex
1475               + " " + this);
1476        }
1477        }
1478
1479        // Signal all threads with outstanding requests on this
1480
// connection and give them the SystemException;
1481

1482        responseWaitingRoom.signalExceptionToAllWaiters(systemException);
1483
1484        if (contactInfo != null) {
1485        ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo);
1486        } else if (acceptor != null) {
1487        ((InboundConnectionCache)getConnectionCache()).remove(this);
1488        }
1489
1490        //
1491
// REVISIT: Stop the reader thread
1492
//
1493

1494        // Signal all the waiters of the writeLock.
1495
// There are 4 types of writeLock waiters:
1496
// 1. Send waiters:
1497
// 2. SendReply waiters:
1498
// 3. cleanUp waiters:
1499
// 4. purge_call waiters:
1500
//
1501

1502        writeUnlock();
1503
1504    } finally {
1505        if (orb.transportDebugFlag) {
1506        dprint(".purgeCalls<-: "
1507               + minor_code + "/" + die + "/" + lockHeld
1508               + " " + this);
1509        }
1510    }
1511    }
1512
1513    /*************************************************************************
1514    * The following methods are for dealing with Connection cleaning for
1515    * better scalability of servers in high network load conditions.
1516    **************************************************************************/

1517
1518    public void sendCloseConnection(GIOPVersion giopVersion)
1519    throws IOException JavaDoc
1520    {
1521        Message msg = MessageBase.createCloseConnection(giopVersion);
1522    sendHelper(giopVersion, msg);
1523    }
1524
1525    public void sendMessageError(GIOPVersion giopVersion)
1526    throws IOException JavaDoc
1527    {
1528        Message msg = MessageBase.createMessageError(giopVersion);
1529    sendHelper(giopVersion, msg);
1530    }
1531
1532    /**
1533     * Send a CancelRequest message. This does not lock the connection, so the
1534     * caller needs to ensure this method is called appropriately.
1535     * @exception IOException - could be due to abortive connection closure.
1536     */

1537    public void sendCancelRequest(GIOPVersion giopVersion, int requestId)
1538    throws IOException JavaDoc
1539    {
1540
1541        Message msg = MessageBase.createCancelRequest(giopVersion, requestId);
1542    sendHelper(giopVersion, msg);
1543    }
1544
1545    protected void sendHelper(GIOPVersion giopVersion, Message msg)
1546    throws IOException JavaDoc
1547    {
1548    // REVISIT: See comments in CDROutputObject constructor.
1549
CDROutputObject outputObject =
1550        new CDROutputObject((ORB)orb, null, giopVersion, this, msg,
1551                ORBConstants.STREAM_FORMAT_VERSION_1);
1552        msg.write(outputObject);
1553
1554    outputObject.writeTo(this);
1555    }
1556
1557    public void sendCancelRequestWithLock(GIOPVersion giopVersion,
1558                      int requestId)
1559    throws IOException JavaDoc
1560    {
1561    writeLock();
1562    try {
1563        sendCancelRequest(giopVersion, requestId);
1564    } finally {
1565        writeUnlock();
1566    }
1567    }
1568
1569    // Begin Code Base methods ---------------------------------------
1570
//
1571
// Set this connection's code base IOR. The IOR comes from the
1572
// SendingContext. This is an optional service context, but all
1573
// JavaSoft ORBs send it.
1574
//
1575
// The set and get methods don't need to be synchronized since the
1576
// first possible get would occur during reading a valuetype, and
1577
// that would be after the set.
1578

1579    // Sets this connection's code base IOR. This is done after
1580
// getting the IOR out of the SendingContext service context.
1581
// Our ORBs always send this, but it's optional in CORBA.
1582

1583    public final void setCodeBaseIOR(IOR ior) {
1584        codeBaseServerIOR = ior;
1585    }
1586
1587    public final IOR getCodeBaseIOR() {
1588        return codeBaseServerIOR;
1589    }
1590
1591    // Get a CodeBase stub to use in unmarshaling. The CachedCodeBase
1592
// won't connect to the remote codebase unless it's necessary.
1593
public final CodeBase getCodeBase() {
1594        return cachedCodeBase;
1595    }
1596
1597    // End Code Base methods -----------------------------------------
1598

1599    // set transport read thresholds
1600
protected void setReadTimeouts(ReadTimeouts readTimeouts) {
1601    this.readTimeouts = readTimeouts;
1602    }
1603
1604    protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
1605        partialMessageMediator = messageMediator;
1606    }
1607
1608    protected CorbaMessageMediator getPartialMessageMediator() {
1609    return partialMessageMediator;
1610    }
1611
1612    public String JavaDoc toString()
1613    {
1614        synchronized ( stateEvent ){
1615            return
1616        "SocketOrChannelConnectionImpl[" + " "
1617        + (socketChannel == null ?
1618           socket.toString() : socketChannel.toString()) + " "
1619        + getStateString( state ) + " "
1620        + shouldUseSelectThreadToWait() + " "
1621        + shouldUseWorkerThreadForEvent() + " "
1622        + shouldReadGiopHeaderOnly()
1623        + "]" ;
1624        }
1625    }
1626    
1627    // Must be public - used in encoding.
1628
public void dprint(String JavaDoc msg)
1629    {
1630    ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
1631    }
1632
1633    protected void dprint(String JavaDoc msg, Throwable JavaDoc t)
1634    {
1635    dprint(msg);
1636    t.printStackTrace(System.out);
1637    }
1638}
1639
1640// End of file.
1641
Popular Tags