KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > rift > coad > daemon > messageservice > named > NamedQueueImpl


1 /*
2  * <Add library description here>
3  * Copyright (C) 2006 Rift IT Contracting
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18  *
19  * NamedQueueImpl.java
20  */

21
22 package com.rift.coad.daemon.messageservice.named;
23
24 // java imports
25
import java.rmi.Remote JavaDoc;
26 import java.rmi.RemoteException JavaDoc;
27 import java.util.ArrayList JavaDoc;
28 import java.util.Date JavaDoc;
29 import java.util.List JavaDoc;
30 import java.util.HashMap JavaDoc;
31 import java.util.Map JavaDoc;
32 import java.util.Vector JavaDoc;
33 import java.util.concurrent.ConcurrentHashMap JavaDoc;
34 import java.util.concurrent.ConcurrentLinkedQueue JavaDoc;
35 import javax.naming.Context JavaDoc;
36 import javax.naming.InitialContext JavaDoc;
37 import javax.transaction.SystemException JavaDoc;
38 import javax.transaction.UserTransaction JavaDoc;
39 import javax.transaction.Status JavaDoc;
40 import javax.transaction.xa.XAException JavaDoc;
41 import javax.transaction.xa.XAResource JavaDoc;
42 import javax.transaction.xa.Xid JavaDoc;
43
44 // logging import
45
import org.apache.log4j.Logger;
46
47 // hibernate imports
48
import org.hibernate.*;
49 import org.hibernate.cfg.*;
50
51 // coadunation imports
52
import com.rift.coad.daemon.messageservice.db.*;
53 import com.rift.coad.daemon.messageservice.Message;
54 import com.rift.coad.daemon.messageservice.MessageManager;
55 import com.rift.coad.daemon.messageservice.MessageQueue;
56 import com.rift.coad.daemon.messageservice.MessageQueueManager;
57 import com.rift.coad.daemon.messageservice.MessageService;
58 import com.rift.coad.daemon.messageservice.MessageServiceException;
59 import com.rift.coad.daemon.messageservice.MessageServiceImpl;
60 import com.rift.coad.daemon.messageservice.NamedQueue;
61 import com.rift.coad.daemon.messageservice.TimeoutException;
62 import com.rift.coad.daemon.messageservice.message.MessageImpl;
63 import com.rift.coad.daemon.messageservice.message.MessageManagerFactory;
64 import com.rift.coad.daemon.messageservice.message.MessageManagerImpl;
65 import com.rift.coad.daemon.servicebroker.ServiceBroker;
66 import com.rift.coad.hibernate.util.HibernateUtil;
67 import com.rift.coad.lib.configuration.Configuration;
68 import com.rift.coad.lib.configuration.ConfigurationFactory;
69 import com.rift.coad.lib.Resource;
70 import com.rift.coad.lib.ResourceIndex;
71 import com.rift.coad.util.transaction.TransactionManager;
72 import com.rift.coad.util.lock.LockRef;
73 import com.rift.coad.util.lock.ObjectLockFactory;
74 import com.rift.coad.util.connection.ConnectionManager;
75
76 /**
77  * The implementation of the Queue. This object represents a named database
78  * queue.
79  *
80  * @author Brett Chaldecott
81  */

82 public class NamedQueueImpl implements NamedQueue,ResourceIndex,Resource JavaDoc {
83     
84     // class constants
85
private final static String JavaDoc TIMEOUT = "QUEUE_TIMEOUT";
86     private final static long DEFAULT_TIMEOUT = 30000;
87     
88     
89     // the logger reference
90
protected Logger log =
91             Logger.getLogger(NamedQueueImpl.class.getName());
92     
93     
94     // private member variables
95
private String JavaDoc queueName = null;
96     private long maxTimeout = DEFAULT_TIMEOUT;
97     
98     
99     /**
100      * The constructor of the queue.
101      *
102      * @param queueName The name of the queue.
103      * @exception MessageServiceException
104      */

105     public NamedQueueImpl(String JavaDoc queueName) throws MessageServiceException {
106         try {
107             this.queueName = queueName;
108             Configuration config = ConfigurationFactory.getInstance().getConfig(
109                     this.getClass());
110             maxTimeout = config.getLong(TIMEOUT,DEFAULT_TIMEOUT);
111         } catch (Exception JavaDoc ex) {
112             log.error("Failed to init the Named Queue : " + ex.getMessage(),ex);
113             throw new MessageServiceException(
114                     "Failed to init the Named Queue : " + ex.getMessage(),ex);
115         }
116     }
117     
118     
119     /**
120      * This method returns a message for processing. If that message is not
121      * acknowledged by the target with in a configured time it will be made
122      * available for processing again.
123      *
124      * @return The reference to the Message for processing.
125      * @param delay The delay before returning a null reference.
126      * @exception RemoteException
127      * @exception MessageServiceException
128      * @exception TimeoutException
129      */

130     public Message receive(long delay) throws RemoteException JavaDoc,
131             MessageServiceException, TimeoutException {
132         try {
133             return NamedMemoryQueue.getInstance(queueName).poll(delay);
134         } catch (Throwable JavaDoc ex) {
135             log.error("Failed to retrieve message : " + ex.getMessage(),ex);
136             throw new MessageServiceException(
137                     "Failed to retrieve message : " + ex.getMessage(),ex);
138         }
139     }
140     
141     
142     /**
143      * This method adds a service to the list of services used to identify this
144      * queue, by the service broker.
145      *
146      * @param service The string containing the service name.
147      * @exception RemoteException
148      * @exception MessageServiceException
149      */

150     public void addService(String JavaDoc service) throws RemoteException JavaDoc,
151             MessageServiceException {
152         try {
153             Session session = HibernateUtil.getInstance(
154                     MessageServiceImpl.class).getSession();
155             com.rift.coad.daemon.messageservice.db.MessageQueue mq =
156                     (com.rift.coad.daemon.messageservice.db.MessageQueue)
157                     session.get(com.rift.coad.daemon.messageservice.db.
158
                    MessageQueue.class,queueName);
159             MessageQueueService messageQueueService = new MessageQueueService(
160                     service,mq);
161             session.persist(messageQueueService);
162             ServiceBroker broker = (ServiceBroker)ConnectionManager.
163                     getInstance().getConnection(ServiceBroker.class,
164                     "ServiceBroker");
165             List JavaDoc serviceList = new ArrayList JavaDoc();
166             serviceList.add(service);
167             broker.registerService(MessageService.JNDI_URL,serviceList);
168         } catch (Exception JavaDoc ex) {
169             log.error("Failed to add a service : " + ex.getMessage(),ex);
170             throw new MessageServiceException(
171                     "Failed to add a service : " + ex.getMessage(),ex);
172         }
173     }
174     
175     
176     /**
177      * This method returns a list of services used to identify this queue to the
178      * service broker.
179      *
180      * @return The list of service used to identify this queue to the service
181      * broker.
182      * @exception RemoteException
183      * @exception MessageServiceException
184      */

185     public List JavaDoc listServices() throws RemoteException JavaDoc,
186             MessageServiceException {
187         try {
188             Session session = HibernateUtil.getInstance(
189                     MessageServiceImpl.class).getSession();
190             List JavaDoc entries = session.createQuery(
191                     "SELECT mqs.service FROM MessageQueueService as mqs " +
192                     "WHERE mqs.messageQueue.named = ?").
193                     setString(0,this.queueName).list();
194             List JavaDoc result = new ArrayList JavaDoc();
195             for (int index = 0; index < entries.size(); index++) {
196                 result.add(((Object JavaDoc[])entries.get(index))[index]);
197             }
198             return result;
199         } catch (Exception JavaDoc ex) {
200             log.error("Failed to add a service : " + ex.getMessage(),ex);
201             throw new MessageServiceException(
202                     "Failed to add a service : " + ex.getMessage(),ex);
203         }
204     }
205     
206     
207     /**
208      * This method removes a service from the list of services.
209      *
210      * @param service The name of the service to remove.
211      * @exception RemoteException
212      * @exception MessageServiceException
213      */

214     public void removeService(String JavaDoc service) throws RemoteException JavaDoc,
215             MessageServiceException {
216         try {
217             Session session = HibernateUtil.getInstance(
218                     MessageServiceImpl.class).getSession();
219             session.createQuery(
220                     "DELETE FROM MessageQueueService as mqs " +
221                     "WHERE mqs.service = ? AND mqs.messageQueue.named = ?").
222                     setString(0,service).setString(1,this.queueName).
223                     executeUpdate();
224             ServiceBroker broker = (ServiceBroker)ConnectionManager.
225                     getInstance().getConnection(ServiceBroker.class,
226                     "ServiceBroker");
227             List JavaDoc mqs = session.createQuery(
228                     "FROM MessageQueueService as mqs " +
229                     "WHERE mqs.service = ?").setString(0,service).list();
230             if (mqs.size() == 0) {
231                 List JavaDoc serviceList = new ArrayList JavaDoc();
232                 serviceList.add(service);
233                 broker.removeServiceProviders(MessageService.JNDI_URL,
234                         serviceList);
235             }
236         } catch (Exception JavaDoc ex) {
237             log.error("Failed to remove a service : " + ex.getMessage(),ex);
238             throw new MessageServiceException(
239                     "Failed to remove a service : " + ex.getMessage(),ex);
240         }
241     }
242     
243     
244     /**
245      * This method returns the primary key of this resource to enable
246      * indexing.
247      *
248      * @return The primary key of this object.
249      */

250     public Object JavaDoc getPrimaryKey() {
251         return queueName;
252     }
253     
254     
255     /**
256      * This method returns the name of the resource.
257      *
258      * @return The string containing the name of the resource.
259      */

260     public String JavaDoc getResourceName() {
261         return queueName;
262     }
263     
264     
265     /**
266      * This method will be called to release the resources controlled by
267      * this object.
268      *
269      * @param This method adds a new resource.
270      */

271     public void releaseResource() {
272         
273     }
274     
275 }
276
Popular Tags