1 16 package org.outerj.daisy.event; 17 18 import org.apache.avalon.framework.service.Serviceable; 19 import org.apache.avalon.framework.service.ServiceManager; 20 import org.apache.avalon.framework.service.ServiceException; 21 import org.apache.avalon.framework.activity.Startable; 22 import org.apache.avalon.framework.activity.Initializable; 23 import org.apache.avalon.framework.configuration.Configuration; 24 import org.apache.avalon.framework.configuration.Configurable; 25 import org.apache.avalon.framework.configuration.ConfigurationException; 26 import org.apache.avalon.framework.logger.AbstractLogEnabled; 27 import org.outerj.daisy.jms.JmsClient; 28 import org.outerj.daisy.jms.Sender; 29 30 import javax.jms.*; 31 import javax.sql.DataSource ; 32 import java.sql.Connection ; 33 import java.sql.PreparedStatement ; 34 import java.sql.ResultSet ; 35 import java.util.ArrayList ; 36 import java.util.Iterator ; 37 38 42 public class EventDispatcherImpl extends AbstractLogEnabled implements EventDispatcher, Serviceable, Startable, 43 Initializable, Configurable { 44 private EventDispatchThread eventDispatchThread; 45 private DataSource dataSource; 46 private String jmsTopicName; 47 private JmsClient jmsClient; 48 private Sender topicSender; 49 private boolean stopping = false; 50 51 55 public void service(ServiceManager serviceManager) throws ServiceException { 56 this.dataSource = (DataSource )serviceManager.lookup("datasource"); 57 this.jmsClient = (JmsClient)serviceManager.lookup("jmsclient"); 58 } 59 60 public void configure(Configuration configuration) throws ConfigurationException { 61 this.jmsTopicName = configuration.getChild("jmsTopic").getValue(); 62 } 63 64 public void notifyNewEvents() { 65 eventDispatchThread.notify(); 66 } 67 68 public void initialize() throws Exception { 69 topicSender = jmsClient.getSender(jmsTopicName); 70 } 71 72 public void start() throws Exception { 73 eventDispatchThread = new EventDispatchThread(); 74 eventDispatchThread.setDaemon(true); 75 eventDispatchThread.start(); 76 } 77 78 public void stop() throws Exception { 79 stopping = true; 80 eventDispatchThread.interrupt(); 81 try { 82 eventDispatchThread.join(); 83 } catch (InterruptedException e) {} 84 } 85 86 private class EventDispatchThread extends Thread { 87 public EventDispatchThread() { 88 super("EventDispatcher"); 89 } 90 91 public synchronized void run() { 92 try { 93 while (true) { 94 Connection conn = null; 95 PreparedStatement stmt = null; 96 PreparedStatement messageStmt = null; 97 PreparedStatement removeEventStmt = null; 98 try { 99 conn = dataSource.getConnection(); 100 101 stmt = conn.prepareStatement("select seqnr from events order by seqnr"); 102 ResultSet rs = stmt.executeQuery(); 103 ArrayList seqnrsToProcess = new ArrayList (); 104 105 while (rs.next()) { 106 seqnrsToProcess.add(new Long (rs.getLong(1))); 107 } 108 stmt.close(); 109 110 messageStmt = conn.prepareStatement("select message_type, message from events where seqnr = ?"); 111 removeEventStmt = conn.prepareStatement("delete from events where seqnr = ?"); 112 113 Iterator seqnrsToProcessIt = seqnrsToProcess.iterator(); 114 while (seqnrsToProcessIt.hasNext()) { 115 if (stopping) 117 return; 118 long seqnr = ((Long )seqnrsToProcessIt.next()).longValue(); 119 120 messageStmt.setLong(1, seqnr); 121 rs = messageStmt.executeQuery(); 122 rs.next(); 123 String messageType = rs.getString(1); 124 String message = rs.getString(2); 125 rs.close(); 126 127 if (getLogger().isDebugEnabled()) 128 getLogger().debug("Will forward message " + seqnr + " to JMS."); 129 130 Message jmsMessage = topicSender.createTextMessage(message); 131 jmsMessage.setStringProperty("type", messageType); 132 133 if (stopping) 136 return; 137 topicSender.send(jmsMessage); 138 139 removeEventStmt.setLong(1, seqnr); 140 removeEventStmt.execute(); 141 } 142 } catch (Throwable e) { 143 if (stopping) { 144 return; 145 } else { 146 EventDispatcherImpl.this.getLogger().error("Exception in event dispatcher.", e); 147 } 148 } finally { 149 closeStatement(stmt); 150 closeStatement(messageStmt); 151 closeStatement(removeEventStmt); 152 try { 153 if (conn != null) 154 conn.close(); 155 } catch (Throwable e) { 156 getLogger().error("Failed to close database connection.", e); 157 } 158 } 159 if (stopping) 160 return; 161 wait(5000); 162 } 163 } catch (InterruptedException e) { 164 EventDispatcherImpl.this.getLogger().info("Event dispatcher thread interrupted."); 165 } 166 } 167 168 private void closeStatement(PreparedStatement stmt) { 169 try { 170 if (stmt != null) 171 stmt.close(); 172 } catch (Throwable e) { 173 getLogger().error("Failed to close JDBC statement.", e); 174 } 175 } 176 } 177 } 178 | Popular Tags |