1 5 package com.tc.objectserver.handler; 6 7 import com.tc.async.api.AbstractEventHandler; 8 import com.tc.async.api.ConfigurationContext; 9 import com.tc.async.api.EventContext; 10 import com.tc.async.api.Sink; 11 import com.tc.logging.TCLogger; 12 import com.tc.net.protocol.tcm.ChannelID; 13 import com.tc.net.protocol.tcm.CommunicationsManager; 14 import com.tc.net.protocol.tcm.MessageChannel; 15 import com.tc.net.protocol.tcm.TCMessageType; 16 import com.tc.object.msg.ClusterMembershipMessage; 17 import com.tc.object.net.DSOChannelManager; 18 import com.tc.object.net.DSOChannelManagerEventListener; 19 import com.tc.objectserver.core.api.ServerConfigurationContext; 20 import com.tc.objectserver.tx.ServerTransactionManager; 21 import com.tc.objectserver.tx.TransactionBatchManager; 22 import com.tc.util.concurrent.ThreadUtil; 23 24 27 public class ChannelLifeCycleHandler extends AbstractEventHandler { 28 private final ServerTransactionManager transactionManager; 29 private final TransactionBatchManager transactionBatchManager; 30 private TCLogger logger; 31 private final CommunicationsManager commsManager; 32 private final DSOChannelManager channelMgr; 33 34 public ChannelLifeCycleHandler(CommunicationsManager commsManager, ServerTransactionManager transactionManager, 35 TransactionBatchManager transactionBatchManager, DSOChannelManager channelManager) { 36 this.commsManager = commsManager; 37 this.transactionManager = transactionManager; 38 this.transactionBatchManager = transactionBatchManager; 39 this.channelMgr = channelManager; 40 } 41 42 public void handleEvent(EventContext context) { 43 Event event = (Event) context; 44 45 switch (event.type) { 46 case Event.CREATE: { 47 channelCreated(event.channel); 48 break; 49 } 50 case Event.REMOVE: { 51 channelRemoved(event.channel); 52 break; 53 } 54 default: { 55 throw new AssertionError ("unknown event: " + event.type); 56 } 57 } 58 } 59 60 private void channelRemoved(MessageChannel channel) { 61 ChannelID channelID = channel.getChannelID(); 62 broadcastClusterMemebershipMessage(ClusterMembershipMessage.EventType.NODE_DISCONNECTED, channel.getChannelID()); 63 if (commsManager.isInShutdown()) { 64 logger.info("Ignoring transport disconnect for " + channelID + " while shutting down."); 65 } else { 66 ThreadUtil.reallySleep(500); 68 logger.info("Received transport disconnect. Killing client " + channelID); 69 transactionManager.shutdownClient(channelID); 70 transactionBatchManager.shutdownClient(channelID); 71 } 72 } 73 74 private void channelCreated(MessageChannel channel) { 75 broadcastClusterMemebershipMessage(ClusterMembershipMessage.EventType.NODE_CONNECTED, channel.getChannelID()); 76 } 77 78 private void broadcastClusterMemebershipMessage(int eventType, ChannelID channelID) { 79 MessageChannel[] channels = channelMgr.getActiveChannels(); 80 for (int i = 0; i < channels.length; i++) { 81 MessageChannel channel = channels[i]; 82 83 if (!channel.getChannelID().equals(channelID)) { 84 ClusterMembershipMessage cmm = (ClusterMembershipMessage) channel 85 .createMessage(TCMessageType.CLUSTER_MEMBERSHIP_EVENT_MESSAGE); 86 cmm.initialize(eventType, channelID, channels); 87 cmm.send(); 88 } 89 } 90 } 91 92 public void initialize(ConfigurationContext context) { 93 super.initialize(context); 94 ServerConfigurationContext scc = (ServerConfigurationContext) context; 95 this.logger = scc.getLogger(ChannelLifeCycleHandler.class); 96 } 97 98 public static class Event implements EventContext { 99 public static final int CREATE = 0; 100 public static final int REMOVE = 1; 101 102 private final int type; 103 private final MessageChannel channel; 104 105 Event(int type, MessageChannel channel) { 106 this.type = type; 107 this.channel = channel; 108 if ((type != CREATE) && (type != REMOVE)) { throw new IllegalArgumentException ("invalid type: " + type); } 109 } 110 } 111 112 public static class EventListener implements DSOChannelManagerEventListener { 113 114 private final Sink sink; 115 116 public EventListener(Sink sink) { 117 this.sink = sink; 118 } 119 120 public void channelCreated(MessageChannel channel) { 121 sink.add(new Event(Event.CREATE, channel)); 122 } 123 124 public void channelRemoved(MessageChannel channel) { 125 sink.add(new Event(Event.REMOVE, channel)); 126 } 127 128 } 129 130 } 131 | Popular Tags |