potamic.queue

added in 0.1

Implements a stream-based message queue over Redis (or KeyDB).

create-queue!

(create-queue! queue-name conn & opts)

Creates (or resets) a queue spec and, if it doesn’t exist, optionally creates the stream key and consumer group. Queue Specs are just Clojure maps and can be reset w/o issue. However, this function will not attempt to reset the stream key or consumer group. Returns vector of [status ?err].

TIP: To perform a full reset, you must call potamic.queue/destroy-queue! then potamic.queue/create-queue!.

Option Description Default
:group Sets reader group QUEUE-NAME-group
:init-id Initial ID 0

status will be one of the following:

Code Description
:created-with-new-stream Both the spec and stream are created
:created-with-existing-stream Spec is created, stream already exists
:updated-with-new-stream Spec is reset, stream is created
:updated-with-existing-stream Spec is reset, stream already exists
nil An error occurred

Examples:

(require '[potamic.db :as db]
         '[potamic.queue :as q])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;=  :pool #taoensso.carmine.connections.ConnectionPool{..}}

(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]

(q/create-queue! :my/queue conn)
;= [:updated-with-existing-stream nil]

(q/create-queue! :secondary/queue conn :group :secondary/group)
;= [:created-with-new-stream nil]

See also:

  • potamic.queue/destroy-queue!

destroy-queue!

(destroy-queue! queue-name conn & opts)

Destroys a queue spec, the stream key and the consumer group associated with it. Without the :unsafe option, an error will be returned if there are pending messages. Returns vector of [status ?err].

status will be one of the following:

Code Description
:spec-destroyed_stream-destroyed Spec destroyed, stream destroyed
:spec-destroyed_stream-nonexistent Spec destroyed, no stream found
:spec-nonexistent_stream-destroyed No spec found, stream destroyed
:spec-nonexistent_stream-nonexistent No spec found, no stream found
nil An error occurred
Option Description Default
:unsafe Skip Pending Entries List checks false

WARNING: Do not use :unsafe unless you know what you’re doing!

Examples:

(require '[potamic.db :as db]
'[potamic.queue :as q])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;=  :pool #taoensso.carmine.connections.ConnectionPool{..}}

(q/create-queue! :my/queue conn)
;= [:created-with-new-stream nil]

(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683933694431-0" "1683933694431-1" "1683933694431-2"] nil]

(q/read-next! 2 :from :my/queue :as :consumer/one)
;= [({:id "1683933694431-0", :msg {:a 1}} {:id "1683933694431-1", :msg {:b 2}}) nil]

;; only destroy if no pending messages (in this case will fail)
(q/destroy-queue! :my/queue conn)
;= [nil
;=  #:potamic{:err-type :potamic/db-err
;=            :err-msg "Cannot destroy my/queue, it has pending messages"
;=            :err-data
;=            {:args {:queue-name :my/queue :unsafe false}
;=             :groups [{:consumers 1
;=                       :entries-read 2
;=                       :last-delivered-id "1683933694431-1"
;=                       :name "my/queue-group"
;=                       :pending 2
;=                       :lag 1}]
;=             :consumers [{:idle 3371 :name "consumer/one" :pending 2}]}
;=            :err-file "[..]/potamic/src/potamic/queue.clj"
;=            :err-line 644
;=            :err-column 12}]

;; force-destroy, ignoring if there are pending messages
(q/destroy-queue! :my/queue conn :unsafe true)
;= [:spec-destroyed_stream-destroyed nil]

See also:

  • potamic.queue/create-queue!

get-queue

(get-queue queue-name)

Returns queue spec for queue-name.

Examples:

(require '[potamic.db :as db]
         '[potamic.queue :as q])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:uri "redis://localhost:6379/0", :pool {}}

(q/create-queue! :my/queue)
;= [true nil]

(q/get-queue :my/queue)
;= {:queue-name :my/queue
;=  :queue-conn
;=  {:spec {:uri "redis://localhost:6379/0"}
;=          :pool #taoensso.carmine.connections.ConnectionPool{..}}
;=  :group-name :my/queue-group
;=  :redis-queue-name "my/queue"
;=  :redis-group-name "my/queue-group"}

See also:

  • potamic.queue/get-queues
  • potamic.queue/create-queue!

get-queues

(get-queues)(get-queues x)

Get all, or a subset, of queues created via potamic.queue/create-queue!. Return value varies depending on x input type.

Type Result
nil all queues
regex map filtered by searching kv space for pattern
keyword same as calling (get-queue x)

Examples:

(require '[potamic.db :as db]
         '[potamic.queue :as q])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:uri "redis://localhost:6379/0", :pool {}}

(q/create-queue! :my/one conn)
;= [true nil]

(q/create-queue! :my/two conn)
;= [true nil]

(q/create-queue! :my/three conn)
;= [true nil]

(q/get-queues)
;= #:my{:one {:queue-name :my/one
;=            :queue-conn
;=            {:spec {:uri "redis://localhost:6379/0"}
;=             :pool #taoensso.carmine.connections.ConnectionPool{..}}
;=            :group-name :my/one-group
;=            :redis-queue-name "my/one"
;=            :redis-group-name "my/one-group"}
;=      :two {:queue-name :my/two
;=            :queue-conn
;=            {:spec {:uri "redis://localhost:6379/0"}
;=             :pool #taoensso.carmine.connections.ConnectionPool{..}}
;=            :group-name :my/two-group
;=            :redis-queue-name "my/two"
;=            :redis-group-name "my/two-group"}
;=      :three {:queue-name :my/three
;=              :queue-conn
;=              {:spec {:uri "redis://localhost:6379/0"}
;=               :pool #taoensso.carmine.connections.ConnectionPool{.. }
;=              :group-name :my/three-group
;=              :redis-queue-name "my/three"
;=              :redis-group-name "my/three-group"}}

(q/get-queues #"three")
;= #:my{:three {:queue-name :my/three
;=      :queue-conn
;=      {:spec {:uri "redis://localhost:6379/0"}
;=       :pool #taoensso.carmine.connections.ConnectionPool{}}
;=      :group-name :my/three-group
;=      :redis-queue-name "my/three"
;=      :redis-group-name "my/three-group"}}

See also:

  • potamic.queue/get-queue
  • potamic.queue/create-queue!
  • potamic.queue/delete-queue

put

(put queue-name & xs)

Put message(s) onto a queue. Returns vector of [?msg-ids ?err].

NOTE: Because put can add more than one message, on success ?msg-ids will always be a vector of ID strings, or nil on error.

NOTE: It is highly recommended to let Redis set the ID automatically. However, if setting the ID, anything that will resolve via name is acceptable. As a reminder: numbers cannot be quoted.

Examples:

(require '[potamic.db :as db]
         '[potamic.queue :as q])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;=  :pool #taoensso.carmine.connections.ConnectionPool{..}}

(q/create-queue! :my/queue conn)
;= [true nil]

;; let Redis set the ID (RECOMMENDED)
;; (all of the following are identical, in effect)
(q/put :my/queue {:a 1 :b 2 :c 3})
(q/put :my/queue :* {:a 1 :b 2 :c 3})
(q/put :my/queue "*" {:a 1 :b 2 :c 3})
(q/put :my/queue '* {:a 1 :b 2 :c 3})
;= [["1683660166747-0"] nil]

;; setting ID for a single message
(q/put :my/queue "1683743739-0" {:a 1})
;= [["1683743739-0"] nil]

;; setting the ID for a single message using wildcard.
(q/put :my/queue "1683743739-*" {:a 1})
(q/put :my/queue :1683743739-* {:a 1})
;= [["1683743739-1"] nil]

;; setting IDs for multi mode. the trailing `*` is required.
(q/put :my/queue "1683743739-*" {:a 1} {:b 2} {:c 3})
;= [["1683743739-2" "1683743739-3" "1683743739-4"] nil]

See also:

  • potamic.queue/read
  • potamic.queue/read-range
  • potamic.queue/read-next!
  • potamic.queue/read-pending-summary
  • potamic.queue/read-pending
  • potamic.queue/create-queue!

read

(read queue-name & {:keys [start block], cnt :count, :or {start 0}})

Reads messages from a queue. Returns vector of [?msgs ?err]. This function wraps Redis’ XREAD. It does not involve groups, nor does it track pending entries. Also, it limits read to a single queue (stream).

?msgs is of the form:

[{:id ID :msg MSG} ..]
; or
nil
Option Default Value
:start 0 (all messages)
:count nil (no limit)
:block nil (return immediately)

Examples:

(require '[potamic.db :as db]
         '[potamic.queue :as q]
         '[clojure.core.async :as async])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:spec {:uri "redis://localhost:6379/0"}
;=  :pool #taoensso.carmine.connections.ConnectionPool{..}}

(q/create-queue! :my/queue conn)
;= [true nil]

(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683912716308-0" "1683912716308-1" "1683912716308-2"]
;=  nil]

(q/read :my/queue)
;= [({:id "1683913507471-0", :msg {:a 1}}
;=   {:id "1683913507471-1", :msg {:b 2}}
;=   {:id "1683913507471-2", :msg {:c 3}})
;=  nil]

(q/read :my/queue :start 0)
;= [({:id "1683913507471-0", :msg {:a 1}}
;=   {:id "1683913507471-1", :msg {:b 2}}
;=   {:id "1683913507471-2", :msg {:c 3}})
;=  nil]

(async/go (async/<! (async/timeout 2000)) (q/put :my/queue {:d 4}))
;= #object[clojure.core.async.impl.channels.ManyToManyChannel ..]

;; block until above Go call executes
(q/read :my/queue :count 10 :start 0 :block [5 :seconds])
;= [({:id "1683915375766-0", :msg {:a 1}}
;=   {:id "1683915375766-1", :msg {:b 2}}
;=   {:id "1683915375766-2", :msg {:c 3}}
;=   {:id "1683915435992-0", :msg {:d 4}})
;=  nil]

See also:

  • potamic.queue/read-next!
  • potamic.queue/put

read-next!

(read-next! consume & {:keys [from as block]})

Reads next message(s) from a queue as consumer for queue group, side-effecting Redis’ Pending Entries List. Returns vector of [?msgs ?err].

?msgs is of the form:

[{:id ID :msg MSG} ..]
; or
nil

NOTE: Readers are responsible for declaring messages “processed” by calling potamic.queue/set-processed!.

Examples:

(require '[potamic.db :as db]
         '[potamic.queue :as q])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:uri "redis://localhost:6379/0", :pool {}}

(q/create-queue! :my/queue conn :group :my/consumer)
;= [true nil]

(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1689783633670-0" "1689783633670-1" "1689783633670-2"] nil]

(q/read-next! 1 :from :my/queue :as :my/consumer)
;= [({:id "1689783633670-0", :msg {:a 1}}) nil]

(q/read-next! :all :from :my/queue :as :my/consumer :block 2000)
;= [({:id "1689783633670-1", :msg {:b 2}}
;=   {:id "1689783633670-2", :msg {:c 3}})
;=  nil]

(q/read-next! :all :from :my/queue :as :my/consumer :block 1000)
;= [nil nil]

See also:

  • potamic.queue/read
  • potamic.queue/read-pending
  • potamic.queue/read-pending-summary
  • potamic.queue/put

read-pending

(read-pending count* & opts)

Lists details of pending messages for a queue/group pair. Optionally, a consumer may be provided for sub-filtering. Returns vector of [?details ?err].

?details is of the form:

({:id ID
  :consumer NAME
  :milliseconds-since-delivered MILLISECONDS
  :times-delivered N}
 ..)
Option Description Default
:from Queue name none, required
:for Consumer name nil, get entire group
:start Start ID "-" (beginning)
:end End ID "+" (end)

Examples:

(require '[potamic.db :as db]
         '[potamic.queue :as q])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:uri "redis://localhost:6379/0", :pool {}}

(q/create-queue! :my/queue conn)
;= [true nil]

(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683944086236-0" "1683944086236-1" "1683944086236-2"]
;=  nil]

(q/read-next! 1 :from :my/queue :as :consumer/one)
;= [({:id "1683944086236-0", :msg {:a 1}})
;=  nil]

(q/read-pending 10 :from :my/queue :for :consumer/one)
;= [({:id "1683944086236-0"
;=    :consumer "consumer/one"
;=    :milliseconds-since-delivered 9547
;=    :times-delivered 1})
;=  nil]

(q/read-pending 10 :from :my/queue :for :consumer/one :start '- :end '+)
;= [({:id "1683944086236-0"
;=    :consumer "consumer/one"
;=    :milliseconds-since-delivered 16768
;=    :times-delivered 1})
;= nil]

(q/read-pending 1
                :from :my/queue
                :for :consumer/one
                :start "1683944086236-0"
                :end "1683944086236-2")
;= [({:id "1683944086236-0"
;=    :consumer "consumer/one"
;=    :milliseconds-since-delivered 144556
;=    :times-delivered 1})
;=  nil]

See also:

  • potamic.queue/read-pending-summary
  • potamic.queue/set-processed!

read-pending-summary

(read-pending-summary queue-name)

Lists all pending messages, for all consumers, for queue. Returns vector of [?summary ?err].

?summary is of the form:

{:total N
 :start ID
 :end ID
 :consumers {CONSUMER-NAME N-PENDING}}

NOTE: CONSUMER-NAME is coerced by the rules of util/<-str. The string "my/consumer1" becomes the keyword :my/consumer.

Examples:

(require '[potamic.db :as db]
         '[potamic.queue :as q])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:uri "redis://localhost:6379/0", :pool {}}

(q/create-queue! :my/queue conn)
;= [true nil]

(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683745855445-0" "1683745855445-1" "1683745855445-2"]
;=  nil]

(q/read-next! 2 :from :my/queue :as :consumer/one)
;= [({:id "1683745855445-0" :msg {:a 1}}
;=   {:id "1683745855445-1" :msg {:b 2}})
;=  nil]

(q/read-next! 1 :from :my/queue :as :consumer/two)
;= [({:id "1683745855445-2" :msg {:c 3}})
;=  nil]

(q/read-pending-summary :my/queue)
;= [{:total 3
;=   :start "1683745855445-0"
;=   :end "1683745855445-2"
;=   :consumers #:consumer{:one 2 :two 1}}
;=  nil]

See also:

  • potamic.queue/read-pending
  • potamic.queue/set-processed!

read-range

(read-range queue-name & opts)

Reads a range of messages from a queue. Returns vector of [?msgs ?err]. This function wraps Redis’ XRANGE. It does not involve groups, nor does it track pending entries.

?msgs is of the form:

[{:id ID :msg MSG} ..]
; or
nil
Option Default Value
:start - (oldest)
:end + (newest)
:count nil (no limit)

Examples:

(require '[potamic.db :as db]
'[potamic.queue :as q])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:uri "redis://localhost:6379/0", :pool {}}

(q/create-queue! :my/queue conn)
;= [true nil]

(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683946534423-0" "1683946534423-1" "1683946534423-2"]
;=  nil]

(q/read-range :my/queue :start '- :end '+)
;= [({:id "1683946534423-0", :msg {:a 1}}
;=   {:id "1683946534423-1", :msg {:b 2}}
;=   {:id "1683946534423-2", :msg {:c 3}})
;=  nil]

(q/read-range :my/queue :start '- :end '+ :count 10)
;= [({:id "1683946534423-0", :msg {:a 1}}
;=   {:id "1683946534423-1", :msg {:b 2}}
;=   {:id "1683946534423-2", :msg {:c 3}})
;=  nil]

See also:

  • potamic.queue/read
  • potamic.queue/read-next!
  • potamic.queue/put

set-processed!

(set-processed! queue-name & msg-ids)

Removes message(s) from Redis’ Pending Entries List. This command wraps Redis’ XACK command. Returns vector of [?n-acked ?err].

NOTE: (as per official Redis docs)

“Certain message IDs may no longer be part of the PEL (for example because they have already been acknowledged), and XACK will not count them as successfully acknowledged.”

Examples:

(require '[potamic.db :as db]
         '[potamic.queue :as q])

(def conn (db/make-conn :uri "redis://localhost:6379/0"))
;= {:uri "redis://localhost:6379/0", :pool {}}

(q/create-queue! :my/queue conn)
;= [true nil]

(q/put :my/queue {:a 1} {:b 2} {:c 3})
;= [["1683745855445-0" "1683745855445-1" "1683745855445-2"]
;=  nil]

(q/read-next! 1 :from :my/queue :as :consumer/one)
;= [({:id "1683745855445-0" :msg {:a 1}})
;=  nil]

(q/set-processed! :my/queue "1683745855445-0")

(q/set-processed! :my/queue '1683745855445-1  '1683745855445-2)

See also:

  • potamic.queue/read-next!
  • potamic.queue/read-pending-summary
  • potamic.queue/read-pending