1 package net.walend.somnifugi; 2 3 import java.util.Hashtable ; 4 import java.util.Enumeration ; 5 6 import javax.naming.Context ; 7 import javax.naming.NamingException ; 8 9 import javax.jms.Message ; 10 11 import net.walend.somnifugi.channel.Puttable; 12 import net.walend.somnifugi.channel.Takable; 13 import net.walend.somnifugi.channel.Channel; 14 import net.walend.somnifugi.channel.ChannelFactory; 15 16 26 27 public class TimeoutChannelFactory 28 implements ChannelFactory<Message > 29 { 30 33 public static final int DEFAULTTIMEOUT = 600000; 35 public static final String DEFAULTTIMEOUTPROP = SomniProperties.DEFAULT + "." + SomniProperties.TIMEOUTPROP; 36 37 private ChannelFactory wrappedChannelFactory = null; 38 39 public TimeoutChannelFactory() 40 { 41 SomniLogger.IT.finer("Creating new TimeoutChannelFactory."); 42 } 43 44 47 public Puttable<Message > createPuttable(String destinationName,Context context) 48 throws SomniNamingException 49 { 50 return createChannel(destinationName,context).getPuttable(); 51 } 52 53 56 public Takable<Message > createTakable(String destinationName,Context context) 57 throws SomniNamingException 58 { 59 return createChannel(destinationName,context).getTakable(); 60 } 61 62 @SuppressWarnings ("unchecked") 64 public Channel<Message > createChannel(String destinationName,Context context) 65 throws SomniNamingException 66 { 67 try 68 { 69 Hashtable env = context.getEnvironment(); 70 int timeout = DEFAULTTIMEOUT; 71 72 String timeoutKey = destinationName + "."+SomniProperties.TIMEOUTPROP; 73 74 String timeoutString = null; 75 76 if(env.containsKey(timeoutKey)) 77 { 78 timeoutString = (String )env.get(timeoutKey); 79 } 80 else if(env.containsKey(DEFAULTTIMEOUTPROP)) 81 { 82 timeoutString = (String )env.get(DEFAULTTIMEOUTPROP); 83 } 84 if(timeoutString!=null) 85 { 86 timeout = Integer.parseInt(timeoutString); 87 } 88 SomniLogger.IT.finest("Creating TimeoutChannel with timeout "+timeout+" for "+destinationName); 89 90 wrappedChannelFactory = ChannelFactoryCache.IT.getChannelFactory(SomniProperties.WRAPPEDPROP+destinationName,context,true); 91 92 Channel<Message > wrappedChannel =wrappedChannelFactory.createChannel(SomniProperties.WRAPPEDPROP+destinationName,context); 93 94 return new TimeoutChannel(timeout,wrappedChannel); 95 } 96 catch(NamingException ne) 97 { 98 throw new SomniNamingException(ne); 99 } 100 } 101 102 private static class TimeoutChannel 103 implements Channel<Message >,Puttable<Message >,Takable<Message > 104 { 105 private Channel<Message > wrappedChannel; 106 private Puttable<Message > wrappedPuttable; 107 private Takable<Message > wrappedTakable; 108 private int timeout; 109 110 public TimeoutChannel(int timeout,Channel<Message > wrappedChannel) 111 { 112 this.timeout = timeout; 113 this.wrappedChannel = wrappedChannel; 114 this.wrappedPuttable = wrappedChannel.getPuttable(); 115 this.wrappedTakable = wrappedChannel.getTakable(); 116 } 117 118 public void put(Message item) 119 throws InterruptedException 120 { 121 boolean result = wrappedPuttable.offer(item,timeout); 122 if(!result) 123 { 124 throw new InterruptedException ("Attempt to offer timed out after "+timeout); 125 } 126 } 127 128 public void pushBack(Message item) 129 throws InterruptedException 130 { 131 wrappedTakable.pushBack(item); 132 } 133 134 public boolean offer(Message item, long msecs) 135 throws InterruptedException 136 { 137 return wrappedPuttable.offer(item,msecs); 138 } 139 140 public Message take() 141 throws InterruptedException 142 { 143 return wrappedTakable.take(); 144 } 145 146 public Message poll() 147 { 148 return wrappedTakable.poll(); 149 } 150 151 public Message poll(long msecs) 152 throws InterruptedException 153 { 154 return wrappedTakable.poll(msecs); 155 } 156 157 public Message peek() 158 { 159 return wrappedTakable.peek(); 160 } 161 162 public boolean hasRealPushback() 164 { 165 return wrappedChannel.hasRealPushback(); 166 } 167 168 public boolean supportsPriorities() 169 { 170 return wrappedChannel.supportsPriorities(); 171 } 172 173 public boolean supportsMessageSelectors() 174 { 175 return wrappedChannel.supportsMessageSelectors(); 176 } 177 178 public Enumeration snapShot() 179 { 180 return wrappedTakable.snapShot(); 181 } 182 183 public Enumeration snapShot(SomniMessageSelector messageSelector) 184 throws SomniMessageSelectorException 185 { 186 return wrappedTakable.snapShot(messageSelector); 187 } 188 189 public Puttable<Message > getPuttable() 190 { 191 return this; 192 } 193 194 public Takable<Message > getTakable() 195 { 196 return this; 197 } 198 199 public Takable<Message > getTakable(SomniMessageSelector messageSelector) 200 { 201 return wrappedChannel.getTakable(messageSelector); 202 } 203 204 public int guessSize() 205 { 206 return wrappedTakable.guessSize(); 207 } 208 209 } 210 } 211 212 232 | Popular Tags |