1 29 30 package com.caucho.ejb.message; 31 32 import com.caucho.config.ConfigException; 33 import com.caucho.ejb.AbstractContext; 34 import com.caucho.ejb.AbstractServer; 35 import com.caucho.ejb.EjbServerManager; 36 import com.caucho.util.L10N; 37 import com.caucho.util.Log; 38 39 import javax.ejb.MessageDrivenBean ; 40 import javax.ejb.MessageDrivenContext ; 41 import javax.jms.*; 42 import java.lang.reflect.Method ; 43 import java.util.ArrayList ; 44 import java.util.logging.Level ; 45 import java.util.logging.Logger ; 46 47 50 public class MessageServer extends AbstractServer { 51 private static final L10N L = new L10N(MessageServer.class); 52 protected static final Logger log = Log.open(MessageServer.class); 53 54 private Connection _connection; 55 private Destination _destination; 56 57 private String _subscriptionName; 58 private String _selector; 59 private int _acknowledgeMode = Session.AUTO_ACKNOWLEDGE; 60 61 private int _consumerMax = 5; 62 63 private MessageDrivenContext _context; 64 65 private ArrayList <Consumer> _consumers = new ArrayList <Consumer>(); 66 67 public MessageServer(EjbServerManager manager) 68 { 69 super(manager); 70 71 _context = new MessageDrivenContextImpl(this); 72 } 73 74 77 public void setDestination(Destination destination) 78 { 79 _destination = destination; 80 } 81 82 85 public void setConsumerMax(int consumer) 86 { 87 _consumerMax = consumer; 88 } 89 90 93 public void init() 94 throws Exception 95 { 96 super.init(); 97 } 98 99 102 public void start() 103 throws Exception 104 { 105 super.start(); 106 107 ConnectionFactory factory = null; if (factory == null) 109 factory = _ejbManager.getConnectionFactory(); 110 111 if (_destination == null) 112 throw new ConfigException(L.l("No destination is configured.")); 113 114 if (_consumerMax <= 0) 115 throw new ConfigException(L.l("No listeners are configured.")); 116 117 if (factory == null) 118 throw new ConfigException(L.l("Message beans need a jms-connection-factory. The ConnectionFactory object must be configured.")); 119 120 Connection connection = factory.createConnection(); 121 _connection = connection; 122 123 if (_destination instanceof Topic) 124 _consumerMax = 1; 125 126 for (int i = 0; i < _consumerMax; i++) { 127 Consumer consumer = new Consumer(); 128 129 _consumers.add(consumer); 130 131 consumer.start(); 132 } 133 134 _connection.start(); 135 136 } 137 138 void generate() 139 throws Exception 140 { 141 } 142 143 public AbstractContext getContext(Object obj, boolean foo) 144 { 145 throw new UnsupportedOperationException (); 146 } 147 148 public Connection getJMSConnection() 149 { 150 return _connection; 151 } 152 153 public Destination getDestination() 154 { 155 return _destination; 156 } 157 158 161 public void destroy() 162 { 163 try { 164 ArrayList <Consumer> consumers = new ArrayList <Consumer>(_consumers); 165 _consumers = null; 166 167 for (Consumer consumer : consumers) { 168 consumer.destroy(); 169 } 170 171 if (_connection != null) 172 _connection.close(); 173 } catch (Exception e) { 174 log.log(Level.WARNING, e.toString(), e); 175 } 176 } 177 178 class Consumer { 179 private Session _session; 180 private MessageConsumer _consumer; 181 private MessageListener _listener; 182 183 Consumer() 184 throws Exception 185 { 186 } 187 188 191 void start() 192 throws Exception 193 { 194 Class cl = _contextImplClass; 195 196 _listener = (MessageListener) cl.newInstance(); 197 198 if (_listener instanceof MessageDrivenBean ) { 199 MessageDrivenBean bean = (MessageDrivenBean ) _listener; 200 bean.setMessageDrivenContext(_context); 201 } 202 203 Method create = null; 204 205 try { 206 create = cl.getMethod("ejbCreate", new Class [0]); 207 create.invoke(_listener, new Object [0]); 208 } catch (NoSuchMethodException e) { 209 } 210 211 boolean transacted = false; 214 215 _session = _connection.createSession(transacted, _acknowledgeMode); 216 217 if (_subscriptionName != null) { 218 Topic topic = (Topic) _destination; 219 220 _consumer = _session.createDurableSubscriber(topic, 221 _subscriptionName, 222 _selector, 223 true); 224 } 225 else { 226 _consumer = _session.createConsumer(_destination, _selector); 227 } 228 229 _consumer.setMessageListener(_listener); 230 } 231 232 235 public Session getSession() 236 throws JMSException 237 { 238 return _session; 239 } 240 241 244 private void destroy() 245 throws JMSException 246 { 247 Session session = _session; 248 _session = null; 249 250 try { 251 if (session != null) 252 session.close(); 253 } finally { 254 if (_listener instanceof MessageDrivenBean ) { 255 MessageDrivenBean bean = (MessageDrivenBean ) _listener; 256 _listener = null; 257 bean.ejbRemove(); 258 } 259 } 260 } 261 } 262 } 263 | Popular Tags |