Core Async Essentials

June 30, 2020

INTRODUCTION


Core Async is a Clojure/Script library for async programming. Tony Hoare's Communicating Sequential Processes [1] was influential in its development. It is a tool to model concurrent (not parallel) tasks. Parallel tasks are independent of each other; that is, they do not require coordination. They can be modelled deterministically and executed simultaneously. Concurrent tasks, on the other hand, require coordination among workers [2].

Workers are, in the core async context, either producers or consumers. On the JVM, these are actual threads or lightweight, cooperatively scheduled threads called go blocks [3]. In single-threaded environments such as Node.js or the browser, go blocks are the only choice for workers. On the JVM, go blocks are scheduled on a fixed-size thread pool. By default, the size of this thread pool is 8.

The golden rule of async programming is: never block the event loop. While working with go blocks, make sure they are free of blocking operations. A blocking operation in a go block will block the underlying executor [4].

A worker has to wait if its producing or consuming operation is pending. You can choose to implement this wait as blocking (the thread is held until the operation completes) or parking (the go block is suspended and its thread is released). A waiting worker resumes only when a worker on the other end completes the matching operation.
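As a minimal sketch of parking, the snippet below pairs a consumer and a producer go block on an unbuffered channel. The names bags and consumer are made up for this illustration, and the 100 ms delay is arbitrary. The go, <! and >! operations only park inside a go block, so no thread is held while either side waits.

```clojure
(require '[cljs.core.async :as a])

;; Hypothetical names for illustration only.
(def bags (a/chan)) ; unbuffered: a put needs a matching take

;; Consumer: parks on <! until a value is available.
;; `go` returns a channel that receives the value of its body.
(def consumer
  (a/go
    (let [bag (a/<! bags)]
      (prn "collected" bag)
      [:collected bag])))

;; Producer: parks on the timeout channel first, then on >! until
;; the consumer takes the value.
(a/go
  (a/<! (a/timeout 100)) ; a parking "sleep", no thread is blocked
  (a/>! bags :suitcase))
```

The consumer starts first and simply stays parked until the producer's put arrives. Taking from the consumer channel, for example with take!, yields the go block's result once it finishes.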

ANATOMY OF A CHANNEL


A channel in core async is like a baggage carousel at the airport. Travellers are the consumers, and airline staff are the producers. If the carousel is full, the staff cannot put any more bags on it. They have to wait until some travellers take their bags off. If the carousel is empty, the travellers cannot take their baggage. They have to wait for the bags to arrive. A channel, like the carousel, acts as a rendezvous point between the producers and the consumers. Data put on the channel is added to its buffer. The size of the buffer is customizable, but unbounded buffers are not allowed.
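The carousel analogy can be tried out with a small sketch. It uses offer! and poll!, two core.async functions not otherwise covered in this post: they either complete immediately or give up, so they make the "full" and "empty" states easy to see. The channel name carousel and the bag keywords are made up for the illustration.

```clojure
(require '[cljs.core.async :as a])

;; A carousel with room for two bags (a fixed buffer of size 2).
(def carousel (a/chan 2))

;; offer! returns true when the put completes immediately, nil otherwise.
(def offers
  [(a/offer! carousel :bag-1)   ; buffer has room
   (a/offer! carousel :bag-2)   ; buffer is now full
   (a/offer! carousel :bag-3)]) ; no room and no waiting traveller

;; poll! returns a value when one is immediately available, nil otherwise.
(def polls
  [(a/poll! carousel)   ; first bag in, first bag out
   (a/poll! carousel)
   (a/poll! carousel)]) ; the carousel is empty

(prn offers) ; [true true nil]
(prn polls)  ; [:bag-1 :bag-2 nil]
```

Unlike put! and take!, these two never enqueue a pending operation, which is why the third offer and the third poll simply return nil.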

Internally, a channel maintains three queues: puts and takes, which hold the pending put and take operations, and the buffer, which holds the data. The put and take operations use a channel-level mutex to stay thread-safe. Both put! and take! accept a (callback) handler, which runs when the operation completes. If the operation completes immediately, the handler runs on the calling thread. Otherwise, it executes asynchronously.
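A small sketch of the "completes immediately" case: with room in the buffer, the put! handler runs before put! returns, and the same holds for a take! on a non-empty buffer. The events atom is a made-up helper that records the order in which things happen.

```clojure
(require '[cljs.core.async :as a])

;; Hypothetical log of what ran, in order.
(def events (atom []))

(let [ch (a/chan 1)]
  ;; The buffer has room, so this put! completes immediately and
  ;; its handler runs on the caller before put! returns.
  (a/put! ch :bag (fn [accepted?]
                    (swap! events conj [:put-handler accepted?])))
  (swap! events conj :after-put)

  ;; The buffer holds a value, so this take! also completes immediately.
  (a/take! ch (fn [v]
                (swap! events conj [:take-handler v])))
  (swap! events conj :after-take))

(prn @events)
;; [[:put-handler true] :after-put [:take-handler :bag] :after-take]
```

Both put! and take! also accept an optional trailing on-caller? argument. Passing false asks for the handler to be dispatched asynchronously even when the operation completes right away.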

PUT OPERATION


A put operation on the channel completes immediately when there is a pending take operation waiting on the other end or when there is capacity in the buffer. Otherwise, the put operation has to wait in the puts queue. An unbuffered (same as a buffer of size 0) channel can queue only MAX-QUEUE-SIZE put operations. The default value of MAX-QUEUE-SIZE is 1024.

(require '[cljs.core.async :as a]
         '[cljs.pprint :as pprint])


;; to inspect a channel
(defn inspect-unbuffered-channel
  [ch]
  (pprint/print-table
   [{:pending-puts  (-> ch .-puts .-length)
     :pending-takes (-> ch .-takes .-length)}]))


;; putting 1025 items in the channel
(let [ch (a/chan)]
  (dotimes [i 1025] ; <== make it 1024 to fix the ERROR
    (a/put! ch i))
  (inspect-unbuffered-channel ch))

Exceeding this limit throws an exception when the channel uses the default, fixed-size buffer. This behaviour is, of course, customizable. For example, a sliding buffer or a dropping buffer discards data once it is full instead of queuing the put: a sliding buffer drops the oldest item, and a dropping buffer drops the incoming one.
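To see the difference between the two buffer types, here is a sketch that puts five values on a channel of capacity 3 and then drains whatever is immediately available. The helper fill-and-drain is made up for this illustration, and poll! (a non-waiting take) is used only to read the buffer synchronously.

```clojure
(require '[cljs.core.async :as a])

(defn fill-and-drain
  "Puts 1..5 on ch, then returns everything that can be taken immediately."
  [ch]
  (doseq [i (range 1 6)]
    (a/put! ch i)) ; never queues: these buffers are never 'full'
  (vec (take-while some? (repeatedly #(a/poll! ch)))))

;; Dropping buffer: once full, incoming values are discarded.
(def kept-by-dropping (fill-and-drain (a/chan (a/dropping-buffer 3))))

;; Sliding buffer: once full, the oldest value is discarded.
(def kept-by-sliding (fill-and-drain (a/chan (a/sliding-buffer 3))))

(prn kept-by-dropping) ; [1 2 3]
(prn kept-by-sliding)  ; [3 4 5]
```

With either buffer, every put! completes immediately, so the puts queue never grows and the MAX-QUEUE-SIZE limit is not reached.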

For a channel with a buffer of size 1, we can put 1025 values.

;; to inspect channel and its buffer
(defn inspect-buffered-channel
  [ch]
  (pprint/print-table
   [{:pending-puts  (-> ch .-puts .-length)
     :pending-takes (-> ch .-takes .-length)
     :buf-size      (-> ch .-buf .-n)
     :buf-items     (-> ch .-buf .-buf .-length)}]))


(let [ch-with-buf-1 (a/chan 1)]

  (prn "The first put!")
  (a/put! ch-with-buf-1 :something-useless)
  (inspect-buffered-channel ch-with-buf-1)

  (prn "More puts")
  (dotimes [i 1024] ; <== Make it 1025 to CRASH
    (a/put! ch-with-buf-1 i))
  (inspect-buffered-channel ch-with-buf-1))

The first put! completes immediately, filling up the buffer of size 1. The remaining 1024 put operations get queued. Adding one more pending put! will lead to an overflow.

A summary of different put! scenarios.

TAKE OPERATION


A take operation completes immediately when the buffer is not empty or when there is a pending put operation on the channel. On completion, the provided handler is run.

In the following example, we can see there is one item in the buffer because put! was completed immediately. Then we take it out of the channel.

;; Channel with a buffer of size 1
(let [ch (a/chan 1)]
  (a/put! ch :the-first-thing)

  ;; print contents of the channel
  (prn "contents of the channel:")
  (inspect-buffered-channel ch)

  (a/take! ch #(prn %)))

When the buffer is empty, in the following example, a take! operation has to wait for the put! operation on the channel to complete. On inspecting the channel, we can see the pending take!. Putting on the channel completes it.

(let [ch (a/chan 1)]
  (a/take! ch #(prn %))

  (prn "contents of the channel:")
  (inspect-unbuffered-channel ch)

  (a/put! ch :an-item))

When a take operation cannot complete, it gets queued up in the takes queue. This queue can accommodate at most MAX-QUEUE-SIZE pending take operations. By default, as with the puts queue, this value is 1024. Any further take operation, without a put in the meantime, will cause an exception.

(let [ch (a/chan)]
  ;; Take 1024 times on a channel with empty buffer
  (dotimes [i 1024]  ; <== Make it 1023 to fix the ERROR
    (a/take! ch #(prn %)))

  ;; See what's in the channel
  (prn "contents of the channel:")
  (inspect-unbuffered-channel ch)

  ;; This take! will cause an exception
  (a/take! ch #(prn %)))

A summary of different take! scenarios.

CLOSING A CHANNEL

Metaphorically, the close operation is like putting the lid on a container with a faucet. No more stuff can be put into it, but it can still be taken out until the container is empty. The close! function closes the channel. After it is closed, the channel no longer accepts any new put operations.

(def chan (a/chan))

(a/put! chan :foo)

;; closing the channel
(a/close! chan)

;; put! is rejected, returns `false`
(a/put! chan :bar)

Data in the channel after it is closed can be taken until it is drained.

(a/take! chan #(prn "we got back:" %))

A take! operation on a closed channel that is drained completes with nil.

(a/take! chan #(prn "take completed with:" %))

If there are pending take! operations, on closing the channel, they will complete by receiving nil values.

(let [ch (a/chan)]

  (a/take! ch #(prn "take completed with: " %))

  ;; close the channel without putting anything
  (a/close! ch))

The timeout function returns a channel that closes itself after t milliseconds.

(let [c (a/timeout 1000)] ; <== Make it 3000 & count to 3
  (a/take! c #(prn "Channel has closed returning-> " %)))

The pending take! operation completes after the channel is closed. It receives nil from the channel.

NOTES


  1. Communicating Sequential Processes

  2. A clear distinction between concurrency and parallelism is made in the book The Joy of Clojure by Michael Fogus and Chris Houser.

    Parallelism refers to partitioning a task into multiple parts, each run at the same time. Typically, parallel tasks work toward an aggregate goal and the result of one doesn't affect the behaviour of any other parallel task, thus maintaining determinacy. Concurrency refers to the execution of disparate tasks at roughly the same time, each sharing a common resource. The results of concurrent tasks often affect the behaviour of other concurrent tasks, and therefore contain an element of nondeterminism.

  3. core.async/go is a macro that, through inversion of control, converts a synchronous block of code into an async block of code.

  4. Go blocks run on a fixed-size thread pool. If a blocking operation, for example a blocking HTTP request, accidentally ends up inside a go block, the thread that runs it will be tied up waiting for this operation to complete. This means the thread is effectively out of the pool. If 8 such blocking operations run at the same time, they can freeze the whole process, bringing down the production system (how do I even know that :) )
