1 23 package com.scalagent.joram.mom.dest.ftp; 24 25 import java.io.IOException ; 26 import java.util.Enumeration ; 27 import java.util.Hashtable ; 28 import java.util.Properties ; 29 import java.util.Vector ; 30 31 import org.objectweb.joram.mom.dest.DeadMQueueImpl; 32 import org.objectweb.joram.mom.dest.QueueImpl; 33 import org.objectweb.joram.mom.notifications.ClientMessages; 34 import org.objectweb.joram.shared.messages.Message; 35 import org.objectweb.util.monolog.api.BasicLevel; 36 import org.objectweb.util.monolog.api.Logger; 37 38 import fr.dyade.aaa.agent.AgentId; 39 import fr.dyade.aaa.agent.Debug; 40 41 45 public class FtpQueueImpl extends QueueImpl { 46 47 private static final long serialVersionUID = 2742569589343325791L; 48 49 public static Logger logger = 50 Debug.getLogger("com.scalagent.joram.mom.dest.ftp.FtpQueueImpl"); 51 52 private String user = "anonymous"; 53 private String pass = "no@no.no"; 54 private String path = null; 55 private transient TransferItf transfer = null; 56 private ClientMessages currentNot = null; 57 private AgentId dmq = null; 58 private int clientContext; 59 private int requestId; 60 61 public String ftpImplName = "com.scalagent.joram.mom.dest.ftp.TransferImplRef"; 62 63 private Hashtable transferTable; 64 71 public FtpQueueImpl(AgentId destId, 72 AgentId adminId, 73 Properties prop) { 74 super(destId, adminId, prop); 75 setProperties(prop); 76 77 transferTable = new Hashtable (); 78 this.path = path; 79 if (user != null) 80 this.user = user; 81 if (pass != null) 82 this.pass = pass; 83 try { 84 if ((ftpImplName != null) && (ftpImplName.length() > 0)) 85 transfer = (TransferItf) Class.forName(ftpImplName).newInstance(); 86 } catch (Exception exc) { 87 transfer = null; 88 logger.log(BasicLevel.ERROR, 89 "FtpQueueImpl : transfer = null" ,exc); 90 } 91 if (logger.isLoggable(BasicLevel.DEBUG)) 92 logger.log(BasicLevel.DEBUG, "--- " + this + 93 " transfer = "+ transfer); 94 } 95 96 protected void setProperties(Properties prop) { 97 user = prop.getProperty("user", user); 98 pass = prop.getProperty("pass", pass); 99 path = prop.getProperty("path", path); 100 ftpImplName = prop.getProperty("ftpImpl", ftpImplName); 101 } 102 103 public String toString() { 104 return "FtpQueueImpl:" + destId.toString(); 105 } 106 107 public void ftpNot(FtpNot not) { 108 if (logger.isLoggable(BasicLevel.DEBUG)) 109 logger.log(BasicLevel.DEBUG, "--- " + this + 110 " ftpNot(" + not + ")\n" + 111 "transferTable = " + transferTable); 112 Message msg = (Message) ((Vector ) not.getMessages()).get(0); 113 storeMessage(new org.objectweb.joram.mom.messages.Message(msg)); 114 deliverMessages(0); 115 transferTable.remove(new FtpMessage(msg).getIdentifier()); 116 117 if (logger.isLoggable(BasicLevel.DEBUG)) 118 logger.log(BasicLevel.DEBUG, "--- " + this + 119 " doProcess : transferTable = " + transferTable); 120 } 121 122 public ClientMessages preProcess(AgentId from, ClientMessages not) { 123 for (Enumeration msgs = not.getMessages().elements(); 124 msgs.hasMoreElements();) { 125 Message msg = (Message) msgs.nextElement(); 126 if (isFtpMsg(msg)) { 127 doProcessFtp(not,msg); 128 not.getMessages().remove(msg); 129 } 130 } 131 if (not.getMessages().size() > 0) { 132 return not; 133 } 134 return null; 135 } 136 137 protected boolean isFtpMsg(Message message) { 138 FtpMessage msg = new FtpMessage(message); 139 return (msg.propertyExists(SharedObj.url) && 140 msg.propertyExists(SharedObj.crc) && 141 msg.propertyExists(SharedObj.ack)); 142 } 143 144 protected void doProcessFtp(ClientMessages not, 145 Message msg) { 146 147 if (logger.isLoggable(BasicLevel.DEBUG)) 148 logger.log(BasicLevel.DEBUG, "--- " + this + 149 " doProcessFtp(" + not + "," + msg + ")"); 150 151 if (transfer != null) { 152 dmq = not.getDMQId(); 153 if (dmq == null && super.dmqId != null) 154 dmq = super.dmqId; 155 else if ( dmq == null) 156 dmq = DeadMQueueImpl.getId(); 157 158 clientContext = not.getClientContext(); 159 requestId = not.getRequestId(); 160 161 FtpMessage ftpMsg = new FtpMessage(msg); 162 transferTable.put(ftpMsg.getIdentifier(),ftpMsg); 163 FtpThread t = new FtpThread(transfer, 164 (FtpMessage) ftpMsg.clone(), 165 destId, 166 dmq, 167 clientContext, 168 requestId, 169 user, 170 pass, 171 path); 172 t.start(); 173 } else { 174 ClientMessages deadM = 175 new ClientMessages(not.getClientContext(), 176 not.getRequestId()); 177 178 msg.notWriteable = true; 179 deadM.addMessage(msg); 180 sendToDMQ(deadM,not.getDMQId()); 181 } 182 } 183 184 private void readObject(java.io.ObjectInputStream in) 185 throws IOException , ClassNotFoundException { 186 in.defaultReadObject(); 187 188 try { 189 if ((ftpImplName != null) && (ftpImplName.length() > 0)) 190 transfer = (TransferItf) Class.forName(ftpImplName).newInstance(); 191 } catch (Exception exc) { 192 transfer = null; 193 logger.log(BasicLevel.ERROR, 194 "readObject : transfer = null" ,exc); 195 } 196 if (logger.isLoggable(BasicLevel.DEBUG)) 197 logger.log(BasicLevel.DEBUG, "--- " + this + 198 " readObject transfer = "+ transfer); 199 200 if (transfer != null) { 201 if (logger.isLoggable(BasicLevel.DEBUG)) 202 logger.log(BasicLevel.DEBUG, "--- " + this + 203 " readObject : transferTable = " + transferTable); 204 205 for (Enumeration e = transferTable.elements(); e.hasMoreElements(); ) { 206 Message msg = (Message) e.nextElement(); 207 FtpMessage ftpMsg = new FtpMessage(msg); 208 FtpThread t = new FtpThread(transfer, 209 (FtpMessage) ftpMsg.clone(), 210 destId, 211 dmq, 212 clientContext, 213 requestId, 214 user, 215 pass, 216 path); 217 t.start(); 218 } 219 } 220 } 221 } 222 | Popular Tags |