KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > HTTPClient > StreamDemultiplexor


/*
 * @(#)StreamDemultiplexor.java 0.3-2 18/06/1999
 *
 * This file is part of the HTTPClient package
 * Copyright (C) 1996-1999 Ronald Tschalär
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free
 * Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
 * MA 02111-1307, USA
 *
 * For questions, suggestions, bug-reports, enhancement-requests etc.
 * I may be contacted at:
 *
 * ronald@innovation.ch
 *
 */

28
29 package HTTPClient;
30
31
32 import java.io.*;
33 import java.net.Socket JavaDoc;
34 import java.util.Vector JavaDoc;
35 import java.util.Enumeration JavaDoc;
36
/**
 * This class handles the demultiplexing of an input stream. This is needed
 * for things like keep-alive in HTTP/1.0, persist in HTTP/1.1 and in HTTP-NG.
 *
 * @version 0.3-2 18/06/1999
 * @author Ronald Tschalär
 */

44
45 class StreamDemultiplexor implements GlobalConstants
46 {
47     /** the protocol were handling request for */
48     private int Protocol;
49
50     /** the connection we're working for */
51     private HTTPConnection Connection;
52
53     /** the input stream to demultiplex */
54     private ExtBufferedInputStream Stream;
55
56     /** the socket this hangs off */
57     private Socket JavaDoc Sock = null;
58
59     /** signals after the closing of which stream to close the socket */
60     private ResponseHandler MarkedForClose;
61
62     /** timer used to close the socket if unused for a given time */
63     private SocketTimeout.TimeoutEntry Timer = null;
64
65     /** timer thread which implements the timers */
66     private static SocketTimeout TimerThread = null;
67
68     /** a Vector to hold the list of response handlers were serving */
69     private LinkedList RespHandlerList;
70
71     /** number of unread bytes in current chunk (if transf-enc == chunked) */
72     private int chunk_len;
73
74     /** the currently set timeout for the socket */
75     private int cur_timeout = 0;
76
77
78     static
79     {
80     TimerThread = new SocketTimeout(60);
81     TimerThread.start();
82     }
83
84
85     // Constructors
86

87     /**
88      * a simple contructor.
89      *
90      * @param protocol the protocol used on this stream.
91      * @param sock the socket which we're to demux.
92      * @param connection the http-connection this socket belongs to.
93      */

94     StreamDemultiplexor(int protocol, Socket JavaDoc sock, HTTPConnection connection)
95         throws IOException
96     {
97     this.Protocol = protocol;
98     this.Connection = connection;
99     RespHandlerList = new LinkedList();
100     init(sock);
101     }
102
103
104     /**
105      * Initializes the demultiplexor with a new socket.
106      *
107      * @param stream the stream to demultiplex
108      */

109     private void init(Socket JavaDoc sock) throws IOException
110     {
111     if (DebugDemux)
112         System.err.println("Demux: Initializing Stream Demultiplexor (" +
113                 this.hashCode() + ")");
114
115     this.Sock = sock;
116     this.Stream = new ExtBufferedInputStream(sock.getInputStream());
117     MarkedForClose = null;
118     chunk_len = -1;
119
120     // start a timer to close the socket after 60 seconds
121
Timer = TimerThread.setTimeout(this);
122     }
123
124
125     // Methods
126

127     /**
128      * Each Response must register with us.
129      */

130     void register(Response resp_handler, Request req) throws RetryException
131     {
132     synchronized(RespHandlerList)
133     {
134         if (Sock == null)
135         throw new RetryException();
136
137         RespHandlerList.addToEnd(
138                 new ResponseHandler(resp_handler, req, this));
139     }
140     }
141
142     /**
143      * creates an input stream for the response.
144      *
145      * @param resp the response structure requesting the stream
146      * @return an InputStream
147      */

148     RespInputStream getStream(Response resp)
149     {
150     ResponseHandler resph;
151     for (resph = (ResponseHandler) RespHandlerList.enumerate();
152          resph != null; resph = (ResponseHandler) RespHandlerList.next())
153     {
154         if (resph.resp == resp) break;
155     }
156
157     if (resph != null)
158         return resph.stream;
159     else
160         return null;
161     }
162
163
164     /**
165      * Restarts the timer thread that will close an unused socket after
166      * 60 seconds.
167      */

168     void restartTimer()
169     {
170     if (Timer != null) Timer.reset();
171     }
172
173
174     /**
175      * reads an array of bytes from the master stream.
176      */

177     int read(byte[] b, int off, int len, ResponseHandler resph, int timeout)
178         throws IOException
179     {
180     if (resph.exception != null)
181         throw (IOException) resph.exception.fillInStackTrace();
182
183     if (resph.eof)
184         return -1;
185
186
187     // read the headers and data for all responses preceding us.
188

189     ResponseHandler head;
190     while ((head = (ResponseHandler) RespHandlerList.getFirst()) != null &&
191         head != resph)
192     {
193         try
194         { head.stream.readAll(timeout); }
195         catch (IOException ioe)
196         {
197         if (ioe instanceof InterruptedIOException)
198             throw ioe;
199         else
200             throw (IOException) resph.exception.fillInStackTrace();
201         }
202     }
203
204
205     // Now we can read from the stream.
206

207     synchronized(this)
208     {
209         if (resph.exception != null)
210         throw (IOException) resph.exception.fillInStackTrace();
211
212         if (DebugDemux)
213         {
214         if (resph.resp.cd_type != CD_HDRS)
215             System.err.println("Demux: Reading for stream " +
216                        resph.stream.hashCode() +
217                        " (" + Thread.currentThread() + ")");
218         }
219
220         if (Timer != null) Timer.hyber();
221
222         try
223         {
224         int rcvd = -1;
225
226         if (timeout != cur_timeout)
227         {
228             if (DebugDemux)
229             {
230             System.err.println("Demux: Setting timeout to " +
231                        timeout + " ms");
232             }
233
234             try
235             { Sock.setSoTimeout(timeout); }
236             catch (Throwable JavaDoc t)
237             { }
238             cur_timeout = timeout;
239         }
240
241         switch (resph.resp.cd_type)
242         {
243             case CD_HDRS:
244             rcvd = Stream.read(b, off, len);
245             if (rcvd == -1)
246                 throw new EOFException("Premature EOF encountered");
247             break;
248
249             case CD_0:
250             rcvd = -1;
251             close(resph);
252             break;
253
254             case CD_CLOSE:
255             rcvd = Stream.read(b, off, len);
256             if (rcvd == -1)
257                 close(resph);
258             break;
259
260             case CD_CONTLEN:
261             int cl = resph.resp.ContentLength;
262             if (len > cl - resph.stream.count)
263                 len = cl - resph.stream.count;
264
265             rcvd = Stream.read(b, off, len);
266             if (rcvd == -1)
267                 throw new EOFException("Premature EOF encountered");
268
269             if (resph.stream.count+rcvd == cl)
270                 close(resph);
271
272             break;
273
274             case CD_CHUNKED:
275             if (chunk_len == -1) // it's a new chunk
276
chunk_len = Codecs.getChunkLength(Stream);
277
278             if (chunk_len > 0) // it's data
279
{
280                 if (len > chunk_len) len = chunk_len;
281                 rcvd = Stream.read(b, off, len);
282                 if (rcvd == -1)
283                 throw new EOFException("Premature EOF encountered");
284                 chunk_len -= rcvd;
285                 if (chunk_len == 0) // got the whole chunk
286
{
287                 Stream.read(); // CR
288
Stream.read(); // LF
289
chunk_len = -1;
290                 }
291             }
292             else // the footers (trailers)
293
{
294                 resph.resp.readTrailers(Stream);
295                 rcvd = -1;
296                 close(resph);
297                 chunk_len = -1;
298             }
299             break;
300
301             case CD_MP_BR:
302             byte[] endbndry = resph.getEndBoundary(Stream);
303             int[] end_cmp = resph.getEndCompiled(Stream);
304
305             rcvd = Stream.read(b, off, len);
306             if (rcvd == -1)
307                 throw new EOFException("Premature EOF encountered");
308
309             int ovf = Stream.pastEnd(endbndry, end_cmp);
310             if (ovf != -1)
311             {
312                 rcvd -= ovf;
313                 Stream.reset();
314                 close(resph);
315             }
316
317             break;
318
319             default:
320             throw new Error JavaDoc("Internal Error in StreamDemultiplexor: " +
321                     "Invalid cd_type " + resph.resp.cd_type);
322         }
323
324         restartTimer();
325         return rcvd;
326
327         }
328         catch (InterruptedIOException ie) // don't intercept this one
329
{
330         restartTimer();
331         throw ie;
332         }
333         catch (IOException ioe)
334         {
335         if (DebugDemux)
336         {
337             System.err.print("Demux: (" + Thread.currentThread() + ") ");
338             ioe.printStackTrace();
339         }
340
341         close(ioe, true);
342         throw resph.exception; // set by retry_requests
343
}
344         catch (ParseException pe)
345         {
346         if (DebugDemux)
347         {
348             System.err.print("Demux: (" + Thread.currentThread() + ") ");
349             pe.printStackTrace();
350         }
351
352         close(new IOException(pe.toString()), true);
353         throw resph.exception; // set by retry_requests
354
}
355     }
356     }
357
358     /**
359      * skips a number of bytes in the master stream. This is done via a
360      * dummy read, as the socket input stream doesn't like skip()'s.
361      */

362     synchronized long skip(long num, ResponseHandler resph) throws IOException
363     {
364     if (resph.exception != null)
365         throw (IOException) resph.exception.fillInStackTrace();
366
367     if (resph.eof)
368         return 0;
369
370     byte[] dummy = new byte[(int) num];
371     int rcvd = read(dummy, 0, (int) num, resph, 0);
372     if (rcvd == -1)
373         return 0;
374     else
375         return rcvd;
376     }
377
378     /**
379      * Determines the number of available bytes.
380      */

381     synchronized int available(ResponseHandler resph) throws IOException
382     {
383     int avail = Stream.available();
384     if (resph == null) return avail;
385
386     if (resph.exception != null)
387         throw (IOException) resph.exception.fillInStackTrace();
388
389     if (resph.eof)
390         return 0;
391
392     switch (resph.resp.cd_type)
393     {
394         case CD_0:
395         return 0;
396         case CD_HDRS:
397         // this is something of a hack; I could return 0, but then
398
// if you were waiting for something on a response that
399
// wasn't first in line (and you didn't try to read the
400
// other response) you'd wait forever. On the other hand,
401
// we might be making a false promise here...
402
return (avail > 0 ? 1 : 0);
403         case CD_CLOSE:
404         return avail;
405         case CD_CONTLEN:
406         int cl = resph.resp.ContentLength;
407         cl -= resph.stream.count;
408         return (avail < cl ? avail : cl);
409         case CD_CHUNKED:
410         return avail; // not perfect...
411
case CD_MP_BR:
412         return avail; // not perfect...
413
default:
414         throw new Error JavaDoc("Internal Error in StreamDemultiplexor: " +
415                 "Invalid cd_type " + resph.resp.cd_type);
416     }
417
418     }
419
420
421     /**
422      * Closes the socket and all associated streams. If <var>exception</var>
423      * is not null then all active requests are retried.
424      *
425      * <P>There are five ways this method may be activated. 1) if an exception
426      * occurs during read or write. 2) if the stream is marked for close but
427      * no responses are outstanding (e.g. due to a timeout). 3) when the
428      * markedForClose response is closed. 4) if all response streams up until
429      * and including the markedForClose response have been closed. 5) if this
430      * demux is finalized.
431      *
432      * @param exception the IOException to be sent to the streams.
433      * @param was_reset if true then the exception is due to a connection
434      * reset; otherwise it means we generated the exception
435      * ourselves and this is a "normal" close.
436      */

437     synchronized void close(IOException exception, boolean was_reset)
438     {
439     if (Sock == null) // already cleaned up
440
return;
441
442     if (DebugDemux)
443         System.err.println("Demux: Closing all streams and socket (" +
444                 this.hashCode() + ")");
445
446     try
447         { Stream.close(); }
448     catch (IOException ioe) { }
449     try
450         { Sock.close(); }
451     catch (IOException ioe) { }
452     Sock = null;
453
454     if (Timer != null)
455     {
456         Timer.kill();
457         Timer = null;
458     }
459
460     Connection.DemuxList.remove(this);
461
462
463     // Here comes the tricky part: redo outstanding requests!
464

465     if (exception != null)
466         synchronized(RespHandlerList)
467         { retry_requests(exception, was_reset); }
468     }
469
470
471     /**
472      * Retries outstanding requests. Well, actually the RetryModule does
473      * that. Here we just throw a RetryException for each request so that
474      * the RetryModule can catch and handle them.
475      *
476      * @param exception the exception that led to this call.
477      * @param was_reset this flag is passed to the RetryException and is
478      * used by the RetryModule to distinguish abnormal closes
479      * from expected closes.
480      */

481     private void retry_requests(IOException exception, boolean was_reset)
482     {
483     RetryException first = null,
484             prev = null;
485     ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate();
486
487     while (resph != null)
488     {
489         /* if the application is already reading the data then the
490          * response has already been handled. In this case we must
491          * throw the real exception.
492          */

493         if (resph.resp.got_headers)
494         {
495         resph.exception = exception;
496         }
497         else
498         {
499         RetryException tmp = new RetryException(exception.getMessage());
500         if (first == null) first = tmp;
501
502         tmp.request = resph.request;
503         tmp.response = resph.resp;
504         tmp.exception = exception;
505         tmp.conn_reset = was_reset;
506         tmp.first = first;
507         tmp.addToListAfter(prev);
508
509         prev = tmp;
510         resph.exception = tmp;
511         }
512
513         RespHandlerList.remove(resph);
514         resph = (ResponseHandler) RespHandlerList.next();
515     }
516     }
517
518
519     /**
520      * Closes the associated stream. If this one has been markedForClose then
521      * the socket is closed; else closeSocketIfAllStreamsClosed is invoked.
522      */

523     synchronized void close(ResponseHandler resph)
524     {
525     if (resph != (ResponseHandler) RespHandlerList.getFirst())
526         return;
527
528     if (DebugDemux)
529         System.err.println("Demux: Closing stream " +
530                 resph.stream.hashCode() +
531                 " (" + Thread.currentThread() + ")");
532
533     resph.eof = true;
534     RespHandlerList.remove(resph);
535
536     if (resph == MarkedForClose)
537         close(new IOException("Premature end of Keep-Alive"), false);
538     else
539         closeSocketIfAllStreamsClosed();
540     }
541
542
543     /**
544      * Close the socket if all the streams have been closed.
545      *
546      * <P>When a stream reaches eof it is removed from the response handler
547      * list, but when somebody close()'s the response stream it is just
548      * marked as such. This means that all responses in the list have either
549      * not been read at all or only partially read, but they might have been
550      * close()'d meaning that nobody is interested in the data. So If all the
551      * response streams up till and including the one markedForClose have
552      * been close()'d then we can remove them from our list and close the
553      * socket.
554      *
555      * <P>Note: if the response list is emtpy or if no response is
556      * markedForClose then this method does nothing. Specifically it does
557      * not close the socket. We only want to close the socket if we've been
558      * told to do so.
559      *
560      * <P>Also note that there might still be responses in the list after
561      * the markedForClose one. These are due to us having pipelined more
562      * requests to the server than it's willing to serve on a single
563      * connection. These requests will be retried if possible.
564      */

565     synchronized void closeSocketIfAllStreamsClosed()
566     {
567     synchronized(RespHandlerList)
568     {
569         ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate();
570
571         while (resph != null && resph.stream.closed)
572         {
573         if (resph == MarkedForClose)
574         {
575             // remove all response handlers first
576
ResponseHandler tmp;
577             do
578             {
579             tmp = (ResponseHandler) RespHandlerList.getFirst();
580             RespHandlerList.remove(tmp);
581             }
582             while (tmp != resph);
583
584             // close the socket
585
close(new IOException("Premature end of Keep-Alive"), false);
586             return;
587         }
588
589         resph = (ResponseHandler) RespHandlerList.next();
590         }
591     }
592     }
593
594
595     /**
596      * returns the socket associated with this demux
597      */

598     synchronized Socket JavaDoc getSocket()
599     {
600     if (MarkedForClose != null)
601         return null;
602
603     if (Timer != null) Timer.hyber();
604     return Sock;
605     }
606
607
608     /**
609      * Mark this demux to not accept any more request and to close the
610      * stream after this <var>resp</var>onse or all requests have been
611      * processed, or close immediately if no requests are registered.
612      *
613      * @param response the Response after which the connection should
614      * be closed.
615      */

616     synchronized void markForClose(Response resp)
617     {
618     synchronized(RespHandlerList)
619     {
620         if (RespHandlerList.getFirst() == null) // no active request,
621
{ // so close the socket
622
close(new IOException("Premature end of Keep-Alive"), false);
623         return;
624         }
625     }
626
627     if (Timer != null)
628     {
629         Timer.kill();
630         Timer = null;
631     }
632
633     ResponseHandler resph, lasth = null;
634     for (resph = (ResponseHandler) RespHandlerList.enumerate();
635          resph != null; resph = (ResponseHandler) RespHandlerList.next())
636     {
637         if (resph.resp == resp) // new resp precedes any others
638
{
639         MarkedForClose = resph;
640
641         if (DebugDemux)
642             System.err.println("Demux: stream " +
643                        resp.inp_stream.hashCode() +
644                        " marked for close (" +
645                        Thread.currentThread() + ")");
646
647         closeSocketIfAllStreamsClosed();
648         return;
649         }
650
651         if (MarkedForClose == resph)
652         return; // already marked for closing after an earlier resp
653

654         lasth = resph;
655     }
656
657     if (lasth == null)
658         return;
659
660     MarkedForClose = lasth; // resp == null, so use last resph
661
closeSocketIfAllStreamsClosed();
662
663     if (DebugDemux)
664         System.err.println("Demux: stream " + lasth.stream.hashCode() +
665                    " marked for close (" +
666                    Thread.currentThread() + ")");
667     }
668
669
670     /**
671      * Emergency stop. Closes the socket and notifies the responses that
672      * the requests are aborted.
673      *
674      * @since V0.3
675      */

676     void abort()
677     {
678     if (DebugDemux)
679         System.err.println("Demux: Aborting socket (" +
680                 this.hashCode() + ")");
681
682
683     // notify all responses of abort
684

685     synchronized(RespHandlerList)
686     {
687         for (ResponseHandler resph =
688                 (ResponseHandler) RespHandlerList.enumerate();
689          resph != null;
690          resph = (ResponseHandler) RespHandlerList.next())
691         {
692         if (resph.resp.http_resp != null)
693             resph.resp.http_resp.markAborted();
694         if (resph.exception == null)
695             resph.exception = new IOException("Request aborted by user");
696         }
697
698
699         /* Close the socket.
700          * Note: this duplicates most of close(IOException, boolean). We
701          * do *not* call close() because that is synchronized, but we want
702          * abort() to be asynch.
703          */

704         if (Sock != null)
705         {
706         try
707         {
708             try
709             { Sock.setSoLinger(false, 0); }
710             catch (Throwable JavaDoc t)
711             { }
712
713             try
714             { Stream.close(); }
715             catch (IOException ioe) { }
716             try
717             { Sock.close(); }
718             catch (IOException ioe) { }
719             Sock = null;
720
721             if (Timer != null)
722             {
723             Timer.kill();
724             Timer = null;
725             }
726         }
727         catch (NullPointerException JavaDoc npe)
728             { }
729
730         Connection.DemuxList.remove(this);
731         }
732     }
733     }
734
735
736     /**
737      * A safety net to close the connection.
738      */

739     protected void finalize() throws Throwable JavaDoc
740     {
741     close((IOException) null, false);
742     super.finalize();
743     }
744
745
746     /**
747      * produces a string.
748      * @return a string containing the class name and protocol number
749      */

750     public String JavaDoc toString()
751     {
752     String JavaDoc prot;
753
754     switch (Protocol)
755     {
756         case HTTP:
757         prot = "HTTP"; break;
758         case HTTPS:
759         prot = "HTTPS"; break;
760         case SHTTP:
761         prot = "SHTTP"; break;
762         case HTTP_NG:
763         prot = "HTTP_NG"; break;
764         default:
765         throw new Error JavaDoc("HTTPClient Internal Error: invalid protocol " +
766                 Protocol);
767     }
768
769     return getClass().getName() + "[Protocol=" + prot + "]";
770     }
771 }
772
773
774 /**
775  * This thread is used to implement socket timeouts. It keeps a list of
776  * timer entries and expries them after a given time.
777  */

778 class SocketTimeout extends Thread JavaDoc implements GlobalConstants
779 {
780     /**
781      * This class represents a timer entry. It is used to close an
782      * inactive socket after n seconds. Once running, the timer may be
783      * suspended (hyber()), restarted (reset()), or aborted (kill()).
784      * When the timer expires it invokes markForClose() on the
785      * associated stream demultipexer.
786      */

787     class TimeoutEntry
788     {
789     boolean restart = false,
790         hyber = false,
791         alive = true;
792     StreamDemultiplexor demux;
793     TimeoutEntry next = null,
794              prev = null;
795
796     TimeoutEntry(StreamDemultiplexor demux)
797     {
798         this.demux = demux;
799     }
800
801     void reset()
802     {
803         hyber = false;
804         if (restart) return;
805         restart = true;
806
807         synchronized(time_list)
808         {
809         if (!alive) return;
810
811         // remove from current position
812
next.prev = prev;
813         prev.next = next;
814
815         // and add to end of timeout list
816
next = time_list[current];
817         prev = time_list[current].prev;
818         prev.next = this;
819         next.prev = this;
820         }
821     }
822
823     void hyber()
824     {
825         if (alive) hyber = true;
826     }
827
828     void kill()
829     {
830         alive = false;
831         restart = false;
832         hyber = false;
833
834         synchronized(time_list)
835         {
836         if (prev == null) return;
837         next.prev = prev;
838         prev.next = next;
839         prev = null;
840         }
841     }
842     }
843
844     private TimeoutEntry[] time_list;
845     private int current;
846
847
848     SocketTimeout(int secs)
849     {
850     super("SocketTimeout");
851
852     try { setDaemon(true); }
853     catch (SecurityException JavaDoc se) { } // Oh well...
854
setPriority(MAX_PRIORITY);
855
856     time_list = new TimeoutEntry[secs];
857     for (int idx=0; idx<secs; idx++)
858     {
859         time_list[idx] = new TimeoutEntry(null);
860         time_list[idx].next = time_list[idx].prev = time_list[idx];
861     }
862     current = 0;
863     }
864
865
866     public TimeoutEntry setTimeout(StreamDemultiplexor demux)
867     {
868     TimeoutEntry entry = new TimeoutEntry(demux);
869     synchronized(time_list)
870     {
871         entry.next = time_list[current];
872         entry.prev = time_list[current].prev;
873         entry.prev.next = entry;
874         entry.next.prev = entry;
875     }
876
877     return entry;
878     }
879
880
881     /**
882      * This timer is implemented by sleeping for 1 second and then
883      * checking the timer list.
884      */

885     public void run()
886     {
887     TimeoutEntry marked = null;
888
889     while (true)
890     {
891         try { sleep(1000L); } catch (InterruptedException JavaDoc ie) { }
892
893         synchronized(time_list)
894         {
895         // reset all restart flags
896
for (TimeoutEntry entry = time_list[current].next;
897              entry != time_list[current];
898              entry = entry.next)
899         {
900             entry.restart = false;
901         }
902
903         current++;
904         if (current >= time_list.length)
905             current = 0;
906
907         // remove all expired timers
908
for (TimeoutEntry entry = time_list[current].next;
909              entry != time_list[current];
910              entry = entry.next)
911         {
912             if (entry.alive && !entry.hyber)
913             {
914             TimeoutEntry prev = entry.prev;
915             entry.kill();
916             /* put on death row. Note: we must not invoke
917              * markForClose() here because it is synch'd
918              * and can therefore lead to a deadlock if that
919              * thread is trying to do a reset() or kill()
920              */

921             entry.next = marked;
922             marked = entry;
923             entry = prev;
924             }
925         }
926         }
927
928         while (marked != null)
929         {
930         marked.demux.markForClose(null);
931         marked = marked.next;
932         }
933     }
934     }
935 }
936
937
Popular Tags