1  package frost.threads
  2  
  3  ====================================================================================================
  4  A queue which can be shared between multiple [Thread]s, allowing any of them to submit messages to
  5  or pull messages out of it. Multiple threads may simultaneously wait for a message from the same
  6  message queue, in which case one of the waiting threads will be selected to receive the message in a
  7  system-dependent fashion, while the other threads will continue to wait.
  8  ====================================================================================================
  9  @unsafeImmutable
 10  class MessageQueue<T:Immutable> : Immutable {
 11      @private
 12      @unsafeImmutable
 13      class Message<T:Immutable> : Immutable {
 14          def payload:T
 15  
 16          var next:Message<T>?
 17  
 18          init(payload:T) {
 19              self.payload := payload
 20          }
 21  
 22          @override
 23          function get_toString():String {
 24              if next !== null {
 25                  return "Message(\{payload}, \{next})"
 26              }
 27              return "Message(\{payload})"
 28          }
 29      }
 30  
 31      @private
 32      def lock := Lock()
 33  
 34      @private
 35      def notifier := Notifier(lock)
 36  
 37      @private
 38      var count := 0
 39  
 40      @private
 41      var head:Message<T>?
 42  
 43      @private
 44      var tail:Message<T>?
 45  
 46      ================================================================================================
 47      Posts an object to the queue.
 48      
 49      @param data the object to post
 50      ================================================================================================
 51      method post(data:T) {
 52          def scope := ScopedLock(lock)
 53          if tail !== null {
 54              assert head !== null
 55              tail.next := Message<T>(data)
 56              tail := tail.next
 57          }
 58          else {
 59              assert count = 0
 60              head := Message<T>(data)
 61              tail := head
 62          }
 63          count += 1
 64          notifier.notify()
 65      }
 66  
 67      ================================================================================================
 68      Returns the number of messages currently in the queue.
 69      
 70      @returns the number of messages in this `MessageQueue`
 71      @see hasMessage()
 72      @see getMessage()
 73      ================================================================================================
 74      method pendingMessages():Int {
 75          def scope := ScopedLock(lock)
 76          return count
 77      }
 78      
 79      ================================================================================================
 80      Returns `true` if there are one or more messages in the queue.
 81      
 82      @returns whether this queue has pending messages
 83      @see pendingMessages()
 84      @see getMessage()
 85      ================================================================================================
 86      method hasMessage():Bit {
 87          return pendingMessages() > 0
 88      }
 89      
 90      ================================================================================================
 91      Returns the next message from the queue, blocking until one is available.
 92      
 93      @returns the next message from the queue
 94      ================================================================================================
 95      method getMessage():T {
 96          def scope := ScopedLock(lock)
 97          while head == null {
 98              notifier.wait()
 99          }
100          def result := head.payload
101          head := head.next
102          if head == null {
103              tail := null
104          }
105          count -= 1
106          return result
107      }
108  
109      ================================================================================================
110      Remove all pending messages from the queue.
111      ================================================================================================
112      method clear() {
113          while hasMessage() {
114              getMessage()
115          }
116      }
117  }