KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > com > scalagent > joram > mom > dest > ftp > FtpQueueImpl


1 /*
2  * JORAM: Java(TM) Open Reliable Asynchronous Messaging
3  * Copyright (C) 2001 - 2007 ScalAgent Distributed Technologies
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
18  * USA.
19  *
20  * Initial developer(s): Nicolas Tachker (ScalAgent)
21  * Contributor(s):
22  */

23 package com.scalagent.joram.mom.dest.ftp;
24
25 import java.io.IOException JavaDoc;
26 import java.util.Enumeration JavaDoc;
27 import java.util.Hashtable JavaDoc;
28 import java.util.Properties JavaDoc;
29 import java.util.Vector JavaDoc;
30
31 import org.objectweb.joram.mom.dest.DeadMQueueImpl;
32 import org.objectweb.joram.mom.dest.QueueImpl;
33 import org.objectweb.joram.mom.notifications.ClientMessages;
34 import org.objectweb.joram.shared.messages.Message;
35 import org.objectweb.util.monolog.api.BasicLevel;
36 import org.objectweb.util.monolog.api.Logger;
37
38 import fr.dyade.aaa.agent.AgentId;
39 import fr.dyade.aaa.agent.Debug;
40
41 /**
42  * The <code>FtpQueueImpl</code> class implements the MOM queue behaviour,
43  * basically storing messages and delivering them upon clients requests.
44  */

45 public class FtpQueueImpl extends QueueImpl {
46
47   private static final long serialVersionUID = 2742569589343325791L;
48
49   public static Logger logger =
50     Debug.getLogger("com.scalagent.joram.mom.dest.ftp.FtpQueueImpl");
51   
52   private String JavaDoc user = "anonymous";
53   private String JavaDoc pass = "no@no.no";
54   private String JavaDoc path = null;
55   private transient TransferItf transfer = null;
56   private ClientMessages currentNot = null;
57   private AgentId dmq = null;
58   private int clientContext;
59   private int requestId;
60
61   public String JavaDoc ftpImplName = "com.scalagent.joram.mom.dest.ftp.TransferImplRef";
62
63   private Hashtable JavaDoc transferTable;
64   /**
65    * Constructs a <code>FtpQueueImpl</code> instance.
66    *
67    * @param destId Identifier of the agent hosting the queue.
68    * @param adminId Identifier of the administrator of the queue.
69    * @param prop The initial set of properties.
70    */

71   public FtpQueueImpl(AgentId destId,
72                       AgentId adminId,
73                       Properties JavaDoc prop) {
74     super(destId, adminId, prop);
75     setProperties(prop);
76
77     transferTable = new Hashtable JavaDoc();
78     this.path = path;
79     if (user != null)
80       this.user = user;
81     if (pass != null)
82       this.pass = pass;
83     try {
84       if ((ftpImplName != null) && (ftpImplName.length() > 0))
85         transfer = (TransferItf) Class.forName(ftpImplName).newInstance();
86     } catch (Exception JavaDoc exc) {
87       transfer = null;
88       logger.log(BasicLevel.ERROR,
89                                     "FtpQueueImpl : transfer = null" ,exc);
90     }
91     if (logger.isLoggable(BasicLevel.DEBUG))
92       logger.log(BasicLevel.DEBUG, "--- " + this +
93                                     " transfer = "+ transfer);
94   }
95
96   protected void setProperties(Properties JavaDoc prop) {
97     user = prop.getProperty("user", user);
98     pass = prop.getProperty("pass", pass);
99     path = prop.getProperty("path", path);
100     ftpImplName = prop.getProperty("ftpImpl", ftpImplName);
101   }
102
103   public String JavaDoc toString() {
104     return "FtpQueueImpl:" + destId.toString();
105   }
106
107   public void ftpNot(FtpNot not) {
108     if (logger.isLoggable(BasicLevel.DEBUG))
109       logger.log(BasicLevel.DEBUG, "--- " + this +
110                                     " ftpNot(" + not + ")\n" +
111                                     "transferTable = " + transferTable);
112     Message msg = (Message) ((Vector JavaDoc) not.getMessages()).get(0);
113     storeMessage(new org.objectweb.joram.mom.messages.Message(msg));
114     deliverMessages(0);
115     transferTable.remove(new FtpMessage(msg).getIdentifier());
116
117     if (logger.isLoggable(BasicLevel.DEBUG))
118       logger.log(BasicLevel.DEBUG, "--- " + this +
119                                     " doProcess : transferTable = " + transferTable);
120   }
121
122   public ClientMessages preProcess(AgentId from, ClientMessages not) {
123     for (Enumeration JavaDoc msgs = not.getMessages().elements();
124          msgs.hasMoreElements();) {
125       Message msg = (Message) msgs.nextElement();
126       if (isFtpMsg(msg)) {
127         doProcessFtp(not,msg);
128         not.getMessages().remove(msg);
129       }
130     }
131     if (not.getMessages().size() > 0) {
132       return not;
133     }
134     return null;
135   }
136
137   protected boolean isFtpMsg(Message message) {
138     FtpMessage msg = new FtpMessage(message);
139     return (msg.propertyExists(SharedObj.url) &&
140             msg.propertyExists(SharedObj.crc) &&
141             msg.propertyExists(SharedObj.ack));
142   }
143
144   protected void doProcessFtp(ClientMessages not,
145                               Message msg) {
146
147     if (logger.isLoggable(BasicLevel.DEBUG))
148       logger.log(BasicLevel.DEBUG, "--- " + this +
149                                     " doProcessFtp(" + not + "," + msg + ")");
150
151     if (transfer != null) {
152       dmq = not.getDMQId();
153       if (dmq == null && super.dmqId != null)
154         dmq = super.dmqId;
155       else if ( dmq == null)
156         dmq = DeadMQueueImpl.getId();
157
158       clientContext = not.getClientContext();
159       requestId = not.getRequestId();
160
161       FtpMessage ftpMsg = new FtpMessage(msg);
162       transferTable.put(ftpMsg.getIdentifier(),ftpMsg);
163       FtpThread t = new FtpThread(transfer,
164                                   (FtpMessage) ftpMsg.clone(),
165                                   destId,
166                                   dmq,
167                                   clientContext,
168                                   requestId,
169                                   user,
170                                   pass,
171                                   path);
172       t.start();
173     } else {
174       ClientMessages deadM =
175         new ClientMessages(not.getClientContext(),
176                            not.getRequestId());
177       
178       msg.notWriteable = true;
179       deadM.addMessage(msg);
180       sendToDMQ(deadM,not.getDMQId());
181     }
182   }
183
184   private void readObject(java.io.ObjectInputStream JavaDoc in)
185     throws IOException JavaDoc, ClassNotFoundException JavaDoc {
186     in.defaultReadObject();
187
188     try {
189       if ((ftpImplName != null) && (ftpImplName.length() > 0))
190         transfer = (TransferItf) Class.forName(ftpImplName).newInstance();
191     } catch (Exception JavaDoc exc) {
192       transfer = null;
193       logger.log(BasicLevel.ERROR,
194                                     "readObject : transfer = null" ,exc);
195     }
196     if (logger.isLoggable(BasicLevel.DEBUG))
197       logger.log(BasicLevel.DEBUG, "--- " + this +
198                                     " readObject transfer = "+ transfer);
199
200     if (transfer != null) {
201       if (logger.isLoggable(BasicLevel.DEBUG))
202         logger.log(BasicLevel.DEBUG, "--- " + this +
203                                       " readObject : transferTable = " + transferTable);
204
205       for (Enumeration JavaDoc e = transferTable.elements(); e.hasMoreElements(); ) {
206         Message msg = (Message) e.nextElement();
207         FtpMessage ftpMsg = new FtpMessage(msg);
208         FtpThread t = new FtpThread(transfer,
209                                     (FtpMessage) ftpMsg.clone(),
210                                     destId,
211                                     dmq,
212                                     clientContext,
213                                     requestId,
214                                     user,
215                                     pass,
216                                     path);
217         t.start();
218       }
219     }
220   }
221 }
222
Popular Tags