1 package org.sapia.taskman; 2 3 import org.sapia.taskman.transaction.Transaction; 4 import org.sapia.taskman.transaction.TransactionListener; 5 6 import java.io.Serializable ; 7 import java.util.ArrayList ; 8 import java.util.List ; 9 10 35 public class TaskmanTransaction implements Transaction, Serializable { 36 37 static final long serialVersionUID = 1L; 38 39 private List _tasks = new ArrayList (); 40 TaskManager _taskMan; 41 TransactionListener _listener; 42 private boolean _isError; 43 private int _currentIndex = 0; 44 private int _status = STATUS_INITIAL; 45 private long _nextExecTime; 46 private Object _txId; 47 48 TaskmanTransaction(Object txId, TaskManager taskMan) { 49 _txId = txId; 50 _taskMan = taskMan; 51 } 52 53 TaskmanTransaction(Object txId, TaskManager taskMan, TransactionListener listener) { 54 this(txId, taskMan); 55 _listener = listener; 56 } 57 58 61 public synchronized void commit() { 62 if(_status != STATUS_INITIAL) { 63 throw new IllegalStateException ("Transaction already committed"); 64 } 65 66 _status = STATUS_COMMITTING; 67 68 if(_listener != null) { 69 _listener.onBegin(); 70 } 71 72 _taskMan.wakeUp(); 73 } 74 75 78 public int getStatus() { 79 return _status; 80 } 81 82 85 public Object getId() { 86 return _txId; 87 } 88 89 synchronized void registerTask(TaskDescriptor desc) { 90 if(_tasks.size() == 0) { 91 desc.setRoot(true); 92 } 93 94 if(!desc.isPeriodic()) { 95 desc.calcNextExecTime(); 96 } 97 98 if((_nextExecTime != 0) && (desc.nextExecTime() < _nextExecTime)) { 99 _nextExecTime = desc.nextExecTime(); 100 } else if(_nextExecTime == 0) { 101 _nextExecTime = desc.nextExecTime(); 102 } 103 104 if(_status == STATUS_INITIAL) { 105 _tasks.add(desc); 106 } else { 107 _tasks.add(_currentIndex, desc); 108 } 109 } 110 111 synchronized long nextExecTime() { 112 return _nextExecTime; 113 } 114 115 synchronized void execute() { 116 long next = -1; 117 118 if(_tasks.size() == 0) { 119 _nextExecTime = next; 120 121 return; 122 } else if(_currentIndex >= _tasks.size()) { 123 _currentIndex = 0; 124 125 return; 126 } 127 128 TaskDescriptor current = (TaskDescriptor) _tasks.get(_currentIndex); 129 130 if(current.isFinished()) { 131 _tasks.remove(_currentIndex); 132 notifyCompleted(); 133 if(current.getTaskOutput() != null) 134 current.getTaskOutput().close(); 135 } else if(_isError) { 136 _tasks.remove(_currentIndex); 137 notifyCompleted(); 138 if(current.getTaskOutput() != null) 139 current.getTaskOutput().close(); 140 } else { 141 if(System.currentTimeMillis() >= current.nextExecTime()) { 142 143 TaskOutput out = current.getTaskOutput(); 144 if(out == null) 145 out = _taskMan.newTaskOutput(current.getName()); 146 TaskContext ctx; 147 if(!(out instanceof TxTaskOutput)) { 148 out = new TxTaskOutput(out, this); 149 } 150 ctx = new TaskContext(out, _taskMan); 151 ctx.setTransaction(this); 152 ctx.copyVals(current.getContextVals()); 153 try { 154 _currentIndex++; 155 current.exec(ctx); 156 } catch(RuntimeException e) { 157 out.error(e); 158 out.close(); 159 } 160 161 if(next == -1) { 162 next = current.nextExecTime(); 163 } else if(current.nextExecTime() < next) { 164 next = current.nextExecTime(); 165 } 166 } 167 } 168 _nextExecTime = next; 169 } 170 171 void notifyError(Object msg, Throwable t) { 172 _isError = true; 173 174 if(_listener != null) { 175 if(t == null) { 176 _listener.onError(msg, t); 177 } else { 178 _listener.onError(msg, t); 179 } 180 } 181 } 182 183 void notifyError(Throwable t) { 184 _isError = true; 185 if(_listener != null) 186 _listener.onError(t); 187 } 188 189 boolean isError() { 190 return _isError; 191 } 192 193 void setTaskManager(TaskManager tm) { 194 _taskMan = tm; 195 } 196 197 private void notifyCompleted() { 198 if(_tasks.size() == 0) { 199 _status = STATUS_COMMITTED; 200 201 if(_listener != null) { 202 _listener.onCommitted(!_isError); 203 } 204 } 205 } 206 } 207 | Popular Tags |