package com.caucho.jms.resource;

import com.caucho.config.Config;
import com.caucho.config.ConfigException;
import com.caucho.config.types.InitProgram;
import com.caucho.jms.AbstractDestination;
import com.caucho.jms.ConnectionFactoryImpl;
import com.caucho.util.L10N;
import com.caucho.util.Log;

import javax.annotation.*;
import javax.jms.*;
import java.util.logging.Logger;

/**
 * Configures a set of JMS message listeners attached to a single destination.
 *
 * <p>{@link #init()} validates the configuration and creates the connection,
 * {@link #start()} creates one session and consumer per listener instance and
 * starts delivery, and {@link #stop()} stops delivery on the connection.
 */
public class ListenerResource {
  private static final L10N L = new L10N(ListenerResource.class);
  protected static Logger log = Log.open(ListenerResource.class);

  private ConnectionFactory _connFactory;
  private Connection _conn;

  private Destination _destination;

  // Number of listener instances (each with its own session and consumer)
  // created by start(); reduced to 1 in init() when the destination is a Topic.
  private int _listenerMax = 5;
  private ListenerConfig _listenerConfig;

  /**
   * Sets the JMS connection factory used by {@link #init()} to create the
   * connection.
   *
   * @param factory the connection factory; may be left unset when the
   *        destination is an {@code AbstractDestination}, in which case
   *        {@link #init()} creates a {@code ConnectionFactoryImpl}
   */
  public void setConnectionFactory(ConnectionFactory factory)
  {
    _connFactory = factory;
  }

  /**
   * Sets the destination (queue or topic) the listeners consume from.
   *
   * @param destination the JMS destination; required
   */
  public void setDestination(Destination destination)
  {
    _destination = destination;
  }

  /**
   * Sets the configuration used to instantiate each message listener.
   *
   * @param config the listener configuration; required
   */
  public void setListener(ListenerConfig config)
  {
    _listenerConfig = config;
  }

  /**
   * Validates the configuration and creates the JMS connection.
   *
   * @throws ConfigException if the destination or listener is missing, or if
   *         no connection factory is set and none can be defaulted
   * @throws JMSException if the connection cannot be created
   */
  @PostConstruct
  public void init() throws ConfigException, JMSException
  {
    if (_destination == null)
      throw new ConfigException(L.l("'destination' is required for ListenerResource."));

    if (_listenerConfig == null)
      throw new ConfigException(L.l("'listener' is required for ListenerResource."));

    // Destinations from this JMS implementation can use the built-in factory.
    if (_connFactory == null && _destination instanceof AbstractDestination)
      _connFactory = new ConnectionFactoryImpl();

    if (_connFactory == null)
      throw new ConfigException(L.l("connection-factory is required for ListenerResource."));

    _conn = _connFactory.createConnection();

    // NOTE(review): presumably a topic gets a single listener so the same
    // message is not handled once per listener instance -- confirm.
    if (_destination instanceof Topic)
      _listenerMax = 1;
  }

  /**
   * Creates the listener instances, registers each on its own session and
   * consumer, and starts the connection.
   *
   * @throws IllegalStateException if {@link #init()} has not created the
   *         connection yet
   * @throws Throwable if a listener cannot be instantiated or a JMS call fails
   */
  public void start() throws Throwable
  {
    // Fail with a clear message rather than a NullPointerException when the
    // resource is started without a successful init().
    if (_conn == null)
      throw new IllegalStateException(L.l("ListenerResource must be initialized before it is started."));

    for (int i = 0; i < _listenerMax; i++) {
      MessageListener listener = _listenerConfig.newInstance();

      // Non-transacted session with automatic acknowledgement.
      Session session = _conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

      MessageConsumer consumer = session.createConsumer(_destination);

      consumer.setMessageListener(listener);
    }

    _conn.start();

    log.fine("ListenerResource[" + _destination + "] started");
  }

  /**
   * Stops the connection. Does nothing when no connection was ever created,
   * so it is safe to call after a failed or skipped {@link #init()}.
   *
   * @throws JMSException if the connection cannot be stopped
   */
  public void stop() throws JMSException
  {
    if (_conn == null)
      return;

    _conn.stop();

    log.fine("ListenerResource[" + _destination + "] stopped");
  }

  /**
   * Configuration for a message listener: the listener class plus an optional
   * init program applied to every new instance.
   */
  public static class ListenerConfig {
    private Class<?> _type;
    private InitProgram _init;

    /**
     * Sets the listener class after validating it against
     * {@link MessageListener} with {@code Config.validate}.
     *
     * @param type the listener class
     * @throws ConfigException if validation rejects the class
     */
    public void setType(Class type) throws ConfigException
    {
      Config.validate(type, MessageListener.class);

      _type = type;
    }

    /**
     * Sets the init program that configures each listener created by
     * {@link #newInstance()}.
     *
     * @param init the init program; optional
     */
    public void setInit(InitProgram init)
    {
      _init = init;
    }

    /**
     * Checks that the listener type has been configured.
     *
     * @throws ConfigException if no type was set
     */
    public void init() throws ConfigException
    {
      if (_type == null)
        throw new ConfigException(L.l("'type' is required for listener."));
    }

    /**
     * Instantiates a new listener and applies the init program, if any.
     *
     * @return the newly created and configured listener
     * @throws ConfigException if no type was set
     * @throws Throwable if instantiation or configuration fails
     */
    public MessageListener newInstance() throws Throwable , InstantiationException
    {
      // Guard here as well: nothing guarantees init() ran before this call,
      // and a missing type would otherwise surface as a NullPointerException.
      if (_type == null)
        throw new ConfigException(L.l("'type' is required for listener."));

      MessageListener listener = (MessageListener) _type.newInstance();

      if (_init != null)
        _init.configure(listener);

      return listener;
    }
  }
}