potamic.queue
added in 0.1
Implements a stream-based message queue over Redis (or KeyDB).
create-queue!
(create-queue! queue-name conn & opts)
Creates (or resets) a queue spec and, if they do not already exist, creates the stream key and consumer group. Queue specs are just Clojure maps and can be reset without issue. However, this function will not attempt to reset an existing stream key or consumer group. Returns vector of [status ?err].
TIP: To perform a full reset, you must call potamic.queue/destroy-queue! then potamic.queue/create-queue!.
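The full-reset sequence from the TIP can be sketched as a small helper. This is only an illustration: `full-reset!` is a hypothetical name, and the destroy and create functions are passed in as arguments (an assumption made so the sketch runs without a Redis server). It relies only on the documented [status ?err] return shape.

```clojure
(defn full-reset!
  "Destroys a queue and then re-creates it.
  `destroy!` and `create!` are injected (an assumption of this sketch);
  in real use they would be potamic.queue/destroy-queue! and
  potamic.queue/create-queue!. Returns the [status ?err] vector of the
  last step that ran."
  [destroy! create! queue-name conn]
  (let [[_ err] (destroy! queue-name conn)]
    (if err
      ;; destroy failed (e.g. pending messages without :unsafe): stop here
      [nil err]
      ;; destroy succeeded: re-create spec, stream and group
      (create! queue-name conn))))
```

With the real functions this would be called as (full-reset! q/destroy-queue! q/create-queue! :my/queue conn). Stopping on the first error keeps the helper from re-creating a spec over a stream that still has pending messages.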
Option | Description | Default |
---|---|---|
:group | Sets reader group | QUEUE-NAME-group |
:init-id | Initial ID | 0 |
status will be one of the following:
Code | Description |
---|---|
:created-with-new-stream | Both the spec and stream are created |
:created-with-existing-stream | Spec is created, stream already exists |
:updated-with-new-stream | Spec is reset, stream is created |
:updated-with-existing-stream | Spec is reset, stream already exists |
nil | An error occurred |
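As a rough sketch of how a caller might branch on the status codes above, the table can be mirrored as plain data. `create-status->facts` and `explain-create-result` are hypothetical names, not part of potamic; only the status keywords and the [status ?err] shape come from the table.

```clojure
;; Mirror of the status table as data (hypothetical helper, not potamic API).
(def create-status->facts
  {:created-with-new-stream      {:spec :created :stream :created}
   :created-with-existing-stream {:spec :created :stream :existing}
   :updated-with-new-stream      {:spec :reset   :stream :created}
   :updated-with-existing-stream {:spec :reset   :stream :existing}})

(defn explain-create-result
  "Takes the [status ?err] vector returned by create-queue! and returns
  a map describing what happened. A nil status means an error occurred."
  [[status err]]
  (if (nil? status)
    {:ok? false :err err}
    (assoc (get create-status->facts status {}) :ok? true)))
```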
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]
(q/create-queue! :my/queue conn)
;= [:updated-with-existing-stream nil]
(q/create-queue! :secondary/queue conn :group :secondary/group)
;= [:created-with-new-stream nil]
See also:
potamic.queue/destroy-queue!
destroy-queue!
(destroy-queue! queue-name conn & opts)
Destroys a queue spec, the stream key and the consumer group associated with it. Without the :unsafe option, an error will be returned if there are pending messages. Returns vector of [status ?err].
status will be one of the following:
Code | Description |
---|---|
:spec-destroyed_stream-destroyed | Spec destroyed, stream destroyed |
:spec-destroyed_stream-nonexistent | Spec destroyed, no stream found |
:spec-nonexistent_stream-destroyed | No spec found, stream destroyed |
:spec-nonexistent_stream-nonexistent | No spec found, no stream found |
nil | An error occurred |
Option | Description | Default |
---|---|---|
:unsafe | Skip Pending Entries List checks | false |
WARNING: Do not use :unsafe unless you know what you’re doing!
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]
(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683933694431-0" "1683933694431-1" "1683933694431-2"] nil]
(q/read-next! 2 :from :my/queue :as :consumer/one)
;= [({:id "1683933694431-0", :msg {:a 1}} {:id "1683933694431-1", :msg {:b 2}}) nil]
;; only destroy if no pending messages (in this case will fail)
(q/destroy-queue! :my/queue conn)
;= [nil
;= #:potamic{:err-type :potamic/db-err
;= :err-msg "Cannot destroy my/queue, it has pending messages"
;= :err-data
;= {:args {:queue-name :my/queue :unsafe false}
;= :groups [{:consumers 1
;= :entries-read 2
;= :last-delivered-id "1683933694431-1"
;= :name "my/queue-group"
;= :pending 2
;= :lag 1}]
;= :consumers [{:idle 3371 :name "consumer/one" :pending 2}]}
;= :err-file "[..]/potamic/src/potamic/queue.clj"
;= :err-line 644
;= :err-column 12}]
;; force-destroy, ignoring if there are pending messages
(q/destroy-queue! :my/queue conn :unsafe true)
;= [:spec-destroyed_stream-destroyed nil]
See also:
potamic.queue/create-queue!
get-queue
(get-queue queue-name)
Returns queue spec for queue-name.
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]
(q/get-queue :my/queue)
;= {:queue-name :my/queue
;= :queue-conn
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
;= :group-name :my/queue-group
;= :redis-queue-name "my/queue"
;= :redis-group-name "my/queue-group"}
See also:
potamic.queue/get-queues
potamic.queue/create-queue!
get-queues
(get-queues)
(get-queues x)
Get all, or a subset, of queues created via potamic.queue/create-queue!. Return value varies depending on x input type.
Type | Result |
---|---|
nil | all queues |
regex | map filtered by searching kv space for pattern |
keyword | same as calling (get-queue x) |
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
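;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/one conn)
;= [:created-with-new-stream nil]
(q/create-queue! :my/two conn)
;= [:created-with-new-stream nil]
(q/create-queue! :my/three conn)
;= [:created-with-new-stream nil]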
(q/get-queues)
;= #:my{:one {:queue-name :my/one
;= :queue-conn
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
;= :group-name :my/one-group
;= :redis-queue-name "my/one"
;= :redis-group-name "my/one-group"}
;= :two {:queue-name :my/two
;= :queue-conn
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
;= :group-name :my/two-group
;= :redis-queue-name "my/two"
;= :redis-group-name "my/two-group"}
;= :three {:queue-name :my/three
;= :queue-conn
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
;= :group-name :my/three-group
;= :redis-queue-name "my/three"
;= :redis-group-name "my/three-group"}}
(q/get-queues #"three")
;= #:my{:three {:queue-name :my/three
;= :queue-conn
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
;= :group-name :my/three-group
;= :redis-queue-name "my/three"
;= :redis-group-name "my/three-group"}}
See also:
potamic.queue/get-queue
potamic.queue/create-queue!
potamic.queue/destroy-queue!
put
(put queue-name & xs)
Put message(s) onto a queue. Returns vector of [?msg-ids ?err].
NOTE: Because put can add more than one message, on success ?msg-ids will always be a vector of ID strings, or nil on error.
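NOTE: It is highly recommended to let Redis set the ID automatically. However, if you set the ID yourself, anything that resolves via name is acceptable (a string, keyword, or symbol). As a reminder: a token that begins with a digit cannot be quoted as a symbol, so pass such IDs as strings or keywords.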
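Redis stream IDs have the form MILLISECONDS-SEQUENCE, which is why explicit IDs in the examples look like "1683743739-0". The sketch below shows one way to split and order such IDs on the client side. `parse-stream-id` and `id-before?` are hypothetical helpers, not part of potamic, and they only handle fully specified IDs (no * wildcards).

```clojure
(require '[clojure.string :as str])

(defn parse-stream-id
  "Splits a fully specified stream ID such as \"1683743739-0\" into its
  millisecond and sequence parts. Accepts anything `name` accepts."
  [id]
  (let [[ms sq] (str/split (name id) #"-" 2)]
    {:ms  (Long/parseLong ms)
     :seq (Long/parseLong sq)}))

(defn id-before?
  "True when stream ID `a` sorts strictly before `b`.
  Compares numerically, so \"...-2\" sorts before \"...-10\"."
  [a b]
  (neg? (compare ((juxt :ms :seq) (parse-stream-id a))
                 ((juxt :ms :seq) (parse-stream-id b)))))
```

Comparing the parsed numbers rather than the raw strings avoids the lexicographic trap where "10" sorts before "2".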
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]
;; let Redis set the ID (RECOMMENDED)
;; (all of the following are identical, in effect)
(q/put :my/queue {:a 1 :b 2 :c 3})
(q/put :my/queue :* {:a 1 :b 2 :c 3})
(q/put :my/queue "*" {:a 1 :b 2 :c 3})
(q/put :my/queue '* {:a 1 :b 2 :c 3})
;= [["1683660166747-0"] nil]
;; setting ID for a single message
(q/put :my/queue "1683743739-0" {:a 1})
;= [["1683743739-0"] nil]
;; setting the ID for a single message using wildcard.
(q/put :my/queue "1683743739-*" {:a 1})
(q/put :my/queue :1683743739-* {:a 1})
;= [["1683743739-1"] nil]
;; setting IDs for multi mode. the trailing `*` is required.
(q/put :my/queue "1683743739-*" {:a 1} {:b 2} {:c 3})
;= [["1683743739-2" "1683743739-3" "1683743739-4"] nil]
See also:
potamic.queue/read
potamic.queue/read-range
potamic.queue/read-next!
potamic.queue/read-pending-summary
potamic.queue/read-pending
potamic.queue/create-queue!
read
(read queue-name & {:keys [start block], cnt :count, :or {start 0}})
Reads messages from a queue. Returns vector of [?msgs ?err]. This function wraps Redis’ XREAD. It does not involve groups, nor does it track pending entries. Also, it limits read to a single queue (stream).
?msgs is of the form:
[{:id ID :msg MSG} ..]
; or
nil
Option | Default Value |
---|---|
:start | 0 (all messages) |
:count | nil (no limit) |
:block | nil (return immediately) |
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q]
'[clojure.core.async :as async])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]
(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683913507471-0" "1683913507471-1" "1683913507471-2"]
;= nil]
(q/read :my/queue)
;= [({:id "1683913507471-0", :msg {:a 1}}
;= {:id "1683913507471-1", :msg {:b 2}}
;= {:id "1683913507471-2", :msg {:c 3}})
;= nil]
(q/read :my/queue :start 0)
;= [({:id "1683913507471-0", :msg {:a 1}}
;= {:id "1683913507471-1", :msg {:b 2}}
;= {:id "1683913507471-2", :msg {:c 3}})
;= nil]
(async/go (async/<! (async/timeout 2000)) (q/put :my/queue {:d 4}))
;= #object[clojure.core.async.impl.channels.ManyToManyChannel ..]
;; block until above Go call executes
(q/read :my/queue :count 10 :start 0 :block [5 :seconds])
;= [({:id "1683913507471-0", :msg {:a 1}}
;= {:id "1683913507471-1", :msg {:b 2}}
;= {:id "1683913507471-2", :msg {:c 3}}
;= {:id "1683915435992-0", :msg {:d 4}})
;= nil]
See also:
potamic.queue/read-next!
potamic.queue/put
read-next!
(read-next! consume & {:keys [from as block]})
Reads next message(s) from a queue as consumer for queue group, side-effecting Redis’ Pending Entries List. Returns vector of [?msgs ?err].
?msgs is of the form:
[{:id ID :msg MSG} ..]
; or
nil
NOTE: Readers are responsible for declaring messages “processed” by calling potamic.queue/set-processed!.
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/queue conn :group :my/consumer)
;= [:created-with-new-stream nil]
(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1689783633670-0" "1689783633670-1" "1689783633670-2"] nil]
(q/read-next! 1 :from :my/queue :as :my/consumer)
;= [({:id "1689783633670-0", :msg {:a 1}}) nil]
(q/read-next! :all :from :my/queue :as :my/consumer :block 2000)
;= [({:id "1689783633670-1", :msg {:b 2}}
;= {:id "1689783633670-2", :msg {:c 3}})
;= nil]
(q/read-next! :all :from :my/queue :as :my/consumer :block 1000)
;= [nil nil]
See also:
potamic.queue/read
potamic.queue/read-pending
potamic.queue/read-pending-summary
potamic.queue/put
read-pending
(read-pending count* & opts)
Lists details of pending messages for a queue/group pair. Optionally, a consumer may be provided for sub-filtering. Returns vector of [?details ?err].
?details is of the form:
({:id ID
:consumer NAME
:milliseconds-since-delivered MILLISECONDS
:times-delivered N}
..)
Option | Description | Default |
---|---|---|
:from | Queue name | none, required |
:for | Consumer name | nil, get entire group |
:start | Start ID | "-" (beginning) |
:end | End ID | "+" (end) |
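One common use of the ?details shape is to find messages that have been pending for too long. The sketch below is a hypothetical, pure helper over the documented map keys; `stale-pending` and the threshold argument are assumptions, not part of potamic.

```clojure
(defn stale-pending
  "Given the ?details seq returned by read-pending, keeps only entries
  that have been pending for at least `min-idle-ms` milliseconds and
  groups their IDs by consumer name."
  [details min-idle-ms]
  (->> details
       (filter #(>= (:milliseconds-since-delivered %) min-idle-ms))
       (group-by :consumer)
       (reduce-kv (fn [acc consumer entries]
                    (assoc acc consumer (mapv :id entries)))
                  {})))
```

The result maps each consumer name to a vector of stale message IDs, which could then be inspected or handed to another worker.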
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]
(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683944086236-0" "1683944086236-1" "1683944086236-2"]
;= nil]
(q/read-next! 1 :from :my/queue :as :consumer/one)
;= [({:id "1683944086236-0", :msg {:a 1}})
;= nil]
(q/read-pending 10 :from :my/queue :for :consumer/one)
;= [({:id "1683944086236-0"
;= :consumer "consumer/one"
;= :milliseconds-since-delivered 9547
;= :times-delivered 1})
;= nil]
(q/read-pending 10 :from :my/queue :for :consumer/one :start '- :end '+)
;= [({:id "1683944086236-0"
;= :consumer "consumer/one"
;= :milliseconds-since-delivered 16768
;= :times-delivered 1})
;= nil]
(q/read-pending 1
:from :my/queue
:for :consumer/one
:start "1683944086236-0"
:end "1683944086236-2")
;= [({:id "1683944086236-0"
;= :consumer "consumer/one"
;= :milliseconds-since-delivered 144556
;= :times-delivered 1})
;= nil]
See also:
potamic.queue/read-pending-summary
potamic.queue/set-processed!
read-pending-summary
(read-pending-summary queue-name)
Lists all pending messages, for all consumers, for queue. Returns vector of [?summary ?err].
?summary is of the form:
{:total N
:start ID
:end ID
:consumers {CONSUMER-NAME N-PENDING}}
NOTE: CONSUMER-NAME is coerced by the rules of util/<-str. The string "my/consumer1" becomes the keyword :my/consumer1.
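Because consumer names come back as keywords, the :consumers map can be queried with ordinary map functions. The helpers below are a hypothetical sketch over the documented ?summary shape; `pending-for` and `busiest-consumer` are not part of potamic.

```clojure
(defn pending-for
  "Number of pending messages for `consumer` (a keyword) in a ?summary
  map, or 0 when the consumer has none."
  [summary consumer]
  (get-in summary [:consumers consumer] 0))

(defn busiest-consumer
  "Keyword of the consumer with the most pending messages, or nil when
  the summary lists no consumers."
  [{:keys [consumers]}]
  (when (seq consumers)
    (key (apply max-key val consumers))))
```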
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]
(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683745855445-0" "1683745855445-1" "1683745855445-2"]
;= nil]
(q/read-next! 2 :from :my/queue :as :consumer/one)
;= [({:id "1683745855445-0" :msg {:a 1}}
;= {:id "1683745855445-1" :msg {:b 2}})
;= nil]
(q/read-next! 1 :from :my/queue :as :consumer/two)
;= [({:id "1683745855445-2" :msg {:c 3}})
;= nil]
(q/read-pending-summary :my/queue)
;= [{:total 3
;= :start "1683745855445-0"
;= :end "1683745855445-2"
;= :consumers #:consumer{:one 2 :two 1}}
;= nil]
See also:
potamic.queue/read-pending
potamic.queue/set-processed!
read-range
(read-range queue-name & opts)
Reads a range of messages from a queue. Returns vector of [?msgs ?err]. This function wraps Redis’ XRANGE. It does not involve groups, nor does it track pending entries.
?msgs is of the form:
[{:id ID :msg MSG} ..]
; or
nil
Option | Default Value |
---|---|
:start | - (oldest) |
:end | + (newest) |
:count | nil (no limit) |
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]
(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683946534423-0" "1683946534423-1" "1683946534423-2"]
;= nil]
(q/read-range :my/queue :start '- :end '+)
;= [({:id "1683946534423-0", :msg {:a 1}}
;= {:id "1683946534423-1", :msg {:b 2}}
;= {:id "1683946534423-2", :msg {:c 3}})
;= nil]
(q/read-range :my/queue :start '- :end '+ :count 10)
;= [({:id "1683946534423-0", :msg {:a 1}}
;= {:id "1683946534423-1", :msg {:b 2}}
;= {:id "1683946534423-2", :msg {:c 3}})
;= nil]
See also:
potamic.queue/read
potamic.queue/read-next!
potamic.queue/put
set-processed!
(set-processed! queue-name & msg-ids)
Removes message(s) from Redis’ Pending Entries List. This command wraps Redis’ XACK command. Returns vector of [?n-acked ?err].
NOTE: (as per official Redis docs) “Certain message IDs may no longer be part of the PEL (for example because they have already been acknowledged), and XACK will not count them as successfully acknowledged.”
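Since XACK only counts IDs that were still pending, a caller may want to compare ?n-acked against the number of IDs it sent. The following is a hypothetical, pure helper built on the documented [?n-acked ?err] shape; `unacked-count` is not part of potamic.

```clojure
(defn unacked-count
  "Given the [?n-acked ?err] result of set-processed! and the msg-ids
  that were passed to it, returns how many distinct IDs were NOT
  acknowledged (for example because they were no longer pending).
  Returns nil when the call itself returned an error."
  [[n-acked err] msg-ids]
  (when-not err
    (- (count (set msg-ids)) n-acked)))
```

A non-zero result does not necessarily indicate a failure; it may simply mean that some IDs had already been acknowledged.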
Examples:
(require '[potamic.db :as db]
'[potamic.queue :as q])
(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;= :pool #taoensso.carmine.connections.ConnectionPool{..}}
(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]
(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683745855445-0" "1683745855445-1" "1683745855445-2"]
;= nil]
(q/read-next! 1 :from :my/queue :as :consumer/one)
;= [({:id "1683745855445-0" :msg {:a 1}})
;= nil]
(q/set-processed! :my/queue "1683745855445-0")
(q/set-processed! :my/queue "1683745855445-1" "1683745855445-2")
See also:
potamic.queue/read-next!
potamic.queue/read-pending-summary
potamic.queue/read-pending