KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > outerj > daisy > event > EventDispatcherImpl


1 /*
2  * Copyright 2004 Outerthought bvba and Schaubroeck nv
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */

16 package org.outerj.daisy.event;
17
18 import org.apache.avalon.framework.service.Serviceable;
19 import org.apache.avalon.framework.service.ServiceManager;
20 import org.apache.avalon.framework.service.ServiceException;
21 import org.apache.avalon.framework.activity.Startable;
22 import org.apache.avalon.framework.activity.Initializable;
23 import org.apache.avalon.framework.configuration.Configuration;
24 import org.apache.avalon.framework.configuration.Configurable;
25 import org.apache.avalon.framework.configuration.ConfigurationException;
26 import org.apache.avalon.framework.logger.AbstractLogEnabled;
27 import org.outerj.daisy.jms.JmsClient;
28 import org.outerj.daisy.jms.Sender;
29
30 import javax.jms.*;
31 import javax.sql.DataSource JavaDoc;
32 import java.sql.Connection JavaDoc;
33 import java.sql.PreparedStatement JavaDoc;
34 import java.sql.ResultSet JavaDoc;
35 import java.util.ArrayList JavaDoc;
36 import java.util.Iterator JavaDoc;
37
38 /**
39  * @avalon.component version="1.0" name="eventdispatcher" lifestyle="singleton"
40  * @avalon.service type="org.outerj.daisy.event.EventDispatcher"
41  */

42 public class EventDispatcherImpl extends AbstractLogEnabled implements EventDispatcher, Serviceable, Startable,
43         Initializable, Configurable {
44     private EventDispatchThread eventDispatchThread;
45     private DataSource JavaDoc dataSource;
46     private String JavaDoc jmsTopicName;
47     private JmsClient jmsClient;
48     private Sender topicSender;
49     private boolean stopping = false;
50
51     /**
52      * @avalon.dependency key="datasource" type="javax.sql.DataSource"
53      * @avalon.dependency key="jmsclient" type="org.outerj.daisy.jms.JmsClient"
54      */

55     public void service(ServiceManager serviceManager) throws ServiceException {
56         this.dataSource = (DataSource JavaDoc)serviceManager.lookup("datasource");
57         this.jmsClient = (JmsClient)serviceManager.lookup("jmsclient");
58     }
59
60     public void configure(Configuration configuration) throws ConfigurationException {
61         this.jmsTopicName = configuration.getChild("jmsTopic").getValue();
62     }
63
64     public void notifyNewEvents() {
65         eventDispatchThread.notify();
66     }
67
68     public void initialize() throws Exception JavaDoc {
69         topicSender = jmsClient.getSender(jmsTopicName);
70     }
71
72     public void start() throws Exception JavaDoc {
73         eventDispatchThread = new EventDispatchThread();
74         eventDispatchThread.setDaemon(true);
75         eventDispatchThread.start();
76     }
77
78     public void stop() throws Exception JavaDoc {
79         stopping = true;
80         eventDispatchThread.interrupt();
81         try {
82             eventDispatchThread.join();
83         } catch (InterruptedException JavaDoc e) {}
84     }
85
86     private class EventDispatchThread extends Thread JavaDoc {
87         public EventDispatchThread() {
88             super("EventDispatcher");
89         }
90
91         public synchronized void run() {
92             try {
93                 while (true) {
94                     Connection JavaDoc conn = null;
95                     PreparedStatement JavaDoc stmt = null;
96                     PreparedStatement JavaDoc messageStmt = null;
97                     PreparedStatement JavaDoc removeEventStmt = null;
98                     try {
99                         conn = dataSource.getConnection();
100
101                         stmt = conn.prepareStatement("select seqnr from events order by seqnr");
102                         ResultSet JavaDoc rs = stmt.executeQuery();
103                         ArrayList JavaDoc seqnrsToProcess = new ArrayList JavaDoc();
104
105                         while (rs.next()) {
106                             seqnrsToProcess.add(new Long JavaDoc(rs.getLong(1)));
107                         }
108                         stmt.close();
109
110                         messageStmt = conn.prepareStatement("select message_type, message from events where seqnr = ?");
111                         removeEventStmt = conn.prepareStatement("delete from events where seqnr = ?");
112
113                         Iterator JavaDoc seqnrsToProcessIt = seqnrsToProcess.iterator();
114                         while (seqnrsToProcessIt.hasNext()) {
115                             // Check if we don't want to stop
116
if (stopping)
117                                 return;
118                             long seqnr = ((Long JavaDoc)seqnrsToProcessIt.next()).longValue();
119
120                             messageStmt.setLong(1, seqnr);
121                             rs = messageStmt.executeQuery();
122                             rs.next();
123                             String JavaDoc messageType = rs.getString(1);
124                             String JavaDoc message = rs.getString(2);
125                             rs.close();
126
127                             if (getLogger().isDebugEnabled())
128                                 getLogger().debug("Will forward message " + seqnr + " to JMS.");
129
130                             Message jmsMessage = topicSender.createTextMessage(message);
131                             jmsMessage.setStringProperty("type", messageType);
132
133                             // Again check if we don't want to stop, in an attempt to avoid a forever-wait
134
// condition in ActiveMQ when trying to send a message while the VM is shutting down.
135
if (stopping)
136                                 return;
137                             topicSender.send(jmsMessage);
138
139                             removeEventStmt.setLong(1, seqnr);
140                             removeEventStmt.execute();
141                         }
142                     } catch (Throwable JavaDoc e) {
143                         if (stopping) {
144                             return;
145                         } else {
146                             EventDispatcherImpl.this.getLogger().error("Exception in event dispatcher.", e);
147                         }
148                     } finally {
149                         closeStatement(stmt);
150                         closeStatement(messageStmt);
151                         closeStatement(removeEventStmt);
152                         try {
153                             if (conn != null)
154                                 conn.close();
155                         } catch (Throwable JavaDoc e) {
156                             getLogger().error("Failed to close database connection.", e);
157                         }
158                     }
159                     if (stopping)
160                         return;
161                     wait(5000);
162                 }
163             } catch (InterruptedException JavaDoc e) {
164                 EventDispatcherImpl.this.getLogger().info("Event dispatcher thread interrupted.");
165             }
166         }
167
168         private void closeStatement(PreparedStatement JavaDoc stmt) {
169             try {
170                 if (stmt != null)
171                     stmt.close();
172             } catch (Throwable JavaDoc e) {
173                 getLogger().error("Failed to close JDBC statement.", e);
174             }
175         }
176     }
177 }
178
Popular Tags