KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > caucho > jms > jdbc > JdbcMessage


1 /*
2  * Copyright (c) 1998-2006 Caucho Technology -- all rights reserved
3  *
4  * This file is part of Resin(R) Open Source
5  *
6  * Each copy or derived work must preserve the copyright notice and this
7  * notice unmodified.
8  *
9  * Resin Open Source is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * Resin Open Source is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
17  * of NON-INFRINGEMENT. See the GNU General Public License for more
18  * details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with Resin Open Source; if not, write to the
22  *
23  * Free Software Foundation, Inc.
24  * 59 Temple Place, Suite 330
25  * Boston, MA 02111-1307 USA
26  *
27  * @author Scott Ferguson
28  */

29
30 package com.caucho.jms.jdbc;
31
32 import com.caucho.config.ConfigException;
33 import com.caucho.jdbc.JdbcMetaData;
34 import com.caucho.jdbc.OracleMetaData;
35 import com.caucho.jms.JMSExceptionWrapper;
36 import com.caucho.jms.message.BytesMessageImpl;
37 import com.caucho.jms.message.MapMessageImpl;
38 import com.caucho.jms.message.MessageImpl;
39 import com.caucho.jms.message.ObjectMessageImpl;
40 import com.caucho.jms.message.StreamMessageImpl;
41 import com.caucho.jms.message.TextMessageImpl;
42 import com.caucho.jms.selector.Selector;
43 import com.caucho.util.CharBuffer;
44 import com.caucho.util.L10N;
45 import com.caucho.vfs.ByteToChar;
46 import com.caucho.vfs.ContextLoaderObjectInputStream;
47 import com.caucho.vfs.TempStream;
48 import com.caucho.vfs.WriteStream;
49
50 import javax.jms.*;
51 import javax.sql.DataSource JavaDoc;
52 import java.io.EOFException JavaDoc;
53 import java.io.IOException JavaDoc;
54 import java.io.InputStream JavaDoc;
55 import java.io.ObjectInputStream JavaDoc;
56 import java.io.ObjectOutputStream JavaDoc;
57 import java.sql.Connection JavaDoc;
58 import java.sql.PreparedStatement JavaDoc;
59 import java.sql.ResultSet JavaDoc;
60 import java.sql.SQLException JavaDoc;
61 import java.sql.Statement JavaDoc;
62 import java.sql.Types JavaDoc;
63 import java.util.Enumeration JavaDoc;
64 import java.util.logging.Level JavaDoc;
65 import java.util.logging.Logger JavaDoc;
66
67 /**
68  * Represents a JDBC message.
69  */

70 public class JdbcMessage
71 {
72   static final Logger JavaDoc log = Logger.getLogger(JdbcMessage.class.getName());
73   static final L10N L = new L10N(JdbcMessage.class);
74
75   private static final int MESSAGE = 0;
76   private static final int TEXT = 1;
77   private static final int BYTES = 2;
78   private static final int STREAM = 3;
79   private static final int OBJECT = 4;
80   private static final int MAP = 5;
81   
82   private JdbcManager _jdbcManager;
83   private DataSource JavaDoc _dataSource;
84
85   private String JavaDoc _messageTable;
86   private String JavaDoc _messageSequence;
87
88  private boolean _isOracle;
89
90   public JdbcMessage(JdbcManager jdbcManager)
91   {
92     _jdbcManager = jdbcManager;
93   }
94
95   /**
96    * Initializes the JdbcQueue
97    */

98   public void init()
99     throws ConfigException, SQLException JavaDoc
100   {
101     _messageTable = _jdbcManager.getMessageTable();
102     _dataSource = _jdbcManager.getDataSource();
103
104     JdbcMetaData metaData = _jdbcManager.getMetaData();
105
106     _isOracle = metaData instanceof OracleMetaData;
107       
108     String JavaDoc identity = "";
109
110     if (metaData.supportsIdentity())
111       identity = " auto_increment";
112     else
113       _messageSequence = _messageTable + "_cseq";
114
115     Connection JavaDoc conn = _dataSource.getConnection();
116     try {
117       Statement JavaDoc stmt = conn.createStatement();
118       String JavaDoc sql = "SELECT 1 FROM " + _messageTable + " WHERE 1=0";
119
120       try {
121     ResultSet JavaDoc rs = stmt.executeQuery(sql);
122     rs.next();
123     rs.close();
124     stmt.close();
125
126     return;
127       } catch (SQLException JavaDoc e) {
128     log.finest(e.toString());
129       }
130
131       String JavaDoc blob = _jdbcManager.getBlob();
132       String JavaDoc longType = _jdbcManager.getLongType();
133                                
134       log.info(L.l("creating JMS message table {0}", _messageTable));
135       
136       sql = ("CREATE TABLE " + _messageTable + " (" +
137          " m_id " + longType + " PRIMARY KEY" + identity + "," +
138          " queue INTEGER NOT NULL," +
139          " conn VARCHAR(255)," +
140          " consumer " + longType + "," +
141          " delivered INTEGER NOT NULL," +
142          " msg_type INTEGER NOT NULL," +
143          " expire " + longType + " NOT NULL," +
144          " header " + blob + "," +
145          " body " + blob +
146          ")");
147
148       if (_isOracle) {
149     String JavaDoc extent = "";
150     
151     if (_jdbcManager.getTablespace() != null) {
152       extent = " tablespace " + _jdbcManager.getTablespace();
153     }
154
155     // oracle recommends using retention (over pctversion) for performance
156
// Oracle will keep deleted lobs for the retention time before
157
// releasing them (e.g. 900 seconds)
158
sql += (" LOB(header) STORE AS (cache retention" + extent + ")");
159     sql += (" LOB(body) STORE AS (cache retention" + extent + ")");
160       }
161
162       stmt.executeUpdate(sql);
163
164       if (_messageSequence != null) {
165     stmt.executeUpdate(metaData.createSequenceSQL(_messageSequence, 1));
166       }
167     } finally {
168       conn.close();
169     }
170   }
171
172   /**
173    * Sends the message to the queue.
174    */

175   public long send(Message message, int queue, long expireTime)
176     throws SQLException JavaDoc, IOException JavaDoc, JMSException
177   {
178     if (log.isLoggable(Level.FINE))
179       log.fine("jms jdbc queue:" + queue + " send message");
180
181     TempStream header = new TempStream();
182     header.openWrite();
183       
184     WriteStream ws = new WriteStream(header);
185     writeMessageHeader(ws, message);
186     ws.close();
187
188     TempStream body = null;
189
190     int type = MESSAGE;
191
192     if (message instanceof TextMessage) {
193       TextMessage text = (TextMessage) message;
194     
195       type = TEXT;
196     
197       if (text.getText() != null) {
198     body = new TempStream();
199     body.openWrite();
200     
201     ws = new WriteStream(body);
202     ws.setEncoding("UTF-8");
203     ws.print(text.getText());
204     ws.close();
205       }
206     }
207     else if (message instanceof BytesMessage) {
208       BytesMessage bytes = (BytesMessage) message;
209
210       type = BYTES;
211
212       body = writeBytes(bytes);
213     }
214     else if (message instanceof StreamMessage) {
215       StreamMessage stream = (StreamMessage) message;
216
217       type = STREAM;
218
219       body = writeStream(stream);
220     }
221     else if (message instanceof ObjectMessage) {
222       ObjectMessage obj = (ObjectMessage) message;
223
224       type = OBJECT;
225
226       body = writeObject(obj);
227     }
228     else if (message instanceof MapMessage) {
229       MapMessage obj = (MapMessage) message;
230
231       type = MAP;
232
233       body = writeMap(obj);
234     }
235
236     Connection JavaDoc conn = _dataSource.getConnection();
237     try {
238       String JavaDoc sql;
239
240       if (_messageSequence != null) {
241     sql = _jdbcManager.getMetaData().selectSequenceSQL(_messageSequence);
242
243     PreparedStatement JavaDoc pstmt = conn.prepareStatement(sql);;
244
245     long mId = -1;
246     
247     ResultSet JavaDoc rs = pstmt.executeQuery();
248     if (rs.next())
249       mId = rs.getLong(1);
250     else
251       throw new RuntimeException JavaDoc("can't create message");
252     
253     sql = ("INSERT INTO " + _messageTable +
254            "(m_id, queue, msg_type, expire, delivered, header, body) " +
255            "VALUES (?,?,?,?,0,?,?)");
256
257     pstmt = conn.prepareStatement(sql);
258
259     int i = 1;
260     pstmt.setLong(i++, mId);
261     pstmt.setInt(i++, queue);
262     pstmt.setInt(i++, type);
263     pstmt.setLong(i++, expireTime);
264
265     if (header.getLength() > 0)
266       pstmt.setBinaryStream(i++, header.openRead(), header.getLength());
267     else
268       pstmt.setNull(i++, Types.BINARY);
269     
270     if (body != null)
271       pstmt.setBinaryStream(i++, body.openRead(), body.getLength());
272     else
273       pstmt.setString(i++, "");
274
275     pstmt.executeUpdate();
276       }
277       else {
278     sql = ("INSERT INTO " + _messageTable +
279            "(queue, msg_type, expire, delivered, header, body) " +
280            "VALUES (?,?,?,0,?,?)");
281     PreparedStatement JavaDoc pstmt;
282
283     pstmt = conn.prepareStatement(sql);
284
285     int i = 1;
286     pstmt.setInt(i++, queue);
287     pstmt.setInt(i++, type);
288     pstmt.setLong(i++, expireTime);
289     pstmt.setBinaryStream(i++, header.openRead(), header.getLength());
290     
291     if (body != null)
292       pstmt.setBinaryStream(i++, body.openRead(), body.getLength());
293     else
294       pstmt.setString(i++, "");
295
296     pstmt.executeUpdate();
297       }
298
299       return 0;
300     } finally {
301       conn.close();
302     }
303   }
304
305   /**
306    * Receives a message from the queue.
307    */

308   MessageImpl receive(int queue, int session)
309     throws SQLException JavaDoc, IOException JavaDoc, JMSException
310   {
311     long minId = -1;
312     
313     Connection JavaDoc conn = _dataSource.getConnection();
314     try {
315       String JavaDoc sql = ("SELECT m_id, msg_type, delivered, body, header" +
316             " FROM " + _messageTable +
317             " WHERE ?<id AND queue=? AND consumer IS NULL" +
318             " ORDER BY id");
319
320       PreparedStatement JavaDoc selectStmt = conn.prepareStatement(sql);
321
322       
323       sql = ("UPDATE " + _messageTable + " SET consumer=?, delivered=1 " +
324          "WHERE m_id=? AND consumer IS NULL");
325       
326       PreparedStatement JavaDoc updateStmt = conn.prepareStatement(sql);
327
328       long id = -1;
329       while (true) {
330     id = -1;
331     
332     selectStmt.setLong(1, minId);
333     selectStmt.setInt(2, queue);
334
335     MessageImpl msg = null;
336
337     ResultSet JavaDoc rs = selectStmt.executeQuery();
338     while (rs.next()) {
339       id = rs.getLong(1);
340
341       minId = id;
342
343       msg = readMessage(rs);
344     }
345
346     rs.close();
347
348     if (msg == null)
349       return null;
350
351     updateStmt.setInt(1, session);
352     updateStmt.setLong(2, id);
353       
354     int updateCount = updateStmt.executeUpdate();
355     
356     if (updateCount == 1)
357       return msg;
358     else if (log.isLoggable(Level.FINE)) {
359       log.fine("JdbcMessageQueue[" + queue + "] can't update received message " + id + " for session " + session +".");
360     }
361       }
362     } finally {
363       conn.close();
364     }
365   }
366
367   /**
368    * Acknowledges all received messages from the session.
369    */

370   void acknowledge(int session)
371     throws SQLException JavaDoc
372   {
373     Connection JavaDoc conn = _dataSource.getConnection();
374
375     try {
376       String JavaDoc sql = ("DELETE FROM " + _messageTable + " " +
377             "WHERE consumer=?");
378
379       PreparedStatement JavaDoc pstmt;
380       pstmt = conn.prepareStatement(sql);
381
382       pstmt.setInt(1, session);
383
384       pstmt.executeUpdate();
385
386       pstmt.close();
387     } finally {
388       conn.close();
389     }
390   }
391
392   /**
393    * Reads the message from the result stream.
394    */

395   MessageImpl readMessage(ResultSet JavaDoc rs)
396     throws SQLException JavaDoc, IOException JavaDoc, JMSException
397   {
398     int msgType = rs.getInt(2);
399     boolean redelivered = rs.getInt(3) == 1;
400
401     MessageImpl msg;
402     
403     switch (msgType) {
404     case TEXT:
405       {
406     InputStream is = rs.getBinaryStream(4);
407
408     try {
409       msg = readTextMessage(is);
410     } finally {
411       if (is != null)
412         is.close();
413     }
414     break;
415       }
416         
417     case BYTES:
418       {
419     InputStream is = rs.getBinaryStream(4);
420
421     try {
422       msg = readBytesMessage(is);
423     } finally {
424       if (is != null)
425         is.close();
426     }
427     break;
428       }
429         
430     case STREAM:
431       {
432     InputStream is = rs.getBinaryStream(4);
433
434     try {
435       msg = readStreamMessage(is);
436     } finally {
437       if (is != null)
438         is.close();
439     }
440     break;
441       }
442         
443     case OBJECT:
444       {
445     InputStream is = rs.getBinaryStream(4);
446
447     try {
448       msg = readObjectMessage(is);
449     } finally {
450       if (is != null)
451         is.close();
452     }
453     break;
454       }
455         
456     case MAP:
457       {
458     InputStream is = rs.getBinaryStream(4);
459
460     try {
461       msg = readMapMessage(is);
462     } finally {
463       if (is != null)
464         is.close();
465     }
466     break;
467       }
468         
469     case MESSAGE:
470     default:
471       {
472     msg = new MessageImpl();
473     break;
474       }
475     }
476       
477     InputStream is = rs.getBinaryStream(5);
478     
479     if (is != null) {
480       try {
481     readMessageHeader(is, msg);
482       } finally {
483     is.close();
484       }
485     }
486
487     msg.setJMSRedelivered(redelivered);
488
489     return msg;
490   }
491
492   /**
493    * Writes the message header for a Resin message.
494    */

495   private void writeMessageHeader(WriteStream ws, Message msg)
496     throws IOException JavaDoc, JMSException
497   {
498     Enumeration JavaDoc names = msg.getPropertyNames();
499     CharBuffer cb = new CharBuffer();
500
501     while (names.hasMoreElements()) {
502       String JavaDoc name = (String JavaDoc) names.nextElement();
503       writeValue(ws, cb, name);
504       
505       String JavaDoc value = msg.getStringProperty(name);
506       writeValue(ws, cb, value);
507     }
508   }
509
510   /**
511    * Writes a value to the output stream.
512    */

513   private void writeValue(WriteStream ws, CharBuffer cb, Object JavaDoc value)
514     throws IOException JavaDoc
515   {
516     if (value == null)
517       ws.write('N');
518     else {
519       cb.clear();
520       cb.append(value);
521       int length = cb.length();
522       char []buf = cb.getBuffer();
523
524       ws.write('S');
525       ws.write(length >> 24);
526       ws.write(length >> 16);
527       ws.write(length >> 8);
528       ws.write(length);
529
530       for (int i = 0; i < length; i++) {
531     int ch = buf[i];
532
533     ws.write(ch >> 8);
534     ws.write(ch);
535       }
536     }
537   }
538
539   /**
540    * Writes the bytes message.
541    */

542   private TempStream writeBytes(BytesMessage bytes)
543     throws IOException JavaDoc, JMSException
544   {
545     TempStream body = new TempStream();
546     body.openWrite();
547     
548     WriteStream ws = new WriteStream(body);
549     
550     int data;
551     //bytes.reset();
552

553     while ((data = bytes.readUnsignedByte()) >= 0) {
554       ws.write(data);
555     }
556     
557     ws.close();
558
559     return body;
560   }
561
562   /**
563    * Writes the stream message.
564    */

565   private TempStream writeStream(StreamMessage stream)
566     throws IOException JavaDoc, JMSException
567   {
568     TempStream body = new TempStream();
569     body.openWrite();
570     
571     WriteStream ws = new WriteStream(body);
572     ObjectOutputStream JavaDoc out = new ObjectOutputStream JavaDoc(ws);
573
574     try {
575       while (true) {
576     Object JavaDoc data = stream.readObject();
577
578     out.writeObject(data);
579       }
580     } catch (MessageEOFException e) {
581     }
582
583     out.close();
584     ws.close();
585
586     return body;
587   }
588
589   /**
590    * Writes the object message.
591    */

592   private TempStream writeObject(ObjectMessage obj)
593     throws IOException JavaDoc, JMSException
594   {
595     TempStream body = new TempStream();
596     body.openWrite();
597     
598     WriteStream ws = new WriteStream(body);
599     ObjectOutputStream JavaDoc out = new ObjectOutputStream JavaDoc(ws);
600
601     out.writeObject(obj.getObject());
602
603     out.close();
604     ws.close();
605
606     return body;
607   }
608
609   /**
610    * Writes the map message.
611    */

612   private TempStream writeMap(MapMessage map)
613     throws IOException JavaDoc, JMSException
614   {
615     TempStream body = new TempStream();
616     body.openWrite();
617     
618     WriteStream ws = new WriteStream(body);
619     ObjectOutputStream JavaDoc out = new ObjectOutputStream JavaDoc(ws);
620
621     try {
622       Enumeration JavaDoc e = map.getMapNames();
623       while (e.hasMoreElements()) {
624     String JavaDoc name = (String JavaDoc) e.nextElement();
625     out.writeUTF(name);
626     
627     Object JavaDoc data = map.getObject(name);
628     out.writeObject(data);
629       }
630     } catch (MessageEOFException e) {
631     }
632
633     out.close();
634     ws.close();
635
636     return body;
637   }
638
639   /**
640    * Writes the message header for a Resin message.
641    */

642   private void readMessageHeader(InputStream is, Message msg)
643     throws IOException JavaDoc, JMSException
644   {
645     CharBuffer cb = new CharBuffer();
646
647     int type;
648
649     while ((type = is.read()) > 0) {
650       String JavaDoc name = (String JavaDoc) readValue(is, type, cb);
651       Object JavaDoc value = readValue(is, is.read(), cb);
652
653       msg.setObjectProperty(name, value);
654     }
655   }
656
657   /**
658    * Writes the message header for a Resin message.
659    */

660   private TextMessageImpl readTextMessage(InputStream is)
661     throws IOException JavaDoc, JMSException
662   {
663     TextMessageImpl text = new TextMessageImpl();
664
665     if (is == null)
666       return text;
667
668     ByteToChar byteToChar = ByteToChar.create();
669
670     int ch;
671
672     byteToChar.setEncoding("UTF-8");
673     while ((ch = is.read()) >= 0) {
674       byteToChar.addByte(ch);
675     }
676
677     text.setText(byteToChar.getConvertedString());
678     
679     return text;
680   }
681
682   /**
683    * Reads a bytes message.
684    */

685   private BytesMessageImpl readBytesMessage(InputStream is)
686     throws IOException JavaDoc, JMSException
687   {
688     BytesMessageImpl bytes = new BytesMessageImpl();
689
690     if (is == null) {
691       bytes.reset();
692       
693       return bytes;
694     }
695
696     int data;
697
698     while ((data = is.read()) >= 0) {
699       bytes.writeByte((byte) data);
700     }
701
702     bytes.reset();
703     
704     return bytes;
705   }
706
707   /**
708    * Reads a stream message.
709    */

710   private StreamMessageImpl readStreamMessage(InputStream is)
711     throws IOException JavaDoc, JMSException
712   {
713     StreamMessageImpl stream = new StreamMessageImpl();
714
715     if (is == null)
716       return stream;
717
718     ObjectInputStream in = new ContextLoaderObjectInputStream(is);
719
720     try {
721       while (true) {
722     Object JavaDoc obj = in.readObject();
723
724     stream.writeObject(obj);
725       }
726     } catch (EOFException JavaDoc e) {
727     } catch (Exception JavaDoc e) {
728       throw new JMSExceptionWrapper(e);
729     }
730
731     in.close();
732
733     stream.reset();
734     
735     return stream;
736   }
737
738   /**
739    * Reads a map message.
740    */

741   private MapMessageImpl readMapMessage(InputStream is)
742     throws IOException JavaDoc, JMSException
743   {
744     MapMessageImpl map = new MapMessageImpl();
745
746     if (is == null)
747       return map;
748
749     ObjectInputStream in = new ContextLoaderObjectInputStream(is);
750
751     try {
752       while (true) {
753     String JavaDoc name = in.readUTF();
754     Object JavaDoc obj = in.readObject();
755
756     map.setObject(name, obj);
757       }
758     } catch (EOFException JavaDoc e) {
759     } catch (Exception JavaDoc e) {
760       throw new JMSExceptionWrapper(e);
761     }
762
763     in.close();
764
765     return map;
766   }
767
768   /**
769    * Reads an object message.
770    */

771   private ObjectMessageImpl readObjectMessage(InputStream is)
772     throws IOException JavaDoc, JMSException
773   {
774     ObjectMessageImpl msg = new ObjectMessageImpl();
775
776     if (is == null)
777       return msg;
778
779     ObjectInputStream in = new ContextLoaderObjectInputStream(is);
780
781     try {
782       Object JavaDoc obj = in.readObject();
783       msg.setObject((java.io.Serializable JavaDoc) obj);
784     } catch (IOException JavaDoc e) {
785       throw e;
786     } catch (Exception JavaDoc e) {
787       throw new JMSExceptionWrapper(e);
788     }
789
790     in.close();
791     
792     return msg;
793   }
794
795   /**
796    * Writes a value to the output stream.
797    */

798   private Object JavaDoc readValue(InputStream is, int type, CharBuffer cb)
799     throws IOException JavaDoc
800   {
801     switch (type) {
802     case 'N':
803       return null;
804     case 'S':
805       {
806     cb.clear();
807     int length = readInt(is);
808
809     for (int i = 0; i < length; i++) {
810       char ch = (char) ((is.read() << 8) + is.read());
811
812       cb.append(ch);
813     }
814
815     return cb.toString();
816       }
817     default:
818       throw new IOException JavaDoc(L.l("unknown header type"));
819     }
820   }
821
822   /**
823    * Reads an integer value.
824    */

825   private int readInt(InputStream is)
826     throws IOException JavaDoc
827   {
828     return ((is.read() << 24) +
829         (is.read() << 16) +
830         (is.read() << 8) +
831         (is.read()));
832   }
833
834   /**
835    * Removes the first message matching the selector.
836    */

837   private boolean hasMessage(Selector selector)
838     throws JMSException
839   {
840     return false;
841   }
842
843   /**
844    * Returns a printable view of the queue.
845    */

846   public String JavaDoc toString()
847   {
848     return "JdbcMessage[" + _messageTable + "]";
849   }
850 }
851
852
Popular Tags