1 29 30 package com.caucho.jms.jdbc; 31 32 import com.caucho.config.ConfigException; 33 import com.caucho.jms.JMSExceptionWrapper; 34 import com.caucho.jms.session.MessageConsumerImpl; 35 import com.caucho.jms.session.SessionImpl; 36 import com.caucho.log.Log; 37 import com.caucho.util.L10N; 38 39 import javax.annotation.PostConstruct; 40 import javax.jms.JMSException ; 41 import javax.jms.Message ; 42 import javax.jms.Queue ; 43 import javax.jms.QueueBrowser ; 44 import java.sql.SQLException ; 45 import java.util.logging.Logger ; 46 47 50 public class JdbcQueue extends JdbcDestination implements Queue { 51 static final Logger log = Log.open(JdbcQueue.class); 52 static final L10N L = new L10N(JdbcQueue.class); 53 54 private int _id; 55 56 public JdbcQueue() 57 { 58 } 59 60 63 public String getQueueName() 64 { 65 return getName(); 66 } 67 68 71 public void setQueueName(String name) 72 { 73 setName(name); 74 } 75 76 79 public int getId() 80 { 81 return _id; 82 } 83 84 87 @PostConstruct 88 public void init() 89 throws ConfigException, SQLException 90 { 91 if (_jdbcManager.getDataSource() == null) 92 throw new ConfigException(L.l("JdbcQueue requires a <data-source> element.")); 93 94 if (getName() == null) 95 throw new ConfigException(L.l("JdbcQueue requires a <queue-name> element.")); 96 97 _jdbcManager.init(); 98 99 _id = createDestination(getName(), false); 100 } 101 102 105 public MessageConsumerImpl createConsumer(SessionImpl session, 106 String selector, 107 boolean noWait) 108 throws JMSException 109 { 110 return new JdbcQueueConsumer(session, selector, _jdbcManager, this); 111 } 112 113 116 public QueueBrowser createBrowser(SessionImpl session, String selector) 117 throws JMSException 118 { 119 return new JdbcQueueBrowser(session, selector, this); 120 } 121 122 125 public void send(Message message) 126 throws JMSException 127 { 128 long expireTime = message.getJMSExpiration(); 129 if (expireTime <= 0) 130 expireTime = Long.MAX_VALUE / 2; 131 132 purgeExpiredMessages(); 133 134 try { 135 _jdbcManager.getJdbcMessage().send(message, _id, expireTime); 136 } catch (Exception e) { 137 throw new JMSExceptionWrapper(e); 138 } 139 140 messageAvailable(); 141 } 142 143 146 public void commit(int session) 147 throws JMSException 148 { 149 } 150 151 154 public String toString() 155 { 156 return "JdbcQueue[" + getName() + "]"; 157 } 158 } 159 160 | Popular Tags |