1 /******************************************************************************2 * The contents of this file are subject to the Compiere License Version 1.13 * ("License"); You may not use this file except in compliance with the License4 * You may obtain a copy of the License at http://www.compiere.org/license.html5 * Software distributed under the License is distributed on an "AS IS" basis,6 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for7 * the specific language governing rights and limitations under the License.8 * The Original Code is Compiere ERP & CRM Business Solution9 * The Initial Developer of the Original Code is Jorg Janke and ComPiere, Inc.10 * Portions created by Jorg Janke are Copyright (C) 1999-2001 Jorg Janke, parts11 * created by ComPiere are Copyright (C) ComPiere, Inc.; All Rights Reserved.12 * Contributor(s): ______________________________________.13 *****************************************************************************/14 package org.compiere.util;15 16 import java.io.*;17 import java.sql.*;18 import java.util.*;19 //import oracle.AQ.*;20 21 import org.compiere.db.CConnection;22 23 /**24 * Message Queuing25 *26 * @author Jorg Janke27 * @version $Id: Queue.java,v 1.7 2002/05/16 21:51:17 jjanke Exp $28 */29 public final class Queue implements Serializable30 {31 /*32 private static Connection s_connection;33 private static AQSession s_session;34 private static AQQueue s_queueS;35 private static AQQueue s_queueC;36 //37 private static String s_queueName = "AQ_";38 private static String s_toServer = "S";39 private static String s_toClient = "C";40 private static String s_table_extension = "_Tab";41 42 43 /**44 * Create AQ Session45 *46 private static AQSession createSession()47 {48 Log.trace(Log.l3_Util, "Queue.createSession");49 if (s_connection == null) // autoCommit50 // get dedicated51 s_connection = DB.createConnection(true, Connection.TRANSACTION_SERIALIZABLE);52 53 AQSession aq_sess = null;54 try55 {56 Class.forName("oracle.AQ.AQOracleDriver");57 }58 catch (ClassNotFoundException e)59 {60 Log.error("Queue.createSession (Driver)", e);61 }62 try63 {64 aq_sess = AQDriverManager.createAQSession(s_connection);65 }66 catch (Exception e)67 {68 Log.error("Queue.createSession", e);69 }70 71 return aq_sess;72 } // createSession73 74 /**75 * Get AQ Session76 *77 private static AQSession getSession()78 {79 if (s_session == null)80 s_session = createSession();81 return s_session;82 } // getSession83 84 /**85 * Create Queue86 *87 private static AQQueue createQueue(String name)88 {89 Log.trace(Log.l3_Util, "Queue.createQueue - " + name);90 //91 AQSession session = getSession();92 if (session == null)93 return null;94 95 AQQueue queue = null;96 try97 {98 // Payload type: raw - Age Ordered99 AQQueueTableProperty t_property = new AQQueueTableProperty("RAW");100 t_property.setComment("Compiere Client/Server Communication Table");101 t_property.setSortOrder("ENQ_TIME");102 t_property.setMultiConsumer(false);103 // t_property.setCompatible("8.1");104 // Create table in the Compiere scheme105 String tabName = name + s_table_extension;106 AQQueueTable table = session.createQueueTable(CConnection.get().getDbUid(),107 tabName, t_property);108 Log.trace(Log.l5_DData, "Queue Table created - " + tabName);109 110 // Create Queue property111 AQQueueProperty q_property = new AQQueueProperty();112 q_property.setComment("Compiere Client/Server Communication Queue");113 q_property.setRetentionTime(AQQueueProperty.INFINITE);114 queue = session.createQueue(table, name, q_property);115 Log.trace(Log.l5_DData, "Queue created - " + name);116 }117 catch (Exception e)118 {119 Log.error("Queue.createQueue", e);120 }121 //122 return queue;123 } // createQueue124 125 /**126 * Get Queue "Test"127 *128 protected static AQQueue getQueue(boolean toServer, boolean reset)129 {130 if (reset)131 close(false);132 // Server Queue133 if (toServer && s_queueS == null)134 {135 AQSession session = getSession();136 try137 {138 s_queueS = session.getQueue(CConnection.get().getDbUid(),139 s_queueName+s_toServer);140 }141 catch (AQException e)142 {143 try144 {145 // No Queue - So create it146 s_queueS = createQueue(s_queueName+s_toServer);147 s_queueS.start();148 }149 catch (Exception e2)150 {151 Log.error("Queue.getQueue (create1)", e2);152 }153 }154 }155 // Client Queue156 if (!toServer && s_queueC == null)157 {158 AQSession session = getSession();159 try160 {161 s_queueC = session.getQueue(CConnection.get().getDbUid(),162 s_queueName+s_toClient);163 }164 catch (AQException e)165 {166 try167 {168 // No Queue - So create it169 s_queueC = createQueue(s_queueName+s_toClient);170 s_queueC.start();171 }172 catch (Exception e2)173 {174 Log.error("Queue.getQueue (create2)", e2);175 }176 }177 }178 if (toServer)179 return s_queueS;180 return s_queueC;181 } // getQueue182 183 /**184 * Drop Queue185 *186 private static void dropQueues()187 {188 Log.trace(Log.l3_Util, "Queue.dropQueues");189 //190 String tabNameS = s_queueName + s_toServer + s_table_extension;191 String tabNameC = s_queueName + s_toClient + s_table_extension;192 try193 {194 AQQueueTable tableS = getSession().getQueueTable(CConnection.get().getDbUid(), tabNameS);195 AQQueueTable tableC = getSession().getQueueTable(CConnection.get().getDbUid(), tabNameC);196 tableS.drop(true);197 tableC.drop(true);198 }199 catch (Exception e)200 {201 Log.error("Queue.dropQueues - " + e.getMessage());202 }203 } // dropQueues204 205 /**206 * Close Queues & Session207 *208 public static void close (boolean closeConnection)209 {210 s_queueC = null;211 s_queueS = null;212 s_session = null;213 if (closeConnection && s_connection != null)214 {215 try216 {217 s_connection.close();218 }219 catch (SQLException e)220 {221 Log.error("Queue.close", e);222 }223 s_connection = null;224 }225 } // close226 227 228 /**229 * Send Message230 *231 protected static boolean send (Serializable info, boolean toServer)232 {233 if (info == null)234 return false;235 236 // Serialize info237 byte[] data = null;238 try239 {240 ByteArrayOutputStream baos = new ByteArrayOutputStream();241 ObjectOutputStream oos = new ObjectOutputStream (baos);242 oos.writeObject(info);243 oos.flush();244 oos.close();245 data = baos.toByteArray();246 baos.close();247 }248 catch (IOException ioe)249 {250 Log.error("Queue.send IO - " + ioe.getMessage());251 return false;252 }253 254 try255 {256 // create Message257 AQQueue queue = getQueue(toServer, false);258 AQMessage message = queue.createMessage();259 // populate payload260 AQRawPayload rawPayload = message.getRawPayload();261 rawPayload.setStream(data, data.length);262 263 // Standard enqueue Options264 AQEnqueueOption option = new AQEnqueueOption();265 266 // Enqueue267 queue.enqueue(option, message);268 }269 catch (Exception e)270 {271 Log.error("Queue.send", e);272 return false;273 }274 Log.trace(Log.l4_Data, "Queue.send " + info.getClass()275 + (toServer ? " ToServer" : " ToClient")276 + ", Size=" + data.length);277 return true;278 } // send279 280 /**281 * Receive message282 *283 protected static Serializable receive (boolean fromServer)284 {285 // Get Queue286 AQQueue queue = getQueue(!fromServer, false);287 288 // Get Message289 AQMessage message = null;290 try291 {292 // Set Dequeue Option293 AQDequeueOption option = new AQDequeueOption();294 option.setWaitTime(1); // one second wait295 // option.setDequeueMode(AQDequeueOption.DEQUEUE_REMOVE);296 //297 message = queue.dequeue(option);298 }299 catch (AQOracleSQLException e)300 {301 if (e.getErrorCode() == 25228) // timeout302 return null;303 Log.error("Queue.receive", e);304 return null;305 }306 catch (Exception e)307 {308 Log.error("Queue.receive", e);309 return null;310 }311 312 Serializable info = deserialize (message);313 314 Log.trace(Log.l4_Data, "Queue.receive " + info.getClass()315 + (fromServer ? " FromServer" : " FromClient"));316 return info;317 } // receive318 319 /**320 * De-Serialize321 *322 private static Serializable deserialize (AQMessage message)323 {324 // Deserialize325 Serializable info = null;326 try327 {328 // get Payload329 AQRawPayload raw_payload = message.getRawPayload();330 byte[] data = raw_payload.getBytes();331 //332 ByteArrayInputStream bais = new ByteArrayInputStream(data);333 ObjectInputStream ois = new ObjectInputStream (bais);334 info = (Serializable)ois.readObject();335 ois.close();336 bais.close();337 }338 catch (Exception e)339 {340 Log.error("Queue.deserialize", e);341 return null;342 }343 return info;344 } // deserialize345 346 /**347 * List message348 *349 protected static ArrayList listMessages (boolean fromServer)350 {351 // Get Queue352 AQQueue queue = getQueue(!fromServer, true);353 AQDequeueOption option = new AQDequeueOption();354 try355 {356 Log.trace(Log.l3_Util, "Queue.listMessages - " + queue.getName());357 358 // Set Dequeue Option359 option.setWaitTime(0); // no wait360 option.setDequeueMode(AQDequeueOption.DEQUEUE_BROWSE);361 }362 catch (Exception e)363 {364 Log.error("Queue.lsiMessages", e);365 }366 367 ArrayList list = new ArrayList();368 369 // Get Messages370 AQMessage message = null;371 do372 {373 try374 {375 message = queue.dequeue(option);376 Serializable info = deserialize (message);377 list.add(info);378 Log.trace(Log.l4_Data, "> " + info.toString());379 }380 catch (AQOracleSQLException e)381 {382 if (e.getErrorCode() != 25228) // timeout383 Log.error("Queue.receive", e);384 message = null;385 }386 catch (Exception e)387 {388 Log.error("Queue.receive", e);389 }390 } while (message != null);391 return list;392 } // listMessages393 394 395 396 397 public static boolean sendToServer(Serializable info)398 {399 return send (info, true);400 }401 402 public static boolean sendToClient(Serializable info)403 {404 return send (info, false);405 }406 407 public static Serializable receiveFromServer()408 {409 return receive (true);410 }411 412 public static Serializable receiveFromClient()413 {414 return receive (false);415 }416 417 /**************************************************************************418 * Main Test419 *420 public static void main (String[] args)421 {422 Env.initTest(9, true); // run as Client423 424 // dropQueues();425 Timestamp t = new Timestamp(System.currentTimeMillis());426 427 sendToServer ("This is a new test " + t.toString());428 sendToClient (new KeyNamePair (21, "Twenty-one " + t.toString()));429 sendToServer ("This is a second test " + t.toString());430 sendToClient (new KeyNamePair (22, "Twenty-two " + t.toString()));431 listMessages(true);432 listMessages(false);433 System.out.println(receiveFromClient());434 System.out.println(receiveFromServer());435 listMessages(true);436 listMessages(false);437 438 System.out.println("Fini");439 System.exit(0);440 // AEnv.exit(0);441 } // Main442 */443 } // Queue444