/**
 * Cross-thread UI communication.
 *
 * Win32 controls are owned by the UI thread, so background threads must not
 * touch them directly. This module offers two building blocks:
 *
 * $(UL
 *   $(LI `CommandQueue!T` — a mutex-guarded FIFO of `T`.)
 *   $(LI `UiDispatcher!T` — a queue plus a target window. A worker calls
 *        `post` to enqueue a command and wake the window; the UI thread then
 *        calls `drain` and handles the commands itself.)
 * )
 *
 * Only the mechanism lives here. The command type (typically an enum) and the
 * WndProc handling of the wake message are defined by the application.
 */
module deft.commandqueue;

version (Windows):

import core.sync.mutex : Mutex;
import core.sys.windows.windows;

/// Default wake message id, in the `WM_APP`..`0xBFFF` application-reserved range.
enum uint defaultWakeMessage = WM_APP + 1;

/// A FIFO queue of `T` whose every operation runs under a private mutex.
class CommandQueue(T)
{
    private T[] pending;
    private Mutex guard;

    /// Construct an empty queue and the mutex that protects it.
    this()
    {
        guard = new Mutex();
    }

    /// Add `item` at the back of the queue. May be called from any thread.
    void push(T item)
    {
        guard.lock();
        scope (exit) guard.unlock();
        pending ~= item;
    }

    /**
     * Take every queued item in one locked step.
     *
     * Returns: the items in insertion order. The queue's own storage
     * reference is reset to `null`, so later pushes start a new array.
     */
    T[] drainAll()
    {
        guard.lock();
        scope (exit) guard.unlock();

        T[] taken = pending;
        pending = null;
        return taken;
    }

    /// Returns: `true` when no items are queued at the moment of the call.
    bool empty()
    {
        guard.lock();
        scope (exit) guard.unlock();
        return pending.length == 0;
    }
}

/**
 * A `CommandQueue!T` bound to a target window.
 *
 * `post` enqueues a command and then posts the wake message to the window with
 * `PostMessageW`. The window's WndProc is expected to react to that message by
 * calling `drain` and processing whatever it returns.
 */
struct UiDispatcher(T)
{
    /// The backing queue. A class reference, so copies of the struct share it.
    CommandQueue!T queue;

    /// The window that receives the wake message on `post`.
    HWND targetHwnd;

    /// The message id posted to `targetHwnd` as the wake signal.
    uint messageId = defaultWakeMessage;

    /**
     * Build a dispatcher for `hwnd` with a newly allocated backing queue.
     *
     * Params:
     *   hwnd      = window to wake on `post`
     *   messageId = wake message id; defaults to `defaultWakeMessage`
     */
    this(HWND hwnd, uint messageId = defaultWakeMessage)
    {
        this.targetHwnd = hwnd;
        this.messageId = messageId;
        this.queue = new CommandQueue!T();
    }

    /**
     * Enqueue `command`, then wake the target window.
     *
     * If the dispatcher has no queue yet (default-initialised struct), one is
     * created first; that check-and-create step is not itself locked. When
     * `targetHwnd` is `null` the command is queued but no message is posted.
     * The result of `PostMessageW` is not inspected.
     */
    void post(T command)
    {
        if (queue is null)
            queue = new CommandQueue!T();

        queue.push(command);

        if (targetHwnd is null)
            return;
        PostMessageW(targetHwnd, messageId, 0, 0);
    }

    /**
     * Remove and return every queued command. Intended for the UI thread.
     *
     * Returns: the queued commands, or `null` if no queue was ever created.
     */
    T[] drain()
    {
        return queue is null ? null : queue.drainAll();
    }
}

unittest
{
    // Items come back from a drain in the order they were pushed, and the
    // queue is empty afterwards.
    auto fifo = new CommandQueue!int();
    foreach (value; [1, 2, 3])
        fifo.push(value);

    auto taken = fifo.drainAll();
    assert(taken == [1, 2, 3]);
    assert(fifo.empty());
}

unittest
{
    // A drain on a fresh queue returns immediately with nothing in it.
    auto fifo = new CommandQueue!int();
    assert(fifo.empty());

    auto taken = fifo.drainAll();
    assert(taken.length == 0);
}

unittest
{
    // Same roundtrip, this time with a non-trivial element type.
    auto fifo = new CommandQueue!string();
    fifo.push("a");
    fifo.push("b");

    auto taken = fifo.drainAll();
    assert(taken == ["a", "b"]);
    assert(fifo.empty());
}

unittest
{
    // Many threads pushing at once: no item may be lost.
    import core.thread : Thread;

    enum threadCount = 8;
    enum perThread = 1000;

    auto fifo = new CommandQueue!int();

    // Work done by each producer thread.
    void produce()
    {
        foreach (i; 0 .. perThread)
            fifo.push(i);
    }

    auto workers = new Thread[](threadCount);
    foreach (ref worker; workers)
        worker = new Thread(&produce).start();

    foreach (worker; workers)
        worker.join();

    auto collected = fifo.drainAll();
    assert(collected.length == threadCount * perThread);
}