1 package org.jacorb.notification.servant; 2 3 23 24 import java.util.Iterator ; 25 import java.util.List ; 26 27 import org.apache.avalon.framework.configuration.Configurable; 28 import org.apache.avalon.framework.configuration.Configuration; 29 import org.apache.avalon.framework.logger.Logger; 30 import org.jacorb.notification.FilterManager; 31 import org.jacorb.notification.IContainer; 32 import org.jacorb.notification.OfferManager; 33 import org.jacorb.notification.SubscriptionManager; 34 import org.jacorb.notification.conf.Attributes; 35 import org.jacorb.notification.conf.Default; 36 import org.jacorb.notification.engine.TaskProcessor; 37 import org.jacorb.notification.interfaces.Disposable; 38 import org.jacorb.notification.interfaces.FilterStage; 39 import org.jacorb.notification.util.DisposableManager; 40 import org.jacorb.notification.util.QoSPropertySet; 41 import org.omg.CORBA.NO_IMPLEMENT ; 42 import org.omg.CORBA.OBJECT_NOT_EXIST ; 43 import org.omg.CORBA.ORB ; 44 import org.omg.CosEventChannelAdmin.AlreadyConnected; 45 import org.omg.CosEventComm.Disconnected; 46 import org.omg.CosNotification.NamedPropertyRangeSeqHolder; 47 import org.omg.CosNotification.Property; 48 import org.omg.CosNotification.QoSAdminOperations; 49 import org.omg.CosNotification.UnsupportedQoS; 50 import org.omg.CosNotifyChannelAdmin.ConnectionAlreadyActive; 51 import org.omg.CosNotifyChannelAdmin.ConnectionAlreadyInactive; 52 import org.omg.CosNotifyChannelAdmin.NotConnected; 53 import org.omg.CosNotifyChannelAdmin.ProxyType; 54 import org.omg.CosNotifyFilter.Filter; 55 import org.omg.CosNotifyFilter.FilterAdminOperations; 56 import org.omg.CosNotifyFilter.FilterNotFound; 57 import org.omg.CosNotifyFilter.MappingFilter; 58 import org.omg.CosNotifyFilter.MappingFilterHelper; 59 import org.omg.PortableServer.POA ; 60 import org.omg.PortableServer.Servant ; 61 import org.picocontainer.PicoContainer; 62 63 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean; 64 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt; 65 66 70 71 public abstract class AbstractProxy implements FilterAdminOperations, QoSAdminOperations, 72 FilterStage, Disposable, ManageableServant, Configurable 73 { 74 private final MappingFilter nullMappingFilterRef_; 75 76 protected final boolean isIDPublic_; 77 78 protected final Logger logger_; 79 80 private final SynchronizedBoolean connected_ = new SynchronizedBoolean(false); 81 82 protected final QoSPropertySet qosSettings_; 83 84 protected final Integer id_; 85 86 protected final OfferManager offerManager_; 87 88 protected final SubscriptionManager subscriptionManager_; 89 90 protected Servant thisServant_; 91 92 protected MappingFilter lifetimeFilter_; 93 94 protected MappingFilter priorityFilter_; 95 96 99 private final FilterManager filterManager_; 100 101 private final SynchronizedBoolean destroyed_ = new SynchronizedBoolean(false); 102 103 private final SynchronizedBoolean disposeInProgress_ = new SynchronizedBoolean(false); 104 105 private final SynchronizedInt errorCounter_ = new SynchronizedInt(0); 106 107 private final POA poa_; 108 109 private final ORB orb_; 110 111 private final TaskProcessor taskProcessor_; 112 113 private boolean isInterFilterGroupOperatorOR_; 114 115 private final boolean disposedProxyDisconnectsClient_; 116 117 private final SynchronizedBoolean active_ = new SynchronizedBoolean(true); 118 119 private final DisposableManager disposables_ = new DisposableManager(); 120 121 private final PicoContainer container_; 122 123 125 protected AbstractProxy(IAdmin admin, ORB orb, POA poa, Configuration conf, 126 TaskProcessor taskProcessor, OfferManager offerManager, 127 SubscriptionManager subscriptionManager) 128 { 129 id_ = new Integer (admin.getProxyID()); 130 isIDPublic_ = admin.isIDPublic(); 131 container_ = admin.getContainer(); 132 133 orb_ = orb; 134 poa_ = poa; 135 taskProcessor_ = taskProcessor; 136 137 offerManager_ = offerManager; 138 subscriptionManager_ = subscriptionManager; 139 140 filterManager_ = new FilterManager(); 141 142 nullMappingFilterRef_ = MappingFilterHelper.narrow(orb.string_to_object(orb 143 .object_to_string(null))); 144 145 logger_ = ((org.jacorb.config.Configuration) conf).getNamedLogger(getClass().getName()); 146 147 disposedProxyDisconnectsClient_ = conf.getAttribute( 148 Attributes.DISPOSE_PROXY_CALLS_DISCONNECT, 149 Default.DEFAULT_DISPOSE_PROXY_CALLS_DISCONNECT).equals("on"); 150 151 qosSettings_ = new QoSPropertySet(conf, QoSPropertySet.PROXY_QOS); 152 153 configure(conf); 154 } 155 156 public void configure(Configuration conf) 157 { 158 } 160 161 163 public void addDisposeHook(Disposable d) 164 { 165 disposables_.addDisposable(d); 166 } 167 168 public boolean isIDPublic() 169 { 170 return isIDPublic_; 171 } 172 173 protected POA getPOA() 174 { 175 return poa_; 176 } 177 178 protected ORB getORB() 179 { 180 return orb_; 181 } 182 183 protected TaskProcessor getTaskProcessor() 184 { 185 return taskProcessor_; 186 } 187 188 192 public final int add_filter(Filter filter) 193 { 194 return filterManager_.add_filter(filter); 195 } 196 197 public final void remove_filter(int n) throws FilterNotFound 198 { 199 filterManager_.remove_filter(n); 200 } 201 202 public final Filter get_filter(int n) throws FilterNotFound 203 { 204 return filterManager_.get_filter(n); 205 } 206 207 public final int[] get_all_filters() 208 { 209 return filterManager_.get_all_filters(); 210 } 211 212 public final void remove_all_filters() 213 { 214 filterManager_.remove_all_filters(); 215 } 216 217 219 public void validate_event_qos(Property[] qosProps, NamedPropertyRangeSeqHolder propSeqHolder) 221 { 222 throw new NO_IMPLEMENT (); 223 } 224 225 public final void validate_qos(Property[] props, NamedPropertyRangeSeqHolder propertyRange) 226 throws UnsupportedQoS 227 { 228 qosSettings_.validate_qos(props, propertyRange); 229 } 230 231 public final void set_qos(Property[] qosProps) throws UnsupportedQoS 232 { 233 if (qosSettings_ != null) 234 { 235 qosSettings_.set_qos(qosProps); 236 } 237 } 238 239 public final Property[] get_qos() 240 { 241 return qosSettings_.get_qos(); 242 } 243 244 public final void priority_filter(MappingFilter filter) 245 { 246 priorityFilter_ = filter; 247 } 248 249 public final MappingFilter priority_filter() 250 { 251 if (priorityFilter_ == null) 252 { 253 return nullMappingFilterRef_; 254 } 255 256 return priorityFilter_; 257 } 258 259 public final MappingFilter lifetime_filter() 260 { 261 if (lifetimeFilter_ == null) 262 { 263 return nullMappingFilterRef_; 264 } 265 266 return lifetimeFilter_; 267 } 268 269 public final void lifetime_filter(MappingFilter filter) 270 { 271 lifetimeFilter_ = filter; 272 } 273 274 public final Integer getID() 275 { 276 return id_; 277 } 278 279 284 public final POA _default_POA() 285 { 286 return getPOA(); 287 } 288 289 public final List getFilters() 290 { 291 return filterManager_.getFilters(); 292 } 293 294 public final void deactivate() 295 { 296 logger_.info("deactivate Proxy"); 297 298 try 299 { 300 byte[] _oid = getPOA().servant_to_id(getServant()); 301 getPOA().deactivate_object(_oid); 302 } catch (Exception e) 303 { 304 logger_.fatalError("Couldn't deactivate Proxy", e); 305 } 306 } 307 308 private void tryDisconnectClient() 309 { 310 try 311 { 312 if (disposedProxyDisconnectsClient_ && connected_.get()) 313 { 314 logger_.info("disconnect_client"); 315 316 disconnectClient(); 317 } 318 } catch (Exception e) 319 { 320 logger_.error("disconnect_client raised an unexpected error: " + "ignore", e); 321 } finally 322 { 323 connected_.set(false); 324 } 325 } 326 327 public final boolean isDisposed() 328 { 329 return destroyed_.get(); 330 } 331 332 protected void checkDestroyStatus() throws OBJECT_NOT_EXIST 333 { 334 if (!destroyed_.commit(false, true)) 335 { 336 logger_.fatalError("dispose has been called twice"); 337 338 throw new OBJECT_NOT_EXIST (); 339 } 340 } 341 342 public final void destroy() 343 { 344 checkDestroyStatus(); 345 346 container_.dispose(); 347 348 List list = container_.getComponentInstancesOfType(IContainer.class); 349 for (Iterator i = list.iterator(); i.hasNext();) 350 { 351 IContainer element = (IContainer) i.next(); 352 element.destroy(); 353 } 354 } 355 356 public void dispose() 357 { 358 logger_.info("Destroy Proxy " + id_); 359 360 disposeInProgress_.set(true); 361 362 364 tryDisconnectClient(); 365 366 368 deactivate(); 369 370 372 removeListener(); 373 374 376 remove_all_filters(); 377 378 380 disposables_.dispose(); 381 } 382 383 public abstract ProxyType MyType(); 384 385 void setInterFilterGroupOperatorOR(boolean b) 386 { 387 isInterFilterGroupOperatorOR_ = b; 388 } 389 390 public final boolean hasInterFilterGroupOperatorOR() 391 { 392 return isInterFilterGroupOperatorOR_; 393 } 394 395 public final boolean isConnected() 396 { 397 return !disposeInProgress_.get() && connected_.get(); 398 } 399 400 public final boolean hasLifetimeFilter() 401 { 402 return lifetimeFilter_ != null; 403 } 404 405 public final boolean hasPriorityFilter() 406 { 407 return priorityFilter_ != null; 408 } 409 410 public final MappingFilter getLifetimeFilter() 411 { 412 return lifetimeFilter_; 413 } 414 415 public final MappingFilter getPriorityFilter() 416 { 417 return priorityFilter_; 418 } 419 420 public void resetErrorCounter() 421 { 422 errorCounter_.set(0); 423 } 424 425 public final int getErrorCounter() 426 { 427 return errorCounter_.get(); 428 } 429 430 public final int incErrorCounter() 431 { 432 return errorCounter_.increment(); 433 } 434 435 protected boolean isSuspended() 436 { 437 return !active_.get(); 438 } 439 440 public final void suspend_connection() throws NotConnected, ConnectionAlreadyInactive 441 { 442 checkIsConnected(); 443 444 if (!active_.commit(true, false)) 445 { 446 throw new ConnectionAlreadyInactive(); 447 } 448 449 connectionSuspended(); 450 } 451 452 455 protected void connectionSuspended() 456 { 457 } 459 460 public final void resume_connection() throws NotConnected, ConnectionAlreadyActive 461 { 462 checkIsConnected(); 463 464 if (!active_.commit(false, true)) 465 { 466 throw new ConnectionAlreadyActive(); 467 } 468 469 connectionResumed(); 470 } 471 472 476 protected void connectionResumed() 477 { 478 } 480 481 protected void checkIsConnected() throws NotConnected 482 { 483 if (!connected_.get()) 484 { 485 throw new NotConnected(); 486 } 487 } 488 489 protected void checkIsNotConnected() throws AlreadyConnected 490 { 491 if (connected_.get()) 492 { 493 throw new AlreadyConnected(); 494 } 495 } 496 497 protected void checkStillConnected() throws Disconnected 498 { 499 if (!connected_.get()) 500 { 501 logger_.fatalError("access on a not connected proxy"); 502 503 destroy(); 504 505 throw new Disconnected(); 506 } 507 } 508 509 protected void connectClient(org.omg.CORBA.Object client) 510 { 511 connected_.set(true); 512 } 513 514 517 protected abstract void disconnectClient(); 518 519 protected abstract Servant getServant(); 520 521 protected void handleDisconnected(Disconnected e) 522 { 523 logger_.fatalError("Illegal state: Client think it's disconnected. " 524 + "Proxy thinks Client is still connected. The Proxy will be destroyed.", e); 525 526 destroy(); 527 } 529 530 protected abstract void removeListener(); 531 } | Popular Tags |