KickJava   Java API By Example, From Geeks To Geeks.

Java > Open Source Codes > de > nava > informa > utils > PersistChanGrpMgrTask


// Informa -- RSS Library for Java
// Copyright (c) 2002 by Niko Schmuck
//
// Niko Schmuck
// http://sourceforge.net/projects/informa
// mailto:niko_schmuck@users.sourceforge.net
//
// This library is free software.
//
// You may redistribute it and/or modify it under the terms of the GNU
// Lesser General Public License as published by the Free Software Foundation.
//
// Version 2.1 of the license should be included with this distribution in
// the file LICENSE. If the license is not included with this distribution,
// you may find a copy at the FSF web site at 'www.gnu.org' or 'www.fsf.org',
// or you may write to the Free Software Foundation, 675 Mass Ave, Cambridge,
// MA 02139 USA.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// $Id: PersistChanGrpMgrTask.java,v 1.18 2004/07/29 12:23:59 pitosalas Exp $

package de.nava.informa.utils;

import java.net.UnknownHostException;
import java.util.*;

import org.apache.commons.logging.*;

import de.nava.informa.core.*;
import de.nava.informa.impl.hibernate.*;
import de.nava.informa.parsers.FeedParser;
36
37 /**
38  * PersistChanGrpMgrTask - description...
39  *
40  */

41 public class PersistChanGrpMgrTask extends Thread JavaDoc {
42
43   private static Log logger = LogFactory.getLog(PersistChanGrpMgrTask.class);
44
45   private PersistChanGrpMgr mgr;
46   private ChannelBuilder builder;
47   private ChannelBuilderIF tempBuilder;
48   private Map channelInfos;
49   private long minChannelUpdateDelay;
50   private volatile boolean running = false;
51
52   /**
53    * Construct and setup context of the PersistChanGrpMgr
54    *
55    * @param mgr
56    * @param minChannelUpdateDelay minimum number of millis between channel updates.
57    */

58   public PersistChanGrpMgrTask(PersistChanGrpMgr mgr, long minChannelUpdateDelay) {
59     super("PCGrp: " + mgr.getChannelGroup().getTitle());
60     this.minChannelUpdateDelay = minChannelUpdateDelay;
61     this.mgr = mgr;
62     builder = mgr.getBuilder();
63     channelInfos = new HashMap();
64     tempBuilder = new de.nava.informa.impl.basic.ChannelBuilder();
65   }
66
67   /**
68    * Minimum number of milliseconds between updates of channel.
69    *
70    * @param minChannelUpdateDelay minimum pause between updates in milliseconds.
71    */

72   public void setMinChannelUpdateDelay(long minChannelUpdateDelay) {
73     this.minChannelUpdateDelay = minChannelUpdateDelay;
74   }
75
76   /**
77    * run - Called each iteration to process all the Channels in this Group. This will skip inactive
78    * channels. -
79    */

80   public void run() {
81     running = true;
82
83     try {
84       // We do job and sleep until someone interupts us.
85
while (!isInterrupted()) {
86         long startedLoop = System.currentTimeMillis();
87
88         performUpdates();
89
90         // Calculate time left to sleep beween updates
91
long leftToSleep = minChannelUpdateDelay - (startedLoop - System.currentTimeMillis());
92         logger.debug("Going to sleep for " + leftToSleep + " millis");
93         if (leftToSleep > 0) Thread.sleep(leftToSleep);
94       }
95     } catch (InterruptedException JavaDoc e) {
96       // Note that the catch looks like it just continues, but at the same time isInterrupted() goes
97
// to true and ends the
98
logger.warn("Interrupted exception within Run method");
99     } catch (Exception JavaDoc ignoredException) // Ignore all Exceptions (assuming that they did their own
100
// cleanup and we want to keep on polling.
101
{
102       ignoredException.printStackTrace();
103       // and continue
104

105     } finally {
106       running = false;
107       synchronized (this) {
108         notifyAll();
109       }
110     }
111   }
112
113   /**
114    * Returns TRUE if current thread is running.
115    *
116    * @return TRUE if running.
117    */

118   public boolean isRunning() {
119     return running;
120   }
121
122   /**
123    * Interrupt the thread and return.
124    *
125    * @see java.lang.Thread#interrupt()
126    */

127   public void interrupt() {
128     interrupt(false);
129   }
130
131   /**
132    * Interrupts execution of task.
133    *
134    * @param wait TRUE to wait for finish of task.
135    */

136   public void interrupt(boolean wait) {
137     super.interrupt();
138     if (wait && isRunning()) {
139       while (isRunning()) {
140         try {
141           synchronized (this) {
142             wait(1000);
143           }
144         } catch (InterruptedException JavaDoc e) {
145         }
146       }
147     }
148   }
149
150   /**
151    * Perform single update cycle for current group.
152    */

153   public void performUpdates() {
154     logger.debug("Starting channel updates loop for " + mgr.getChannelGroup().getTitle());
155     mgr.notifyPolling(true);
156     Iterator iter = mgr.channelIterator();
157
158     Channel nextChan;
159     while (iter.hasNext()) {
160       nextChan = (Channel) iter.next();
161       logger.info("processing: " + nextChan);
162       
163 // Catch all Exceptions coming out of handleChannel and continue iterating to the next one.
164

165       try {
166         handleChannel(nextChan, getUpdChanInfo(nextChan));
167       } catch (RuntimeException JavaDoc e) {
168         logger.error("Error during processing: " + nextChan, e);
169       } catch (NoSuchMethodError JavaDoc ignoreNoSuchMethod) // Ignore and continue
170
{
171         logger
172             .error("NoSuchMethodError exception within Run method. Ignoring." + nextChan, ignoreNoSuchMethod);
173       }
174
175     }
176
177     // Notify everyone that polling of group finished.
178
mgr.notifyPolling(false);
179     mgr.incrPollingCounter();
180   }
181
182   /**
183    * Return (and create if necessary) an UpdateChannelInfo object, which is a parallel object which
184    * we use here to keep track of information about a channel.
185    *
186    * @param chan - Corresponding Channel.
187    */

188   private UpdateChannelInfo getUpdChanInfo(Channel chan) {
189     UpdateChannelInfo info = (UpdateChannelInfo) channelInfos.get(chan.getLocation());
190
191     if (info == null) // Create a new UpdateChannelInfo object and add it to the Map.
192
{
193       info = new UpdateChannelInfo(mgr.getAcceptNrErrors());
194       channelInfos.put(chan.getLocation(), info);
195     }
196     return info;
197   }
198
199   /**
200    * Process the Channel information.
201    *
202    * @param chan - Channel to process
203    * @param info - UpdateChannelInfo - additional Channel Info object
204    */

205   private void handleChannel(Channel chan, UpdateChannelInfo info) {
206     if (!info.shouldDeactivate()) {
207       if (shouldUpdate(info)) {
208         synchronized (builder) {
209           if (!info.getFormatDetected()) handleChannelHeader(chan, info);
210           handleChannelItems(chan, info);
211         }
212
213         info.setLastUpdatedTimestamp(System.currentTimeMillis());
214       }
215     } else {
216       // Returns true if more errors happened than threshold.
217
logger.info("Not processing channel: " + chan + " because exceeded error threshold.");
218       return;
219     }
220   }
221
222   /**
223    * Returns TRUE if the cannel represented by the <code>info</code> should be updated. Decision
224    * is basing on the fact of last update. If there's not enough time passed since then we don't
225    * need to update this channel.
226    *
227    * @param info info object of the channel.
228    *
229    * @return result of the check.
230    */

231   private boolean shouldUpdate(UpdateChannelInfo info) {
232     return System.currentTimeMillis() - info.getLastUpdatedTimestamp() > minChannelUpdateDelay;
233   }
234
235   /**
236    * handleChannelHeader -
237    *
238    * @param chan
239    * @param info -
240    */

241   private void handleChannelHeader(Channel chan, UpdateChannelInfo info) {
242     if (!info.getFormatDetected()) { // If format has been detected then we've seen this Channel
243
// already
244
logger.debug("Handling Channel Header. Format not yet detected.");
245       try {
246         builder.beginTransaction();
247         builder.reload(chan);
248         ChannelFormat format = FormatDetector.getFormat(chan.getLocation());
249         chan.setFormat(format);
250         info.setFormatDetected(true);
251         chan.setLastUpdated(new Date());
252         builder.endTransaction();
253       } catch (UnknownHostException JavaDoc e) {
254         // Normal situation when user is offline
255
logger.debug("Host not found: " + e.getMessage());
256       } catch (Exception JavaDoc e) {
257         info.increaseProblemsOccurred(e);
258         String JavaDoc msg = "Exception in handleChannelHeader for : " + chan;
259         logger.fatal(msg + "\n Continue....");
260       } finally {
261         // If there was an exception we still will be in transaction.
262
if (builder.inTransaction()) builder.resetTransaction();
263       }
264     }
265   }
266
267   /**
268    * Process items in the newly parsed Channel. If they are new (i.e. not yet persisted) then add
269    * them to the Channel. Note the logXXX variables were put in to do better error reporting in the
270    * event of an Exception.
271    *
272    * @param chan
273    * @param info -
274    */

275   private void handleChannelItems(Channel chan, UpdateChannelInfo info) {
276     ChannelIF tempChannel = null;
277     int logHowManySearched = 0;
278     int logHowManyAdded = 0;
279
280     // TODO: [Aleksey Gureev] I don't see locking of builder here. Locking of the whole peice will
281
// be very
282
// great resource consumption. It's necessary to rework whole method to lock builder for a
283
// minimal time.
284

285     try {
286       builder.beginTransaction();
287       builder.reload(chan);
288       /*
289        * We will now parse the new channel's information into a *memory based* temporary channel. We
290        * will then see which items that we received from the feed are already present and add the
291        * new ones.
292        */

293       tempChannel = FeedParser.parse(tempBuilder, chan.getLocation());
294       InformaUtils.copyChannelProperties(tempChannel, chan);
295       /*
296        * Tricky: this channel might have been loaded into memory by Hibernate in a preceding
297        * Hibernate Session. We need to make it available in this session so it will be written back
298        * to disk when the transaction is committed.
299        */

300       chan.setLastUpdated(new Date());
301       mgr.notifyChannelRetrieved(chan);
302       /*
303        * Compare with the existing items, and only add new ones. In the future this is where we
304        * would put code to diff an item to see how blog author has edited a certain item over time.
305        */

306       if (!tempChannel.getItems().isEmpty()) {
307         Iterator it = tempChannel.getItems().iterator();
308         while (it.hasNext()) {
309           logHowManySearched++;
310           de.nava.informa.impl.basic.Item transientItem = (de.nava.informa.impl.basic.Item) it
311               .next();
312           if (!chan.getItems().contains(transientItem)) {
313             logger.info("Found new item: " + transientItem);
314             logHowManyAdded++;
315             /*
316              * A persistent item is created, using all the state from the memory based item produced
317              * by parser.
318              */

319             ItemIF newItem = builder.createItem(chan, transientItem);
320             mgr.notifyItemAdded((Item) newItem);
321           }
322         } // while it.hasNext()
323
}
324       builder.endTransaction();
325     } catch (UnknownHostException JavaDoc e) {
326       // Normal situation when user is offline
327
logger.debug("Host not found: " + e.getMessage());
328     } catch (Exception JavaDoc e) {
329       info.increaseProblemsOccurred(e);
330       String JavaDoc msg = "Exception in handleChannelItems. # Potential new items = " + logHowManySearched
331           + ", # Items actually added to channel: " + logHowManyAdded + "\n Stored Chan="
332           + chan + "\n ParsedChan=" + tempChannel;
333       logger.fatal(msg + "\n Continue....");
334     } finally {
335       // If there was an exception we still will be in transaction.
336
if (builder.inTransaction()) builder.resetTransaction();
337     }
338   }
339 }
Popular Tags