1 /**
2  * ae.utils.statequeue
3  *
4  * License:
5  *   This Source Code Form is subject to the terms of
6  *   the Mozilla Public License, v. 2.0. If a copy of
7  *   the MPL was not distributed with this file, You
8  *   can obtain one at http://mozilla.org/MPL/2.0/.
9  *
10  * Authors:
11  *   Vladimir Panteleev <ae@cy.md>
12  */
13 
14 module ae.utils.statequeue;
15 
16 import core.time;
17 
18 import ae.net.asockets;
19 import ae.utils.array;
20 import ae.utils.promise;
21 
22 /**
23    Let `f(x)` be an expensive operation which changes something to
24    (or towards) state `x`.  At most one `f` call may be in progress at any time.
25    This type orchestrates a series of operations that eventually bring
26    the state to some goal, while allowing the goal to change at any time.
27  */
28 struct StateQueue(State)
29 {
30 private:
31 	bool prodPending;
32 
33 	// Flag to aid the debug invariant.
34 	// Normally (oldState == newState) == (currentTransition is null).
35 	// One exception to that is when a transition from an invalid state started,
36 	// and setCurrentState was called during the transition,
37 	// so we're "transitioning" from an invalid to an invalid state.
38 	debug bool stateWasReset;
39 
40 	void enqueueProd()
41 	{
42 		if (prodPending)
43 			return;
44 		prodPending = true;
45 		socketManager.onNextTick(&prod);
46 	}
47 
48 	void prod()
49 	{
50 		prodPending = false;
51 		if (currentTransition)
52 			return; // Will be picked up in onComplete
53 		assert(oldState == newState);
54 		if (newState == goalState)
55 		{
56 			// Already in the goal state
57 			goalPromise.fulfill();
58 			return;
59 		}
60 		// Start a new transition
61 		newState = goalState;
62 		currentTransition = stateFunc(goalState)
63 			.then(&onComplete, &onFail);
64 	}
65 
66 	void onComplete(State resultState)
67 	{
68 		assert(currentTransition);
69 		debug assert(oldState != newState || stateWasReset);
70 		debug stateWasReset = false;
71 		oldState = newState = resultState;
72 		currentTransition = null;
73 
74 		prod();
75 	}
76 
77 	void onFail(Exception e)
78 	{
79 		assert(currentTransition);
80 		debug assert(oldState != newState || stateWasReset);
81 		debug stateWasReset = false;
82 		currentTransition = null;
83 
84 		if (newState == goalState)
85 		{
86 			// State transition failed.
87 			// We cannot reach the goal state; give up until further instructions.
88 			newState = goalState = oldState;
89 			goalPromise.reject(e);
90 		}
91 		else
92 		{
93 			// Actually, we now want to go somewhere else.
94 			// Try again.
95 			newState = oldState;
96 			enqueueProd();
97 		}
98 	}
99 
100 public:
101 	@disable this();
102 
103 	/// The asynchronous implementation function which actually changes the state.
104 	Promise!State delegate(State) stateFunc;
105 
106 	/// The state that any current change is moving away from.
107 	State oldState;
108 
109 	/// The state that any current change is moving towards.
110 	State newState;
111 
112 	/// The final state that we want to be in.
113 	State goalState;
114 
115 	/// The promise that will be fulfilled when we reach the goal state.
116 	Promise!void goalPromise;
117 
118 	/// The current state transition.
119 	Promise!void currentTransition;
120 
121 	debug invariant
122 	{
123 		if (currentTransition)
124 			assert(oldState != newState || stateWasReset);
125 		else
126 			assert(oldState == newState);
127 	}
128 
129 	/// Constructor.
130 	this(
131 		/// The function implementing the state transition operation.
132 		/// Accepts the goal state, and returns a promise which is the
133 		/// resulting (ideally but not necessarily, the goal) state.
134 		/// If the returned promise is rejected, it indicates that the
135 		/// state hasn't changed and the goal cannot be reached.
136 		Promise!State delegate(State) stateFunc,
137 		/// The initial state.
138 		State initialState = State.init,
139 	)
140 	{
141 		this.stateFunc = stateFunc;
142 		this.oldState = this.newState = this.goalState = initialState;
143 		goalPromise = resolve();
144 	}
145 
146 	/// Set the goal state.  Starts off a transition operation if needed.
147 	/// Returns a promise that will be fulfilled when we reach the goal state,
148 	/// or rejected if the goal state changes before it is reached.
149 	Promise!void setGoal(State state)
150 	{
151 		if (goalState != state)
152 		{
153 			if (currentTransition || newState != goalState)
154 				goalPromise.reject(new Exception("Goal changed"));
155 			goalPromise = new Promise!void;
156 
157 			this.goalState = state;
158 			enqueueProd();
159 		}
160 		return goalPromise;
161 	}
162 
163 	/// Can be used to indicate that the state has been changed externally
164 	/// (e.g. to some "invalid"/"dirty" state).
165 	/// If a transition operation is already in progress, assume that it will
166 	/// change the state to the given state instead of its actual goal.
167 	void setCurrentState(State state = State.init)
168 	{
169 		if (currentTransition)
170 		{
171 			newState = state;
172 			debug stateWasReset = true;
173 		}
174 		else
175 		{
176 			oldState = newState = state;
177 			enqueueProd();
178 		}
179 	}
180 }
181 
182 // Test changing the goal multiple times per tick
183 debug(ae_unittest) unittest
184 {
185 	import ae.utils.promise.timing : sleep;
186 
187 	int state, workDone;
188 	Promise!int changeState(int i)
189 	{
190 		return sleep(1.msecs).then({
191 			workDone++;
192 			state = i;
193 			return i;
194 		});
195 	}
196 
197 	auto q = StateQueue!int(&changeState);
198 	assert(workDone == 0);
199 
200 	q.setGoal(1).ignoreResult();
201 	q.setGoal(2).ignoreResult();
202 	socketManager.loop();
203 	assert(state == 2 && workDone == 1);
204 }
205 
206 // Test incremental transitions towards the goal
207 debug(ae_unittest) unittest
208 {
209 	import ae.utils.promise.timing : sleep;
210 
211 	int state, workDone;
212 	Promise!int changeState(int i)
213 	{
214 		return sleep(1.msecs).then({
215 			workDone++;
216 			auto nextState = state + 1;
217 			state = nextState;
218 			return nextState;
219 		});
220 	}
221 
222 	auto q = StateQueue!int(&changeState);
223 	assert(workDone == 0);
224 
225 	q.setGoal(3).ignoreResult();
226 	socketManager.loop();
227 	assert(state == 3 && workDone == 3);
228 }
229 
230 
231 /// A wrapper around a `StateQueue` which modifies its behavior, such
232 /// that:
233 /// 1. After a transition to a state completes, a temporary "lock" is
234 ///    obtained, which blocks any transitions while it is held;
235 /// 2. Transition requests form a queue of arbitrary length.
236 struct LockingStateQueue(
237 	/// A type representing the state.
238 	State,
239 	/// If `true`, guarantee that requests for a certain goal state will
240 	/// be satisfied strictly in the order that they were requested.
241 	/// If `false` (default), requests for a certain state may be
242 	/// grouped together and satisfied out-of-order.
243 	bool strictlyOrdered = false,
244 )
245 {
246 private:
247 	StateQueue!State stateQueue;
248 
249 	struct DesiredState
250 	{
251 		State state;
252 		Promise!Lock[] callbacks;
253 	}
254 	DesiredState[] desiredStates;
255 
256 	bool isLocked;
257 	debug size_t lockIndex;
258 
259 	Lock acquire()
260 	{
261 		assert(!isLocked);
262 
263 		Lock lock;
264 		debug lock.lockIndex = ++lockIndex;
265 		isLocked = true;
266 
267 		return lock;
268 	}
269 
270 	void prod()
271 	{
272 		if (isLocked)
273 			return; // Waiting for .release() -> .prod()
274 
275 		// Drain fulfilled queued states
276 		while (desiredStates.length > 0 && desiredStates[0].callbacks.length == 0)
277 			desiredStates = desiredStates[1 .. $];
278 
279 		if (desiredStates.length == 0)
280 			return; // Nothing to do
281 
282 		// Acquire the lock now, whether we are transitioning to another state,
283 		// or immediately resolving a callback.
284 		auto lock = acquire();
285 		step(lock);
286 	}
287 
288 	void step(Lock lock)
289 	{
290 		assert(desiredStates.length > 0);
291 
292 		// StateQueue should be idle
293 		assert(stateQueue.oldState == stateQueue.newState && stateQueue.newState == stateQueue.goalState);
294 
295 		// Check for matches in the queue
296 		size_t maxIndex = strictlyOrdered ? 1 : desiredStates.length;
297 		foreach (ref desiredState; desiredStates[0 .. maxIndex])
298 			if (desiredState.state == stateQueue.newState)
299 			{
300 				auto callback = desiredState.callbacks.queuePop();
301 				callback.fulfill(lock);
302 				// Execution will be resumed when .resume() is called with the lock
303 				return;
304 			}
305 
306 		// No matches in the queue? Go to the next queued goal state
307 		stateQueue
308 			.setGoal(desiredStates[0].state)
309 			.then({
310 				// TODO: if stateFunc moved incrementally but not fully towards goalState,
311 				// this could still be useful for us when strictlyOrdered is false and
312 				// this intermediary state is in the queue. However, currently StateQueue
313 				// only resolves its returned promise when goalState is reached.
314 
315 				// Re-check queue
316 				step(lock);
317 			})
318 			.except((Exception e) {
319 				// On a transition error, drain all queued states
320 				auto queue = desiredStates;
321 				desiredStates = null;
322 				foreach (ref desiredState; queue)
323 					foreach (callback; desiredState.callbacks)
324 						callback.reject(e);
325 				release(lock);
326 			});
327 	}
328 
329 public:
330 	/// Constructor.
331 	this(
332 		/// The function implementing the state transition operation.
333 		/// Accepts the goal state, and returns a promise which is the
334 		/// resulting (ideally but not necessarily, the goal) state.
335 		Promise!State delegate(State) stateFunc,
336 		/// The initial state.
337 		State initialState = State.init,
338 	)
339 	{
340 		this.stateQueue = StateQueue!State(stateFunc, initialState);
341 	}
342 
343 	/// Represents a held lock.
344 	/// The lock is acquired automatically when a desired state is reached.
345 	/// To release the lock, call `.release` on the queue object.
346 	struct Lock
347 	{
348 		debug private size_t lockIndex;
349 	}
350 
351 	/// Enqueue a desired goal state.
352 	Promise!Lock addGoal(State state)
353 	{
354 		scope(success) prod();
355 		auto p = new Promise!Lock();
356 		static if (!strictlyOrdered)
357 			foreach (ref desiredState; desiredStates)
358 				if (desiredState.state == state)
359 				{
360 					desiredState.callbacks ~= p;
361 					return p;
362 				}
363 		desiredStates ~= DesiredState(state, [p]);
364 		return p;
365 	}
366 
367 	/// Relinquish the lock, allowing a transition to a different state.
368 	void release(Lock lock)
369 	{
370 		assert(isLocked, "Attempting to release a lock when one is not held");
371 		debug assert(lockIndex == lock.lockIndex, "Attempting to release a mismatching lock");
372 
373 		isLocked = false;
374 		prod();
375 	}
376 
377 	/// These may be useful to access in stateFunc.
378 	@property State oldState() { return stateQueue.oldState; }
379 	@property State newState() { return stateQueue.newState; } /// ditto
380 	@property State goalState() { return stateQueue.goalState; } /// ditto
381 }
382 
383 debug(ae_unittest) unittest
384 {
385 	import ae.utils.promise.timing : sleep;
386 
387 	Promise!int changeState(int i)
388 	{
389 		return sleep(1.msecs).then({ 
390 			return i;
391 		});
392 	}
393 
394 	static foreach (bool strictlyOrdered; [false, true])
395 	{{
396 		auto q = LockingStateQueue!(int, strictlyOrdered)(&changeState);
397 
398 		int[] goals;
399 		void addGoal(int goal)
400 		{
401 			q.addGoal(goal).then((lock) {
402 				goals ~= goal;
403 				q.release(lock);
404 			});
405 		}
406 		addGoal(1);
407 		addGoal(2);
408 		addGoal(1);
409 		socketManager.loop();
410 		auto expectedGoals = strictlyOrdered ? [1, 2, 1] : [1, 1, 2];
411 		assert(goals == expectedGoals);
412 	}}
413 }