almost done, queue fix next
This commit is contained in:
66
util/queue/structs.go
Normal file
66
util/queue/structs.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Queue is a thin wrapper around a buffered channel.
|
||||
type Queue[T any] struct {
|
||||
ch chan T
|
||||
closedMu sync.Mutex
|
||||
closed bool
|
||||
closedCh chan struct{}
|
||||
}
|
||||
|
||||
func NewQueue[T any](capacity int) *Queue[T] {
|
||||
if capacity <= 0 {
|
||||
panic("capacity > 0 required")
|
||||
}
|
||||
|
||||
return &Queue[T]{
|
||||
ch: make(chan T, capacity),
|
||||
closedCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Cap returns capacity.
|
||||
func (q *Queue[T]) Cap() int { return cap(q.ch) }
|
||||
|
||||
// Len returns current length (snapshot).
|
||||
func (q *Queue[T]) Len() int { return len(q.ch) }
|
||||
|
||||
// Enqueue returns immediately: true if enqueued, false otherwise.
|
||||
func (q *Queue[T]) Enqueue(v T) bool {
|
||||
select {
|
||||
case q.ch <- v:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Dequeue returns immediately (item, true) or (zero, false) if empty.
|
||||
func (q *Queue[T]) Dequeue() (T, bool) {
|
||||
var zero T
|
||||
select {
|
||||
case v := <-q.ch:
|
||||
return v, true
|
||||
default:
|
||||
return zero, false
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the queue. Further Enqueue attempts return ErrClosed. Consumers drain until channel empty then see ErrClosed.
|
||||
func (q *Queue[T]) Close() {
|
||||
q.closedMu.Lock()
|
||||
|
||||
if q.closed {
|
||||
q.closedMu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
q.closed = true
|
||||
close(q.closedCh)
|
||||
close(q.ch)
|
||||
q.closedMu.Unlock()
|
||||
}
|
||||
Reference in New Issue
Block a user