1 /** 2 * ae.utils.statequeue 3 * 4 * License: 5 * This Source Code Form is subject to the terms of 6 * the Mozilla Public License, v. 2.0. If a copy of 7 * the MPL was not distributed with this file, You 8 * can obtain one at http://mozilla.org/MPL/2.0/. 9 * 10 * Authors: 11 * Vladimir Panteleev <ae@cy.md> 12 */ 13 14 module ae.utils.statequeue; 15 16 import core.time; 17 18 import ae.net.asockets; 19 import ae.utils.array; 20 import ae.utils.promise; 21 22 /** 23 Let `f(x)` be an expensive operation which changes something to 24 (or towards) state `x`. At most one `f` call may be in progress at any time. 25 This type orchestrates a series of operations that eventually bring 26 the state to some goal, while allowing the goal to change at any time. 27 */ 28 struct StateQueue(State) 29 { 30 private: 31 bool prodPending; 32 33 // Flag to aid the debug invariant. 34 // Normally (oldState == newState) == (currentTransition is null). 35 // One exception to that is when a transition from an invalid state started, 36 // and setCurrentState was called during the transition, 37 // so we're "transitioning" from an invalid to an invalid state. 38 debug bool stateWasReset; 39 40 void enqueueProd() 41 { 42 if (prodPending) 43 return; 44 prodPending = true; 45 socketManager.onNextTick(&prod); 46 } 47 48 void prod() 49 { 50 prodPending = false; 51 if (currentTransition) 52 return; // Will be picked up in onComplete 53 assert(oldState == newState); 54 if (newState == goalState) 55 { 56 // Already in the goal state 57 goalPromise.fulfill(); 58 return; 59 } 60 // Start a new transition 61 newState = goalState; 62 currentTransition = stateFunc(goalState) 63 .then(&onComplete, &onFail); 64 } 65 66 void onComplete(State resultState) 67 { 68 assert(currentTransition); 69 debug assert(oldState != newState || stateWasReset); 70 debug stateWasReset = false; 71 oldState = newState = resultState; 72 currentTransition = null; 73 74 prod(); 75 } 76 77 void onFail(Exception e) 78 { 79 assert(currentTransition); 80 debug assert(oldState != newState || stateWasReset); 81 debug stateWasReset = false; 82 currentTransition = null; 83 84 if (newState == goalState) 85 { 86 // State transition failed. 87 // We cannot reach the goal state; give up until further instructions. 88 newState = goalState = oldState; 89 goalPromise.reject(e); 90 } 91 else 92 { 93 // Actually, we now want to go somewhere else. 94 // Try again. 95 newState = oldState; 96 enqueueProd(); 97 } 98 } 99 100 public: 101 @disable this(); 102 103 /// The asynchronous implementation function which actually changes the state. 104 Promise!State delegate(State) stateFunc; 105 106 /// The state that any current change is moving away from. 107 State oldState; 108 109 /// The state that any current change is moving towards. 110 State newState; 111 112 /// The final state that we want to be in. 113 State goalState; 114 115 /// The promise that will be fulfilled when we reach the goal state. 116 Promise!void goalPromise; 117 118 /// The current state transition. 119 Promise!void currentTransition; 120 121 debug invariant 122 { 123 if (currentTransition) 124 assert(oldState != newState || stateWasReset); 125 else 126 assert(oldState == newState); 127 } 128 129 /// Constructor. 130 this( 131 /// The function implementing the state transition operation. 132 /// Accepts the goal state, and returns a promise which is the 133 /// resulting (ideally but not necessarily, the goal) state. 134 /// If the returned promise is rejected, it indicates that the 135 /// state hasn't changed and the goal cannot be reached. 136 Promise!State delegate(State) stateFunc, 137 /// The initial state. 138 State initialState = State.init, 139 ) 140 { 141 this.stateFunc = stateFunc; 142 this.oldState = this.newState = this.goalState = initialState; 143 goalPromise = resolve(); 144 } 145 146 /// Set the goal state. Starts off a transition operation if needed. 147 /// Returns a promise that will be fulfilled when we reach the goal state, 148 /// or rejected if the goal state changes before it is reached. 149 Promise!void setGoal(State state) 150 { 151 if (goalState != state) 152 { 153 if (currentTransition || newState != goalState) 154 goalPromise.reject(new Exception("Goal changed")); 155 goalPromise = new Promise!void; 156 157 this.goalState = state; 158 enqueueProd(); 159 } 160 return goalPromise; 161 } 162 163 /// Can be used to indicate that the state has been changed externally 164 /// (e.g. to some "invalid"/"dirty" state). 165 /// If a transition operation is already in progress, assume that it will 166 /// change the state to the given state instead of its actual goal. 167 void setCurrentState(State state = State.init) 168 { 169 if (currentTransition) 170 { 171 newState = state; 172 debug stateWasReset = true; 173 } 174 else 175 { 176 oldState = newState = state; 177 enqueueProd(); 178 } 179 } 180 } 181 182 // Test changing the goal multiple times per tick 183 debug(ae_unittest) unittest 184 { 185 import ae.utils.promise.timing : sleep; 186 187 int state, workDone; 188 Promise!int changeState(int i) 189 { 190 return sleep(1.msecs).then({ 191 workDone++; 192 state = i; 193 return i; 194 }); 195 } 196 197 auto q = StateQueue!int(&changeState); 198 assert(workDone == 0); 199 200 q.setGoal(1).ignoreResult(); 201 q.setGoal(2).ignoreResult(); 202 socketManager.loop(); 203 assert(state == 2 && workDone == 1); 204 } 205 206 // Test incremental transitions towards the goal 207 debug(ae_unittest) unittest 208 { 209 import ae.utils.promise.timing : sleep; 210 211 int state, workDone; 212 Promise!int changeState(int i) 213 { 214 return sleep(1.msecs).then({ 215 workDone++; 216 auto nextState = state + 1; 217 state = nextState; 218 return nextState; 219 }); 220 } 221 222 auto q = StateQueue!int(&changeState); 223 assert(workDone == 0); 224 225 q.setGoal(3).ignoreResult(); 226 socketManager.loop(); 227 assert(state == 3 && workDone == 3); 228 } 229 230 231 /// A wrapper around a `StateQueue` which modifies its behavior, such 232 /// that: 233 /// 1. After a transition to a state completes, a temporary "lock" is 234 /// obtained, which blocks any transitions while it is held; 235 /// 2. Transition requests form a queue of arbitrary length. 236 struct LockingStateQueue( 237 /// A type representing the state. 238 State, 239 /// If `true`, guarantee that requests for a certain goal state will 240 /// be satisfied strictly in the order that they were requested. 241 /// If `false` (default), requests for a certain state may be 242 /// grouped together and satisfied out-of-order. 243 bool strictlyOrdered = false, 244 ) 245 { 246 private: 247 StateQueue!State stateQueue; 248 249 struct DesiredState 250 { 251 State state; 252 Promise!Lock[] callbacks; 253 } 254 DesiredState[] desiredStates; 255 256 bool isLocked; 257 debug size_t lockIndex; 258 259 Lock acquire() 260 { 261 assert(!isLocked); 262 263 Lock lock; 264 debug lock.lockIndex = ++lockIndex; 265 isLocked = true; 266 267 return lock; 268 } 269 270 void prod() 271 { 272 if (isLocked) 273 return; // Waiting for .release() -> .prod() 274 275 // Drain fulfilled queued states 276 while (desiredStates.length > 0 && desiredStates[0].callbacks.length == 0) 277 desiredStates = desiredStates[1 .. $]; 278 279 if (desiredStates.length == 0) 280 return; // Nothing to do 281 282 // Acquire the lock now, whether we are transitioning to another state, 283 // or immediately resolving a callback. 284 auto lock = acquire(); 285 step(lock); 286 } 287 288 void step(Lock lock) 289 { 290 assert(desiredStates.length > 0); 291 292 // StateQueue should be idle 293 assert(stateQueue.oldState == stateQueue.newState && stateQueue.newState == stateQueue.goalState); 294 295 // Check for matches in the queue 296 size_t maxIndex = strictlyOrdered ? 1 : desiredStates.length; 297 foreach (ref desiredState; desiredStates[0 .. maxIndex]) 298 if (desiredState.state == stateQueue.newState) 299 { 300 auto callback = desiredState.callbacks.queuePop(); 301 callback.fulfill(lock); 302 // Execution will be resumed when .resume() is called with the lock 303 return; 304 } 305 306 // No matches in the queue? Go to the next queued goal state 307 stateQueue 308 .setGoal(desiredStates[0].state) 309 .then({ 310 // TODO: if stateFunc moved incrementally but not fully towards goalState, 311 // this could still be useful for us when strictlyOrdered is false and 312 // this intermediary state is in the queue. However, currently StateQueue 313 // only resolves its returned promise when goalState is reached. 314 315 // Re-check queue 316 step(lock); 317 }) 318 .except((Exception e) { 319 // On a transition error, drain all queued states 320 auto queue = desiredStates; 321 desiredStates = null; 322 foreach (ref desiredState; queue) 323 foreach (callback; desiredState.callbacks) 324 callback.reject(e); 325 release(lock); 326 }); 327 } 328 329 public: 330 /// Constructor. 331 this( 332 /// The function implementing the state transition operation. 333 /// Accepts the goal state, and returns a promise which is the 334 /// resulting (ideally but not necessarily, the goal) state. 335 Promise!State delegate(State) stateFunc, 336 /// The initial state. 337 State initialState = State.init, 338 ) 339 { 340 this.stateQueue = StateQueue!State(stateFunc, initialState); 341 } 342 343 /// Represents a held lock. 344 /// The lock is acquired automatically when a desired state is reached. 345 /// To release the lock, call `.release` on the queue object. 346 struct Lock 347 { 348 debug private size_t lockIndex; 349 } 350 351 /// Enqueue a desired goal state. 352 Promise!Lock addGoal(State state) 353 { 354 scope(success) prod(); 355 auto p = new Promise!Lock(); 356 static if (!strictlyOrdered) 357 foreach (ref desiredState; desiredStates) 358 if (desiredState.state == state) 359 { 360 desiredState.callbacks ~= p; 361 return p; 362 } 363 desiredStates ~= DesiredState(state, [p]); 364 return p; 365 } 366 367 /// Relinquish the lock, allowing a transition to a different state. 368 void release(Lock lock) 369 { 370 assert(isLocked, "Attempting to release a lock when one is not held"); 371 debug assert(lockIndex == lock.lockIndex, "Attempting to release a mismatching lock"); 372 373 isLocked = false; 374 prod(); 375 } 376 377 /// These may be useful to access in stateFunc. 378 @property State oldState() { return stateQueue.oldState; } 379 @property State newState() { return stateQueue.newState; } /// ditto 380 @property State goalState() { return stateQueue.goalState; } /// ditto 381 } 382 383 debug(ae_unittest) unittest 384 { 385 import ae.utils.promise.timing : sleep; 386 387 Promise!int changeState(int i) 388 { 389 return sleep(1.msecs).then({ 390 return i; 391 }); 392 } 393 394 static foreach (bool strictlyOrdered; [false, true]) 395 {{ 396 auto q = LockingStateQueue!(int, strictlyOrdered)(&changeState); 397 398 int[] goals; 399 void addGoal(int goal) 400 { 401 q.addGoal(goal).then((lock) { 402 goals ~= goal; 403 q.release(lock); 404 }); 405 } 406 addGoal(1); 407 addGoal(2); 408 addGoal(1); 409 socketManager.loop(); 410 auto expectedGoals = strictlyOrdered ? [1, 2, 1] : [1, 1, 2]; 411 assert(goals == expectedGoals); 412 }} 413 }