1 /**
2  * Cross-thread UI communication.
3  *
4  * Background threads cannot touch Win32 controls directly — those belong to the
5  * UI thread. `CommandQueue!T` is a thread-safe FIFO; `UiDispatcher!T` pairs a
6  * queue with a target window so a worker can `post` a command and wake the UI
7  * thread, which then `drain`s and processes the commands on its own turf.
8  *
9  * The framework supplies only the mechanism. Applications define their own
10  * command type (typically an enum) and the WndProc handling for the wake
11  * message.
12  */
13 module deft.commandqueue;
14 
15 version (Windows):
16 
17 import core.sync.mutex : Mutex;
18 import core.sys.windows.windows;
19 
20 /// Default wake message id, in the `WM_APP`..`0xBFFF` application-reserved range.
21 enum uint defaultWakeMessage = WM_APP + 1;
22 
23 /// A thread-safe FIFO queue of `T`.
24 class CommandQueue(T)
25 {
26 	private T[] items;
27 	private Mutex mutex;
28 
29 	/// Create an empty queue with its own mutex.
30 	this()
31 	{
32 		mutex = new Mutex();
33 	}
34 
35 	/// Append an item. Safe to call from any thread.
36 	void push(T item)
37 	{
38 		synchronized (mutex)
39 			items ~= item;
40 	}
41 
42 	/// Atomically remove and return all queued items, in insertion order.
43 	T[] drainAll()
44 	{
45 		synchronized (mutex)
46 		{
47 			auto result = items;
48 			items = null;
49 			return result;
50 		}
51 	}
52 
53 	/// Whether the queue currently holds no items.
54 	bool empty()
55 	{
56 		synchronized (mutex)
57 			return items.length == 0;
58 	}
59 }
60 
61 /**
62  * Pairs a `CommandQueue!T` with a target window. `post` enqueues a command and
63  * wakes the UI thread with `PostMessageW`; the window's WndProc responds to the
64  * wake message by calling `drain` and processing the result.
65  */
66 struct UiDispatcher(T)
67 {
68 	/// The backing queue (shared; reference type).
69 	CommandQueue!T queue;
70 
71 	/// The window woken on `post`.
72 	HWND targetHwnd;
73 
74 	/// The wake message id posted to `targetHwnd`.
75 	uint messageId = defaultWakeMessage;
76 
77 	/**
78 	 * Create a dispatcher targeting `hwnd`, with a fresh backing queue. `messageId`
79 	 * is the wake message posted to the window on `post` (defaults to
80 	 * `defaultWakeMessage`).
81 	 */
82 	this(HWND hwnd, uint messageId = defaultWakeMessage)
83 	{
84 		this.queue = new CommandQueue!T();
85 		this.targetHwnd = hwnd;
86 		this.messageId = messageId;
87 	}
88 
89 	/// Enqueue a command and wake the target window.
90 	void post(T command)
91 	{
92 		if (queue is null)
93 			queue = new CommandQueue!T();
94 		queue.push(command);
95 		if (targetHwnd !is null)
96 			PostMessageW(targetHwnd, messageId, 0, 0);
97 	}
98 
99 	/// Remove and return all queued commands. Call on the UI thread.
100 	T[] drain()
101 	{
102 		if (queue is null)
103 			return null;
104 		return queue.drainAll();
105 	}
106 }
107 
108 unittest
109 {
110 	// Push then drain returns everything in order; the queue is then empty.
111 	auto q = new CommandQueue!int();
112 	q.push(1);
113 	q.push(2);
114 	q.push(3);
115 	assert(q.drainAll() == [1, 2, 3]);
116 	assert(q.empty());
117 }
118 
119 unittest
120 {
121 	// Draining an empty queue yields an empty array and does not block.
122 	auto q = new CommandQueue!int();
123 	assert(q.empty());
124 	assert(q.drainAll().length == 0);
125 }
126 
127 unittest
128 {
129 	// Push/drain roundtrip with a non-trivial element type.
130 	auto q = new CommandQueue!string();
131 	q.push("a");
132 	q.push("b");
133 	auto drained = q.drainAll();
134 	assert(drained == ["a", "b"]);
135 	assert(q.empty());
136 }
137 
138 unittest
139 {
140 	// Concurrent pushes from many threads: every item must survive.
141 	import core.thread : Thread;
142 
143 	enum threadCount = 8;
144 	enum perThread = 1000;
145 
146 	auto q = new CommandQueue!int();
147 	Thread[] threads;
148 	foreach (t; 0 .. threadCount)
149 		threads ~= new Thread({
150 			foreach (i; 0 .. perThread)
151 				q.push(i);
152 		}).start();
153 
154 	foreach (th; threads)
155 		th.join();
156 
157 	auto all = q.drainAll();
158 	assert(all.length == threadCount * perThread);
159 }