Channels in Kotlin — part one

Communication essentials for Coroutines

Bob Dahlberg
The Startup

--

Photo by Taylor Vick on Unsplash

Using coroutines in Kotlin comes with a few struggles since the way of writing your code, and reasoning about asynchronicity is somewhat different from what we are used to.

The construct that helped me get my head around it all was the Channel. If you’ve worked with Rx or similar before, you should also be able to draw parallels and have a smoother transition to coroutines. Also, if you understand Channels, the Flow construct will be easier to grasp as well.

What is a Channel?

As described in the API reference, a channel is a synchronization primitive, a way of communicating streams of values between coroutines. It implements two interfaces, SendChannel and ReceiveChannel. The names are quite self-explanatory, and in the purest form, you can use a Channel to send values and to receive them.

Both the send and the receive functions are suspending on a Channel. It means that we have to be inside of a Coroutine to be allowed to call them. It also guides us not to use them in the same Coroutine, here’s why.

val channel = Channel<String>()
launch {
channel.send("value")
channel.receive()
}

This code snippet never terminates. Since the send call is suspending and the only way to release it is via a request to the receive function, it will stay suspended forever. So you need to take into consideration the order of execution. What if we switch the order in the example to have received first? Well, it’s the same result. Since the receive call is also suspending, and will only be released from the suspension upon a request to the send function, we have the same problem.

This behavior is the default for a Channel. That the send and the receive call needs to meet and exchange the value sort of speak, almost like a rendezvous. Thus the type of this default behavior is Channel.RENDEZVOUS.

Other types of Channel

There are four types to take into consideration, and they can alter the behavior of the code example above.

Channel.RENDEZVOUS  
Channel.BUFFERED
Channel.UNLIMITED
Channel.CONFLATED

Buffered

The buffered type means that you can set a fixed buffer that the Channel can store. When invoking send, if there’s room in the buffer, the call won’t suspend. But if you have a buffer of one and sending twice without any request to receive, it will suspend again.

To specify a buffered channel, you send in the size of the buffer when creating the Channel — seen in the updated example below, where I entered 5 (five) as the capacity of the buffer.

val channel = Channel<String>(5)
launch {
channel.send("value")
channel.receive()
}

And now the code terminates. Since send don’t suspend due to the buffer specified, and the receive call doesn’t suspend since there’s a value in the Channel it can receive. And just to let you know, this example code is rather useless. But it’s an excellent way to show how rendezvous and buffered channels work.

Unlimited

The next type of Channel is the unlimited one, which means that sending on it will never suspend. It just adds to the unlimited buffer. On the receiving end, it’s the same behavior as always. If there is no value in the Channel, the invocation will suspend until there is.

It can come in handy when you have a stream of events that sometimes comes very often and sometimes more or less idles. Like tracking events, when the user is active, we trigger tons of events. When the user is, for example, reading, no events get triggered. And on the receiving end, we want to batch the events instead of reacting to each of them. Here’s a naive PoC of the described scenario.

val channel = Channel<Event>(Channel.UNLIMITED)
launch {
onTrackEvent {
channel.send(it)
}
}
launch {
while
(!channel.isClosedForReceive) {
repeat(25) {
batch.add(channel.receive())
}
batch.send()
}
}

In the first Coroutine launched, we listen to tracking events and pass them on to the Channel. And in the second one, we check if the Channel is closed before acting on the next batch of events. In real life, we would have to be less naive with the batching since the Channel can get closed at any time. We’ll get back to why we should close channels later.

Conflated

This channel type is the most interesting to me. What it does is that it only ever keeps the latest value sent to it. So it’s the same for this as the unlimited that sending on it will never suspend, it will instead replace the previous value if any. And on the receiving end, it will as always suspend on an empty channel. If you, like me, are used to Rx, you probably think of BehaviorSubject or similar, and if you do, I need to remind you that once anyone has received the most recent value from a channel, it is removed from the Channel. On a BehaviorSubject, each en every subscriber will get the latest observed value.

One use case for this would be if you have a UI that updates a value every fifth second, let’s make it viewers of a live stream to be concrete. We don’t care about everything that happens all the time. We want to update to the latest count of viewers on each update.

sealed class Event {
object Enter : Event()
object Leave : Event()
}
val motion = Channel<Event>(Channel.UNLIMITED)
val viewers = Channel<Int>(Channel.CONFLATED)
suspend fun enter() = motion.send(Enter)
suspend
fun leave() = motion.send(Leave)
launch(Dispatchers.DEFAULT) {
var count = 0
for(msg in motion) {
when(msg) {
is Enter -> count++
is Leave -> count--
}
viewers.send(count)
}
}
launch(Dispatchers.MAIN) {
while
(!viewers.isClosedForReceive) {
showNumberOfViewers(viewers.receive())
delay(5000)
}
}

The code snippet is quite an extensive one to show the conflated channel type. But I also wanted to show an example with more to it, to give you a sense of how powerful Channels are.

To walk through it real fast, we have one unlimited Channel that receives each time a viewer enters or leaves. By calling either the enter or the leave function, those functions send the corresponding event on the unlimited motion channel.

In our first Coroutine, we iterate over the Channel that receives all events. This is powerful since the for-loop itself will handle the suspending and possible closing of the Channel. The Coroutine also has a local state in the count variable, and since it’s only modified within the Coroutine, it is safe to do that. (No matter how many threads or coroutines are calling the Channel, we only receive within one coroutine).

On each update, our Coroutine that listens to events trigger an update to the conflated viewers channel with the current count of viewers as value. In the second Coroutine, launched on the Main dispatcher (used for UI updates on Android, for example), we receive on that Channel. But after each time we receive, we go into a delay for five seconds. When the delay is over, if the Channel is still open to receive, we try to get the latest value by calling receive once again. And we will always only show the latest known value, ignoring all fluctuation on the value during the delay.

If you are knowledgable around coroutines and suspending, you might see one flaw or quirk in the example above. That is that we are not guaranteed to update every five seconds, do you see why?
Correct, the viewers.receive() function will suspend if the viewers channel is empty, for an unknown amount of time. That’s a good thing, in my opinion, since we don’t update the UI if we don’t have to. Because if the viewers channel is empty, nothing has happened to the count of viewers. So our UI is up to date. And since we are suspending, we will now update as soon as there is a change.

Next up

Now we understand what a Channel is and how the different types of Channels work. In the following story, we’ll go thru more practical examples of how to use it. We’ll discover patterns and extension-functions that help us do it fluently. Then we’ll also see why Channels aren’t a replacement for Rx.

Part two available here: https://medium.com/@dahlberg.bob/channels-in-kotlin-part-two-7d52abbc5b6e

--

--

Bob Dahlberg
The Startup

Lead Developer at Qvik, Coach, Agile Thinker, GDG Lead.