KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > jacorb > notification > servant > ProxyPushSupplierImpl


1 package org.jacorb.notification.servant;
2
3 /*
4  * JacORB - a free Java ORB
5  *
6  * Copyright (C) 1997-2004 Gerald Brose.
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the Free
20  * Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */

22
23 import java.util.List JavaDoc;
24
25 import org.apache.avalon.framework.configuration.Configuration;
26 import org.apache.avalon.framework.configuration.ConfigurationException;
27 import org.jacorb.notification.OfferManager;
28 import org.jacorb.notification.SubscriptionManager;
29 import org.jacorb.notification.engine.MessagePushOperation;
30 import org.jacorb.notification.engine.PushTaskExecutorFactory;
31 import org.jacorb.notification.engine.TaskProcessor;
32 import org.jacorb.notification.interfaces.Message;
33 import org.jacorb.notification.interfaces.MessageConsumer;
34 import org.jacorb.notification.util.CollectionsWrapper;
35 import org.omg.CORBA.ORB JavaDoc;
36 import org.omg.CosEventChannelAdmin.AlreadyConnected;
37 import org.omg.CosEventComm.Disconnected;
38 import org.omg.CosEventComm.PushConsumer;
39 import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
40 import org.omg.CosNotifyChannelAdmin.ProxyPushSupplierHelper;
41 import org.omg.CosNotifyChannelAdmin.ProxyPushSupplierOperations;
42 import org.omg.CosNotifyChannelAdmin.ProxyPushSupplierPOATie;
43 import org.omg.CosNotifyChannelAdmin.ProxyType;
44 import org.omg.PortableServer.POA JavaDoc;
45 import org.omg.PortableServer.Servant JavaDoc;
46
47 /**
48  * @author Alphonse Bendt
49  * @version $Id: ProxyPushSupplierImpl.java,v 1.17 2005/04/27 10:45:46 alphonse.bendt Exp $
50  */

51
52 public class ProxyPushSupplierImpl extends AbstractProxyPushSupplier implements
53         ProxyPushSupplierOperations
54 {
55     private class PushAnyOperation extends MessagePushOperation
56     {
57         public PushAnyOperation(Message message) {
58             super(message);
59         }
60
61         public void invokePush() throws Disconnected {
62             deliverMessageInternal(message_);
63         }
64     }
65     
66     private PushConsumer pushConsumer_;
67     
68     private long timeSpent_;
69
70     // //////////////////////////////////////
71

72     public ProxyPushSupplierImpl(IAdmin admin, ORB JavaDoc orb, POA JavaDoc poa, Configuration conf,
73             TaskProcessor taskProcessor, PushTaskExecutorFactory pushTaskExecutorFactory, OfferManager offerManager,
74             SubscriptionManager subscriptionManager, ConsumerAdmin consumerAdmin)
75             throws ConfigurationException
76     {
77         super(admin, orb, poa, conf, taskProcessor, pushTaskExecutorFactory, offerManager,
78                 subscriptionManager, consumerAdmin);
79     }
80
81     public ProxyType MyType()
82     {
83         return ProxyType.PUSH_ANY;
84     }
85
86     public void disconnect_push_supplier()
87     {
88         destroy();
89     }
90
91     protected void disconnectClient()
92     {
93         pushConsumer_.disconnect_push_consumer();
94
95         pushConsumer_ = null;
96     }
97
98     
99
100     private void deliverMessageWithRetry(final Message message)
101     {
102         try
103         {
104             deliverMessageInternal(message);
105         } catch (Throwable JavaDoc e)
106         {
107             PushAnyOperation _failedOperation = new PushAnyOperation(message);
108
109             handleFailedPushOperation(_failedOperation, e);
110         }
111     }
112
113     void deliverMessageInternal(final Message message) throws Disconnected
114     {
115         long now = System.currentTimeMillis();
116         pushConsumer_.push(message.toAny());
117         timeSpent_ += (System.currentTimeMillis() - now);
118         resetErrorCounter();
119     }
120
121     public void pushPendingData()
122     {
123         Message[] _events = getAllMessages();
124
125         for (int x = 0; x < _events.length; ++x)
126         {
127             try
128             {
129                 deliverMessageWithRetry(_events[x]);
130             } finally
131             {
132                 _events[x].dispose();
133             }
134         }
135     }
136
137     public void connect_any_push_consumer(PushConsumer pushConsumer) throws AlreadyConnected
138     {
139         checkIsNotConnected();
140
141         pushConsumer_ = pushConsumer;
142
143         connectClient(pushConsumer);
144     }
145
146     public List JavaDoc getSubsequentFilterStages()
147     {
148         return CollectionsWrapper.singletonList(this);
149     }
150
151     public MessageConsumer getMessageConsumer()
152     {
153         return this;
154     }
155
156    
157
158     protected void connectionResumed()
159     {
160         schedulePush();
161     }
162
163     public synchronized Servant JavaDoc getServant()
164     {
165         if (thisServant_ == null)
166         {
167             thisServant_ = new ProxyPushSupplierPOATie(this);
168         }
169         return thisServant_;
170     }
171
172     public org.omg.CORBA.Object JavaDoc activate()
173     {
174         return ProxyPushSupplierHelper.narrow(getServant()._this_object(getORB()));
175     }
176
177     public long getCost()
178     {
179         return timeSpent_;
180     }
181 }
Popular Tags