| 1 package net.walend.somnifugi.juc; 2 3 import java.util.Map ; 4 import java.util.HashMap ; 5 import java.util.Set ; 6 import java.util.HashSet ; 7 8 import javax.naming.Referenceable ; 9 import javax.naming.Reference ; 10 import javax.naming.NamingException ; 11 import javax.naming.Context ; 12 13 import javax.jms.JMSException ; 14 import javax.jms.Message ; 15 16 import net.walend.somnifugi.SomniMessageSelector; 17 import net.walend.somnifugi.SomniNamingException; 18 import net.walend.somnifugi.SomniMessageSelectorException; 19 import net.walend.somnifugi.SomniRuntimeException; 20 import net.walend.somnifugi.SomniMessage; 21 22 import net.walend.somnifugi.channel.Channel; 23 import net.walend.somnifugi.channel.Puttable; 24 import net.walend.somnifugi.channel.FanOut; 25 import net.walend.somnifugi.channel.Takable; 26 import net.walend.somnifugi.channel.ChannelFactory; 27 28 34 35 public class SimpleFanOut 36 implements FanOut<Message > 37 { 38 39 private Map <String ,SomniMessageSelector> subscriberNamesToMessageSelectors = new HashMap <String ,SomniMessageSelector>(); 40 41 private Map <String ,Puttable<Message >> subscriberNamesToPuttables = new HashMap <String ,Puttable<Message >>(); 42 private Map <String ,Takable<Message >> subscriberNamesToTakables = new HashMap <String ,Takable<Message >>(); 43 44 private Map <String ,String > subscriberNamesToConnectionClientIDsForNoLocalMessages = new HashMap <String ,String >(); 45 46 private final Object subscriberNamesToMapsGuard = new Object (); 47 48 private Set <String > durableSubscribers = new HashSet <String >(3); 49 50 private String copyMode; 51 52 private ChannelFactory<Message > factory; 53 private String topicName; 54 private Context context; 55 56 public SimpleFanOut(String copyMode,ChannelFactory<Message > factory,String topicName,Context context) 57 { 58 this.copyMode = copyMode; 59 this.factory = factory; 60 this.topicName = topicName; 61 this.context = context; 62 } 63 64 public Takable<Message > addSubscriber(String subscriber,boolean durable,SomniMessageSelector messageSelector,boolean noLocal,String subscriberConnectionClientID) 65 throws SomniNamingException 66 { 67 synchronized(subscriberNamesToMapsGuard) 68 { 69 if(subscriberNamesToTakables.containsKey(subscriber)) 70 { 71 return subscriberNamesToTakables.get(subscriber); 72 } 73 else 74 { 75 Channel<Message > channel = factory.createChannel(topicName,context); 76 Puttable<Message > puttable = channel.getPuttable(); 77 subscriberNamesToPuttables.put(subscriber,puttable); 78 79 Takable<Message > takable = channel.getTakable(); 80 subscriberNamesToTakables.put(subscriber,takable); 81 82 if(durable) 83 { 84 durableSubscribers.add(subscriber); 85 } 86 if(messageSelector!=null) 87 { 88 subscriberNamesToMessageSelectors.put(subscriber,messageSelector); 89 } 90 if(noLocal) 91 { 92 subscriberNamesToConnectionClientIDsForNoLocalMessages.put(subscriber,subscriberConnectionClientID); 93 } 94 return takable; 95 } 96 } 97 } 98 99 public void removeSubscriber(String subscriber) 100 { 101 synchronized(subscriberNamesToMapsGuard) 102 { 103 if(!durableSubscribers.contains(subscriber)) 104 { 105 subscriberNamesToPuttables.remove(subscriber); 106 subscriberNamesToTakables.remove(subscriber); 107 subscriberNamesToMessageSelectors.remove(subscriber); 108 subscriberNamesToConnectionClientIDsForNoLocalMessages.remove(subscriber); 109 } 110 } 111 } 112 113 public void removeDurableSubscriber(String subscriber) 114 { 115 synchronized(subscriberNamesToMapsGuard) 116 { 117 durableSubscribers.remove(subscriber); 118 subscriberNamesToPuttables.remove(subscriber); 119 subscriberNamesToTakables.remove(subscriber); 120 subscriberNamesToMessageSelectors.remove(subscriber); 121 subscriberNamesToConnectionClientIDsForNoLocalMessages.remove(subscriber); 122 } 123 } 124 125 private boolean shouldPut(String subscriberName,Message message) 126 { 127 try 128 { 129 SomniMessageSelector messageSelector = (SomniMessageSelector)subscriberNamesToMessageSelectors.get(subscriberName); 130 131 if(message instanceof SomniMessage) 132 { 133 SomniMessage somniMessage = (SomniMessage)message; 134 if(subscriberNamesToConnectionClientIDsForNoLocalMessages.containsKey(subscriberName) 135 &&(somniMessage.getSomniProducerConnectionClientID().equals(subscriberNamesToConnectionClientIDsForNoLocalMessages.get(subscriberName)))) 136 { 137 return false; 138 } 139 } 140 if(messageSelector==null) 141 { 142 return true; 143 } 144 if(messageSelector.matches(message)) 145 { 146 return true; 147 } 148 return false; 149 } 150 catch(SomniMessageSelectorException smse) 151 { 152 throw new SomniRuntimeException("Message selector had a problem.",smse); 153 } 154 } 155 156 public void put(Message item) 158 throws InterruptedException  159 { 160 SomniMessage message = (SomniMessage)item; 161 162 synchronized(subscriberNamesToMapsGuard) 163 { 164 for(String key : subscriberNamesToPuttables.keySet()) 165 { 166 if(shouldPut(key,message)) 167 { 168 message = message.copy(copyMode); 169 Puttable<Message > putter = subscriberNamesToPuttables.get(key); 170 putter.put(message); 171 } 172 } 173 } 174 } 175 176 public boolean offer(Message item,long msecs) 178 throws InterruptedException  179 { 180 synchronized(subscriberNamesToMapsGuard) 181 { 182 SomniMessage message = (SomniMessage)item; 183 184 boolean result = true; 185 for(String key : subscriberNamesToPuttables.keySet()) 186 { 187 SomniMessageSelector messageSelector = (SomniMessageSelector)subscriberNamesToMessageSelectors.get(key); 188 if(shouldPut(key,message)) 189 { 190 message = message.copy(copyMode); 191 Puttable<Message > putter = subscriberNamesToPuttables.get(key); 192 result &= putter.offer(message,msecs); 193 } 194 } 195 return result; 196 } 197 } 198 } 199 200 220 | Popular Tags |