1 package frost.threads
2
3 ====================================================================================================
4 A queue which can be shared between multiple [Thread]s, allowing any of them to submit messages to
5 or pull messages out of it. Multiple threads may simultaneously wait for a message from the same
6 message queue, in which case one of the waiting threads will be selected to receive the message in a
7 system-dependent fashion, while the other threads will continue to wait.
8 ====================================================================================================
9 @unsafeImmutable
10 class MessageQueue<T:Immutable> : Immutable {
11 @private
12 @unsafeImmutable
13 class Message<T:Immutable> : Immutable {
14 def payload:T
15
16 var next:Message<T>?
17
18 init(payload:T) {
19 self.payload := payload
20 }
21
22 @override
23 function get_toString():String {
24 if next !== null {
25 return "Message(\{payload}, \{next})"
26 }
27 return "Message(\{payload})"
28 }
29 }
30
31 @private
32 def lock := Lock()
33
34 @private
35 def notifier := Notifier(lock)
36
37 @private
38 var count := 0
39
40 @private
41 var head:Message<T>?
42
43 @private
44 var tail:Message<T>?
45
46 ================================================================================================
47 Posts an object to the queue.
48
49 @param data the object to post
50 ================================================================================================
51 method post(data:T) {
52 def scope := ScopedLock(lock)
53 if tail !== null {
54 assert head !== null
55 tail.next := Message<T>(data)
56 tail := tail.next
57 }
58 else {
59 assert count = 0
60 head := Message<T>(data)
61 tail := head
62 }
63 count += 1
64 notifier.notify()
65 }
66
67 ================================================================================================
68 Returns the number of messages currently in the queue.
69
70 @returns the number of messages in this `MessageQueue`
71 @see hasMessage()
72 @see getMessage()
73 ================================================================================================
74 method pendingMessages():Int {
75 def scope := ScopedLock(lock)
76 return count
77 }
78
79 ================================================================================================
80 Returns `true` if there are one or more messages in the queue.
81
82 @returns whether this queue has pending messages
83 @see pendingMessages()
84 @see getMessage()
85 ================================================================================================
86 method hasMessage():Bit {
87 return pendingMessages() > 0
88 }
89
90 ================================================================================================
91 Returns the next message from the queue, blocking until one is available.
92
93 @returns the next message from the queue
94 ================================================================================================
95 method getMessage():T {
96 def scope := ScopedLock(lock)
97 while head == null {
98 notifier.wait()
99 }
100 def result := head.payload
101 head := head.next
102 if head == null {
103 tail := null
104 }
105 count -= 1
106 return result
107 }
108
109 ================================================================================================
110 Remove all pending messages from the queue.
111 ================================================================================================
112 method clear() {
113 while hasMessage() {
114 getMessage()
115 }
116 }
117 }