KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > org > compiere > util > Queue


1 /******************************************************************************
2  * The contents of this file are subject to the Compiere License Version 1.1
3  * ("License"); You may not use this file except in compliance with the License
4  * You may obtain a copy of the License at http://www.compiere.org/license.html
5  * Software distributed under the License is distributed on an "AS IS" basis,
6  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
7  * the specific language governing rights and limitations under the License.
8  * The Original Code is Compiere ERP & CRM Business Solution
9  * The Initial Developer of the Original Code is Jorg Janke and ComPiere, Inc.
10  * Portions created by Jorg Janke are Copyright (C) 1999-2001 Jorg Janke, parts
11  * created by ComPiere are Copyright (C) ComPiere, Inc.; All Rights Reserved.
12  * Contributor(s): ______________________________________.
13  *****************************************************************************/

14 package org.compiere.util;
15
16 import java.io.*;
17 import java.sql.*;
18 import java.util.*;
19 //import oracle.AQ.*;
20

21 import org.compiere.db.CConnection;
22
23 /**
24  * Message Queuing
25  *
26  * @author Jorg Janke
27  * @version $Id: Queue.java,v 1.7 2002/05/16 21:51:17 jjanke Exp $
28  */

29 public final class Queue implements Serializable
30 {
    // NOTE(review): the entire implementation below is disabled. It sits inside one
    // block comment that opens on the next line and is terminated just before the
    // closing brace of the class, so this class currently declares no fields and no
    // methods. The disabled code uses Oracle AQ types (AQSession, AQQueue, AQMessage,
    // ...); the oracle.AQ import at the top of the file is commented out as well.
    // The javadoc-style headers inside the disabled region are left unterminated
    // (each ends on a bare '*' line), which keeps the outer block comment from
    // closing early. They need proper terminators if this code is ever re-enabled.
31 /*
32     private static Connection s_connection;
33     private static AQSession s_session;
34     private static AQQueue s_queueS;
35     private static AQQueue s_queueC;
36     //
37     private static String s_queueName = "AQ_";
38     private static String s_toServer = "S";
39     private static String s_toClient = "C";
40     private static String s_table_extension = "_Tab";
41
42
43     /**
44      * Create AQ Session
        *
        * Obtains a connection through DB.createConnection (only when none is cached in
        * s_connection), loads the class oracle.AQ.AQOracleDriver and asks
        * AQDriverManager for a session on that connection. Failures are logged, not
        * thrown.
        * @return the new session, or null if session creation failed
45      *
46     private static AQSession createSession()
47     {
48         Log.trace(Log.l3_Util, "Queue.createSession");
49         if (s_connection == null) // autoCommit
50             // get dedicated
51             s_connection = DB.createConnection(true, Connection.TRANSACTION_SERIALIZABLE);
52
53         AQSession aq_sess = null;
54         try
55         {
56             Class.forName("oracle.AQ.AQOracleDriver");
57         }
58         catch (ClassNotFoundException e)
59         {
60             Log.error("Queue.createSession (Driver)", e);
61         }
62         try
63         {
64             aq_sess = AQDriverManager.createAQSession(s_connection);
65         }
66         catch (Exception e)
67         {
68             Log.error("Queue.createSession", e);
69         }
70
71         return aq_sess;
72     } // createSession
73
74     /**
75      * Get AQ Session
        *
        * Returns the session cached in s_session, creating it through createSession()
        * on first use.
        * @return the cached session; null if createSession() returned null
76      *
77     private static AQSession getSession()
78     {
79         if (s_session == null)
80             s_session = createSession();
81         return s_session;
82     } // getSession
83
84     /**
85      * Create Queue
        *
        * Creates a queue table named name + s_table_extension with a "RAW" payload,
        * sort order "ENQ_TIME" and multi-consumer switched off, then creates the queue
        * itself in that table with retention time AQQueueProperty.INFINITE. Both are
        * created under the user returned by CConnection.get().getDbUid().
        * @param name queue name
        * @return the created queue, or null if there is no session or creation failed
86      *
87     private static AQQueue createQueue(String name)
88     {
89         Log.trace(Log.l3_Util, "Queue.createQueue - " + name);
90         //
91         AQSession session = getSession();
92         if (session == null)
93             return null;
94
95         AQQueue queue = null;
96         try
97         {
98             // Payload type: raw - Age Ordered
99             AQQueueTableProperty t_property = new AQQueueTableProperty("RAW");
100             t_property.setComment("Compiere Client/Server Communication Table");
101             t_property.setSortOrder("ENQ_TIME");
102             t_property.setMultiConsumer(false);
103         // t_property.setCompatible("8.1");
104             // Create table in the Compiere scheme
105             String tabName = name + s_table_extension;
106             AQQueueTable table = session.createQueueTable(CConnection.get().getDbUid(),
107                 tabName, t_property);
108             Log.trace(Log.l5_DData, "Queue Table created - " + tabName);
109
110             // Create Queue property
111             AQQueueProperty q_property = new AQQueueProperty();
112             q_property.setComment("Compiere Client/Server Communication Queue");
113             q_property.setRetentionTime(AQQueueProperty.INFINITE);
114             queue = session.createQueue(table, name, q_property);
115             Log.trace(Log.l5_DData, "Queue created - " + name);
116         }
117         catch (Exception e)
118         {
119             Log.error("Queue.createQueue", e);
120         }
121         //
122         return queue;
123     } // createQueue
124
125     /**
126      * Get Queue "Test"
        *
        * Returns the cached server-bound queue (s_queueName + s_toServer) or the cached
        * client-bound queue (s_queueName + s_toClient). A queue that is not cached yet
        * is looked up in the session; if the lookup throws AQException the queue is
        * created via createQueue() and started.
        * @param toServer true for the server-bound queue, false for the client-bound one
        * @param reset if true, close(false) is called first, dropping the cached queues
        *              and session
        * @return the requested queue; may be null when lookup and creation both failed
127      *
128     protected static AQQueue getQueue(boolean toServer, boolean reset)
129     {
130         if (reset)
131             close(false);
132         // Server Queue
133         if (toServer && s_queueS == null)
134         {
            // NOTE(review): getSession() can return null; session.getQueue(..) below would
            // then raise a NullPointerException that the AQException handler does not catch.
135             AQSession session = getSession();
136             try
137             {
138                 s_queueS = session.getQueue(CConnection.get().getDbUid(),
139                     s_queueName+s_toServer);
140             }
141             catch (AQException e)
142             {
143                 try
144                 {
145                     // No Queue - So create it
                    // createQueue() may return null; start() on null is then caught and
                    // logged by the handler below.
146                     s_queueS = createQueue(s_queueName+s_toServer);
147                     s_queueS.start();
148                 }
149                 catch (Exception e2)
150                 {
151                     Log.error("Queue.getQueue (create1)", e2);
152                 }
153             }
154         }
155         // Client Queue
156         if (!toServer && s_queueC == null)
157         {
158             AQSession session = getSession();
159             try
160             {
161                 s_queueC = session.getQueue(CConnection.get().getDbUid(),
162                     s_queueName+s_toClient);
163             }
164             catch (AQException e)
165             {
166                 try
167                 {
168                     // No Queue - So create it
169                     s_queueC = createQueue(s_queueName+s_toClient);
170                     s_queueC.start();
171                 }
172                 catch (Exception e2)
173                 {
174                     Log.error("Queue.getQueue (create2)", e2);
175                 }
176             }
177         }
178         if (toServer)
179             return s_queueS;
180         return s_queueC;
181     } // getQueue
182
183     /**
184      * Drop Queue
        *
        * Looks up the server-bound and client-bound queue tables and calls drop(true)
        * on each. Any exception is logged (message text only) and otherwise ignored.
185      *
186     private static void dropQueues()
187     {
188         Log.trace(Log.l3_Util, "Queue.dropQueues");
189         //
190         String tabNameS = s_queueName + s_toServer + s_table_extension;
191         String tabNameC = s_queueName + s_toClient + s_table_extension;
192         try
193         {
194             AQQueueTable tableS = getSession().getQueueTable(CConnection.get().getDbUid(), tabNameS);
195             AQQueueTable tableC = getSession().getQueueTable(CConnection.get().getDbUid(), tabNameC);
196             tableS.drop(true);
197             tableC.drop(true);
198         }
199         catch (Exception e)
200         {
201             Log.error("Queue.dropQueues - " + e.getMessage());
202         }
203     } // dropQueues
204
205     /**
206      * Close Queues & Session
        *
        * Clears the cached queue and session references. The connection is closed and
        * its reference cleared only when requested and a connection exists.
        * @param closeConnection true to also close the cached connection
207      *
208     public static void close (boolean closeConnection)
209     {
210         s_queueC = null;
211         s_queueS = null;
212         s_session = null;
213         if (closeConnection && s_connection != null)
214         {
215             try
216             {
217                 s_connection.close();
218             }
219             catch (SQLException e)
220             {
221                 Log.error("Queue.close", e);
222             }
223             s_connection = null;
224         }
225     } // close
226
227
228     /**
229      * Send Message
        *
        * Serializes info with an ObjectOutputStream into a byte array and enqueues it
        * as the raw payload of a new message, using default enqueue options.
        * @param info object to send; null is rejected
        * @param toServer true to use the server-bound queue, false for the client-bound one
        * @return true if the message was enqueued; false if info is null, serialization
        *         failed, or any exception occurred while enqueuing
230      *
231     protected static boolean send (Serializable info, boolean toServer)
232     {
233         if (info == null)
234             return false;
235
236         // Serialize info
237         byte[] data = null;
238         try
239         {
240             ByteArrayOutputStream baos = new ByteArrayOutputStream();
241             ObjectOutputStream oos = new ObjectOutputStream (baos);
242             oos.writeObject(info);
243             oos.flush();
244             oos.close();
245             data = baos.toByteArray();
246             baos.close();
247         }
248         catch (IOException ioe)
249         {
250             Log.error("Queue.send IO - " + ioe.getMessage());
251             return false;
252         }
253
254         try
255         {
256             // create Message
            // getQueue() may return null; the resulting NullPointerException is caught
            // by the generic handler below and reported as a failed send.
257             AQQueue queue = getQueue(toServer, false);
258             AQMessage message = queue.createMessage();
259             // populate payload
260             AQRawPayload rawPayload = message.getRawPayload();
261             rawPayload.setStream(data, data.length);
262
263             // Standard enqueue Options
264             AQEnqueueOption option = new AQEnqueueOption();
265
266             // Enqueue
267             queue.enqueue(option, message);
268         }
269         catch (Exception e)
270         {
271             Log.error("Queue.send", e);
272             return false;
273         }
274         Log.trace(Log.l4_Data, "Queue.send " + info.getClass()
275             + (toServer ? " ToServer" : " ToClient")
276             + ", Size=" + data.length);
277         return true;
278     } // send
279
280     /**
281      * Receive message
        *
        * Dequeues one message with a wait time of 1 and deserializes its payload.
        * The queue read is getQueue(!fromServer, false), i.e. fromServer == true reads
        * the client-bound queue and fromServer == false reads the server-bound queue.
        * @param fromServer true to receive what the server sent, false for the client
        * @return the received object; null on timeout (error code 25228) or on error
282      *
283     protected static Serializable receive (boolean fromServer)
284     {
285         // Get Queue
286         AQQueue queue = getQueue(!fromServer, false);
287
288         // Get Message
289         AQMessage message = null;
290         try
291         {
292             // Set Dequeue Option
293             AQDequeueOption option = new AQDequeueOption();
294             option.setWaitTime(1); // one second wait
295         // option.setDequeueMode(AQDequeueOption.DEQUEUE_REMOVE);
296             //
297             message = queue.dequeue(option);
298         }
299         catch (AQOracleSQLException e)
300         {
301             if (e.getErrorCode() == 25228) // timeout
302                 return null;
303             Log.error("Queue.receive", e);
304             return null;
305         }
306         catch (Exception e)
307         {
308             Log.error("Queue.receive", e);
309             return null;
310         }
311
312         Serializable info = deserialize (message);
313
        // NOTE(review): deserialize() returns null on failure; info.getClass() below
        // would then throw NullPointerException outside any try block.
314         Log.trace(Log.l4_Data, "Queue.receive " + info.getClass()
315             + (fromServer ? " FromServer" : " FromClient"));
316         return info;
317     } // receive
318
319     /**
320      * De-Serialize
        *
        * Reads the raw payload bytes of the message and reconstructs the object from
        * them with an ObjectInputStream.
        * @param message message whose raw payload holds a serialized object
        * @return the deserialized object, or null if any exception occurred (logged)
321      *
322     private static Serializable deserialize (AQMessage message)
323     {
324         // Deserialize
325         Serializable info = null;
326         try
327         {
328             // get Payload
329             AQRawPayload raw_payload = message.getRawPayload();
330             byte[] data = raw_payload.getBytes();
331             //
332             ByteArrayInputStream bais = new ByteArrayInputStream(data);
333             ObjectInputStream ois = new ObjectInputStream (bais);
334             info = (Serializable)ois.readObject();
335             ois.close();
336             bais.close();
337         }
338         catch (Exception e)
339         {
340             Log.error("Queue.deserialize", e);
341             return null;
342         }
343         return info;
344     } // deserialize
345
346     /**
347      * List message
        *
        * Repeatedly dequeues in DEQUEUE_BROWSE mode with wait time 0 and collects the
        * deserialized payloads, until an AQOracleSQLException (e.g. the timeout code
        * 25228) ends the loop. The queue is obtained with reset == true, so cached
        * queues and session are dropped first.
        * @param fromServer true to list the client-bound queue, false for the
        *                   server-bound queue (same inversion as in receive)
        * @return list of deserialized objects in the order they were dequeued
348      *
349     protected static ArrayList listMessages (boolean fromServer)
350     {
351         // Get Queue
352         AQQueue queue = getQueue(!fromServer, true);
353         AQDequeueOption option = new AQDequeueOption();
354         try
355         {
356             Log.trace(Log.l3_Util, "Queue.listMessages - " + queue.getName());
357
358             // Set Dequeue Option
359             option.setWaitTime(0); // no wait
360             option.setDequeueMode(AQDequeueOption.DEQUEUE_BROWSE);
361         }
362         catch (Exception e)
363         {
            // NOTE(review): the label below is misspelled ("lsiMessages").
364             Log.error("Queue.lsiMessages", e);
365         }
366
367         ArrayList list = new ArrayList();
368
369         // Get Messages
370         AQMessage message = null;
371         do
372         {
373             try
374             {
375                 message = queue.dequeue(option);
376                 Serializable info = deserialize (message);
                // NOTE(review): info may be null here (deserialize failure); null is then
                // added to the list and info.toString() throws into the generic handler.
377                 list.add(info);
378                 Log.trace(Log.l4_Data, "> " + info.toString());
379             }
380             catch (AQOracleSQLException e)
381             {
                // NOTE(review): the two handlers here log under "Queue.receive" although
                // this is listMessages.
382                 if (e.getErrorCode() != 25228) // timeout
383                     Log.error("Queue.receive", e);
384                 message = null;
385             }
386             catch (Exception e)
387             {
                // NOTE(review): message is not reset in this handler, so the loop condition
                // still sees the previous value; if queue is null the very first dequeue
                // lands here with message == null and the loop ends.
388                 Log.error("Queue.receive", e);
389             }
390         } while (message != null);
391         return list;
392     } // listMessages
393
394
395
396
    // Public convenience wrappers around send(..) and receive(..)
397     public static boolean sendToServer(Serializable info)
398     {
399         return send (info, true);
400     }
401
402     public static boolean sendToClient(Serializable info)
403     {
404         return send (info, false);
405     }
406
407     public static Serializable receiveFromServer()
408     {
409         return receive (true);
410     }
411
412     public static Serializable receiveFromClient()
413     {
414         return receive (false);
415     }
416
417     /**************************************************************************
418      * Main Test
        *
        * Manual test driver: sends two messages in each direction, lists both queues,
        * receives one message from each side, lists both queues again and exits.
        * @param args ignored
419      *
420     public static void main (String[] args)
421     {
422         Env.initTest(9, true); // run as Client
423
424     // dropQueues();
425         Timestamp t = new Timestamp(System.currentTimeMillis());
426
427         sendToServer ("This is a new test " + t.toString());
428         sendToClient (new KeyNamePair (21, "Twenty-one " + t.toString()));
429         sendToServer ("This is a second test " + t.toString());
430         sendToClient (new KeyNamePair (22, "Twenty-two " + t.toString()));
431         listMessages(true);
432         listMessages(false);
433         System.out.println(receiveFromClient());
434         System.out.println(receiveFromServer());
435         listMessages(true);
436         listMessages(false);
437
438         System.out.println("Fini");
439         System.exit(0);
440     // AEnv.exit(0);
441     } // Main
442 */

443 } // Queue
444
Popular Tags