1 26 package de.nava.informa.utils; 27 28 import java.util.*; 29 import java.net.UnknownHostException ; 30 31 import org.apache.commons.logging.*; 32 33 import de.nava.informa.core.*; 34 import de.nava.informa.impl.hibernate.*; 35 import de.nava.informa.parsers.FeedParser; 36 37 41 public class PersistChanGrpMgrTask extends Thread { 42 43 private static Log logger = LogFactory.getLog(PersistChanGrpMgrTask.class); 44 45 private PersistChanGrpMgr mgr; 46 private ChannelBuilder builder; 47 private ChannelBuilderIF tempBuilder; 48 private Map channelInfos; 49 private long minChannelUpdateDelay; 50 private volatile boolean running = false; 51 52 58 public PersistChanGrpMgrTask(PersistChanGrpMgr mgr, long minChannelUpdateDelay) { 59 super("PCGrp: " + mgr.getChannelGroup().getTitle()); 60 this.minChannelUpdateDelay = minChannelUpdateDelay; 61 this.mgr = mgr; 62 builder = mgr.getBuilder(); 63 channelInfos = new HashMap(); 64 tempBuilder = new de.nava.informa.impl.basic.ChannelBuilder(); 65 } 66 67 72 public void setMinChannelUpdateDelay(long minChannelUpdateDelay) { 73 this.minChannelUpdateDelay = minChannelUpdateDelay; 74 } 75 76 80 public void run() { 81 running = true; 82 83 try { 84 while (!isInterrupted()) { 86 long startedLoop = System.currentTimeMillis(); 87 88 performUpdates(); 89 90 long leftToSleep = minChannelUpdateDelay - (startedLoop - System.currentTimeMillis()); 92 logger.debug("Going to sleep for " + leftToSleep + " millis"); 93 if (leftToSleep > 0) Thread.sleep(leftToSleep); 94 } 95 } catch (InterruptedException e) { 96 logger.warn("Interrupted exception within Run method"); 99 } catch (Exception ignoredException) { 102 ignoredException.printStackTrace(); 103 105 } finally { 106 running = false; 107 synchronized (this) { 108 notifyAll(); 109 } 110 } 111 } 112 113 118 public boolean isRunning() { 119 return running; 120 } 121 122 127 public void interrupt() { 128 interrupt(false); 129 } 130 131 136 public void interrupt(boolean wait) { 137 super.interrupt(); 138 if (wait && isRunning()) { 139 while (isRunning()) { 140 try { 141 synchronized (this) { 142 wait(1000); 143 } 144 } catch (InterruptedException e) { 145 } 146 } 147 } 148 } 149 150 153 public void performUpdates() { 154 logger.debug("Starting channel updates loop for " + mgr.getChannelGroup().getTitle()); 155 mgr.notifyPolling(true); 156 Iterator iter = mgr.channelIterator(); 157 158 Channel nextChan; 159 while (iter.hasNext()) { 160 nextChan = (Channel) iter.next(); 161 logger.info("processing: " + nextChan); 162 163 165 try { 166 handleChannel(nextChan, getUpdChanInfo(nextChan)); 167 } catch (RuntimeException e) { 168 logger.error("Error during processing: " + nextChan, e); 169 } catch (NoSuchMethodError ignoreNoSuchMethod) { 171 logger 172 .error("NoSuchMethodError exception within Run method. Ignoring." + nextChan, ignoreNoSuchMethod); 173 } 174 175 } 176 177 mgr.notifyPolling(false); 179 mgr.incrPollingCounter(); 180 } 181 182 188 private UpdateChannelInfo getUpdChanInfo(Channel chan) { 189 UpdateChannelInfo info = (UpdateChannelInfo) channelInfos.get(chan.getLocation()); 190 191 if (info == null) { 193 info = new UpdateChannelInfo(mgr.getAcceptNrErrors()); 194 channelInfos.put(chan.getLocation(), info); 195 } 196 return info; 197 } 198 199 205 private void handleChannel(Channel chan, UpdateChannelInfo info) { 206 if (!info.shouldDeactivate()) { 207 if (shouldUpdate(info)) { 208 synchronized (builder) { 209 if (!info.getFormatDetected()) handleChannelHeader(chan, info); 210 handleChannelItems(chan, info); 211 } 212 213 info.setLastUpdatedTimestamp(System.currentTimeMillis()); 214 } 215 } else { 216 logger.info("Not processing channel: " + chan + " because exceeded error threshold."); 218 return; 219 } 220 } 221 222 231 private boolean shouldUpdate(UpdateChannelInfo info) { 232 return System.currentTimeMillis() - info.getLastUpdatedTimestamp() > minChannelUpdateDelay; 233 } 234 235 241 private void handleChannelHeader(Channel chan, UpdateChannelInfo info) { 242 if (!info.getFormatDetected()) { logger.debug("Handling Channel Header. Format not yet detected."); 245 try { 246 builder.beginTransaction(); 247 builder.reload(chan); 248 ChannelFormat format = FormatDetector.getFormat(chan.getLocation()); 249 chan.setFormat(format); 250 info.setFormatDetected(true); 251 chan.setLastUpdated(new Date()); 252 builder.endTransaction(); 253 } catch (UnknownHostException e) { 254 logger.debug("Host not found: " + e.getMessage()); 256 } catch (Exception e) { 257 info.increaseProblemsOccurred(e); 258 String msg = "Exception in handleChannelHeader for : " + chan; 259 logger.fatal(msg + "\n Continue...."); 260 } finally { 261 if (builder.inTransaction()) builder.resetTransaction(); 263 } 264 } 265 } 266 267 275 private void handleChannelItems(Channel chan, UpdateChannelInfo info) { 276 ChannelIF tempChannel = null; 277 int logHowManySearched = 0; 278 int logHowManyAdded = 0; 279 280 285 try { 286 builder.beginTransaction(); 287 builder.reload(chan); 288 293 tempChannel = FeedParser.parse(tempBuilder, chan.getLocation()); 294 InformaUtils.copyChannelProperties(tempChannel, chan); 295 300 chan.setLastUpdated(new Date()); 301 mgr.notifyChannelRetrieved(chan); 302 306 if (!tempChannel.getItems().isEmpty()) { 307 Iterator it = tempChannel.getItems().iterator(); 308 while (it.hasNext()) { 309 logHowManySearched++; 310 de.nava.informa.impl.basic.Item transientItem = (de.nava.informa.impl.basic.Item) it 311 .next(); 312 if (!chan.getItems().contains(transientItem)) { 313 logger.info("Found new item: " + transientItem); 314 logHowManyAdded++; 315 319 ItemIF newItem = builder.createItem(chan, transientItem); 320 mgr.notifyItemAdded((Item) newItem); 321 } 322 } } 324 builder.endTransaction(); 325 } catch (UnknownHostException e) { 326 logger.debug("Host not found: " + e.getMessage()); 328 } catch (Exception e) { 329 info.increaseProblemsOccurred(e); 330 String msg = "Exception in handleChannelItems. # Potential new items = " + logHowManySearched 331 + ", # Items actually added to channel: " + logHowManyAdded + "\n Stored Chan=" 332 + chan + "\n ParsedChan=" + tempChannel; 333 logger.fatal(msg + "\n Continue...."); 334 } finally { 335 if (builder.inTransaction()) builder.resetTransaction(); 337 } 338 } 339 } | Popular Tags |