If you are not familiar with Collectable, check out the Hex Docs for a more detailed introduction.
If the functions in Enumerable are about taking values out, then Collectable is about collecting those values into a structure.
Enumerable is about the act of pulling values out of something, while Collectable is about the act of pushing values into something. The emphasis is on the act. The something itself is not important: it might be a list, a map, a TCP connection or a file on disk. Elixir’s Enumerable and Collectable protocols are implementations of these abstractions.
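To make the “something is not important” point concrete, here is a small sketch using only the standard library. Enum.into/2 pushes values into four different targets (a list, a map, a MapSet and a string) without the caller changing how it pushes. The variable names are just for illustration.

```elixir
# Enum.into/2 drives the Collectable protocol for whatever target it is given.
list = Enum.into(1..3, [])
map = Enum.into([a: 1, b: 2], %{})
set = Enum.into([1, 1, 2], MapSet.new())
string = Enum.into(["a", "b", "c"], "")

IO.inspect(list, label: "list")
IO.inspect(map, label: "map")
IO.inspect(set, label: "set")
IO.inspect(string, label: "string")
```

The caller only decides what to push; the target decides what pushing means (appending, inserting a key, de-duplicating or concatenating).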
In this article we will see how to make Collectable (the act of pushing something) composable, and why we should care about it. We also address a readability issue unique to Collectable, and keep back-pressure and error handling in check while building it.
We need it to be composable for the same reason we need the Enum and Stream modules to work with Enumerable. Currently there are no equivalent modules for composing Collectable values.
Let’s take an example to illustrate this better.
You have an infinite stream of logs of some kind. Each line in the stream can have a different level: info, warn or error. Our task is to push the logs somewhere, and this somewhere is unknown at this point. Not only do we need the destination to be configurable, we also need complete control over how the logs are pushed. We might want to push logs to AWS S3 or to a file on disk, push error logs to the console and skip the remaining logs, or buffer logs before making expensive network calls.
The log stream might look like this:
log_stream =
  Stream.repeatedly(fn ->
    random_str = Base.encode64(:crypto.strong_rand_bytes(5))

    if :rand.uniform_real() < 0.5 do
      "warn: #{random_str}"
    else
      "error: #{random_str}"
    end
  end)

## `sink` can be AWS S3, a file on disk, a list, a string
# Stream.into(log_stream, sink)
How would you model this keeping the flexibility in mind?
Cases like these are the reason we need Collectable to be composable. Hopefully, by the end of the post we will have a better solution built from composable collectables.
Elixir’s Collectable protocol is an implementation of this idea. To avoid getting lost in the details of the protocol, let’s build our own push container bottom-up. We will get back to the Collectable protocol after a while.
So what we want is something we can push stuff into. We don’t care about the implementation details of this something.
Let’s represent this aspect using a function called push. We call the push function with “where we want to push” as the first argument and “the thing we want to push” as the second argument. So if something wants to be a push container, it only has to implement the push(something, anything) function.
defmodule PushContainer do
  # string container
  def push(string, value) when is_binary(string) do
    string <> value
  end

  # list container
  def push(list, value) when is_list(list) do
    list ++ [value]
  end

  # void container
  def push(nil, _) do
    nil
  end
end

defmodule Example do
  import PushContainer

  def random_binaries(container, count) do
    Enum.reduce(1..count, container, fn _, container ->
      bin = :crypto.strong_rand_bytes(5)
      push(container, bin)
    end)
  end
end

import Example

# returns values as a single binary
random_binaries(<<>>, 10) |> IO.inspect(label: "binary")

# returns values as a list of binaries
random_binaries([], 10) |> IO.inspect(label: "list")

# discards the values and returns nil (similar to /dev/null)
random_binaries(nil, 10) |> IO.inspect(label: "nil")
A container does not have to be a data structure. It can be a process, a GenServer, a socket or a file. While a data structure only needs the push function to work, resources also need a way to be initialised and released. For example, to create a file container, the first step is to open the file (initialise), then push values, and at the end close it (release). So let’s add support for these stages. While at it, let’s group all three functions together in a module so it is easier to build containers for different types.
defmodule PushContainer do
  defstruct [:acc, :init, :func, :finish]

  @type acc :: any
  # the specs below refer to PushContainer.t(), so the type has to be defined
  @type t :: %__MODULE__{}

  @spec build((() -> acc), (any, acc -> acc), (acc -> any)) :: PushContainer.t()
  def build(init, func, finish) do
    %__MODULE__{init: init, func: func, finish: finish}
  end

  @spec init(PushContainer.t()) :: PushContainer.t()
  def init(%PushContainer{init: init} = pc) do
    %PushContainer{pc | acc: init.(), init: :done}
  end

  @spec push(PushContainer.t(), any) :: PushContainer.t()
  def push(%PushContainer{acc: acc, func: func} = pc, value) do
    %PushContainer{pc | acc: func.(value, acc)}
  end

  @spec finish(PushContainer.t()) :: any
  def finish(%PushContainer{acc: acc, finish: finish}) do
    finish.(acc)
  end
end

# change the example to adopt this new interface
defmodule Example do
  def random_binaries(%PushContainer{} = pc, count) do
    pc = PushContainer.init(pc)

    pc =
      Enum.reduce(1..count, pc, fn _, pc ->
        bin = :crypto.strong_rand_bytes(5)
        PushContainer.push(pc, bin)
      end)

    PushContainer.finish(pc)
  end
end
Let’s build a few push containers using the new interface:
binary_pc =
  PushContainer.build(
    fn -> <<>> end,
    fn val, acc -> acc <> val end,
    fn acc -> acc end
  )

list_pc =
  PushContainer.build(
    fn -> [] end,
    fn val, acc -> acc ++ [val] end,
    fn acc -> acc end
  )

file_pc =
  PushContainer.build(
    fn -> File.open!("somefile.txt", [:write, :binary]) end,
    fn val, file ->
      :ok = IO.binwrite(file, val)
      file
    end,
    fn file -> File.close(file) end
  )

import Example

random_binaries(binary_pc, 10) |> IO.inspect(label: :binary)
random_binaries(list_pc, 10) |> IO.inspect(label: :list)

:ok = random_binaries(file_pc, 10)
File.read!("somefile.txt")
Now that we are familiar with push containers as building blocks, let’s head back to the composability aspect.
In the above example the function call random_binaries(..., 10) pushes binaries into the container. Now say we want to encode the binaries using Base64 before we push them. How would we support this?
As before, let’s create a function called map which wraps the container’s original push with a given function and returns the result as a new push container. It is the same as Enum.map, but for push containers.
defmodule PushContainerHelpers do
  @spec map(PushContainer.t(), (any -> any)) :: PushContainer.t()
  def map(%PushContainer{} = inner_pc, func) do
    PushContainer.build(
      fn -> PushContainer.init(inner_pc) end,
      fn elem, inner_pc -> PushContainer.push(inner_pc, func.(elem)) end,
      fn inner_pc -> PushContainer.finish(inner_pc) end
    )
  end
end
Notice that map takes a push container and returns a push container.
import PushContainerHelpers

encoded =
  random_binaries(
    map(binary_pc, &Base.encode64/1),
    10
  )

IO.puts("Base64 encoded binaries: #{encoded}")

# string -> encode64 -> upcase -> collect
upcased_encoded =
  random_binaries(
    map(
      map(
        binary_pc,
        &String.upcase/1
      ),
      &Base.encode64/1
    ),
    10
  )

IO.puts("Base64 encoded binaries in upcase: #{upcased_encoded}")
We can compose push containers like this by plugging in functions dynamically and building new push containers on the fly. This works, but as you can see in the last example it is not easy to read. Let’s see if it improves when we format it differently or use the pipe.
Does formatting improve the readability of the above examples?
import PushContainerHelpers

# string -> encode64 -> upcase -> collect-as-string

# formatting as single line
random_binaries(map(map(binary_pc, &String.upcase/1), &Base.encode64/1), 10)

# formatting using pipe
binary_pc
|> map(&String.upcase/1)
|> map(&Base.encode64/1)
|> random_binaries(10)
As you can see, formatting is not helping. Using our beloved pipe makes reading even worse! We have to read the expressions from bottom to top to understand what is going on. We can visualise it to make the issue clearer.
Sort of works for small chains, but we still need to address it for long expressions.
Using pipes does not work at all with push containers!
The issue is that we prefer to read an expression as INPUT, FUNCTION, OUTPUT, in that exact order. This is especially important when we have nested function calls with multiple parameters. Why do we like expressions to be in this order? I don’t know for sure, but my guess is that this is how most programming languages are modelled. We are trained to read code like that and expect the order of reading to match the logical order of execution. It might also be because most of our languages are written left to right and top to bottom (there are a few exceptions). Or, more fundamentally, it is the natural order of cause and effect. If the order of an expression is not INPUT, FUNCTION, OUTPUT then it will be counter-intuitive. This is also the reason we have the pipe operator |> in the first place. The pipe is basically macro magic we use to make function chaining more readable by changing the order of expressions.
# Enum functions without pipe:
#
#   Enum.map(...)
#   FUNCTION, INPUT
#
# OUTPUT is implicit here as return value
Enum.map(Enum.map(random_binaries(10), &String.upcase/1), &Base.encode64/1)

# Enum functions with pipe
#
#   ... |> Enum.map()
#   INPUT, FUNCTION
random_binaries(10)
|> Enum.map(&String.upcase/1)
|> Enum.map(&Base.encode64/1)
The pipe works for Enum because the first argument we pass to an Enum function is the “input”. So moving the input expression to the line before the function call makes the movement of values appear natural.
It is more obvious if we visualise this:
As we can see, using the pipe when multiple functions are involved is pleasant to read. By using the pipe we turned FUNCTION, INPUT into INPUT, FUNCTION, which matches perfectly with how we read code.
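As a rough illustration of this reordering, the standard library exposes Macro.pipe/3, which inserts an expression into the argument list of a call at a given position. The sketch below uses position 0 to move the input into first place, which is the same kind of rewrite described above. It only manipulates quoted code, so random_binaries does not need to exist for it to run.

```elixir
# The "input" expression and the call it should flow into, both as quoted code.
input = quote(do: random_binaries(10))
call = quote(do: Enum.map(&String.upcase/1))

# Insert `input` at position 0 of the call's argument list.
piped = Macro.pipe(input, call, 0)

# Print the rewritten call as source code.
IO.puts(Macro.to_string(piped))
```

The printed call should show the input expression as the first argument of Enum.map, followed by the captured function.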
Why is the pipe not working for push containers?
# Without pipe:
#
#   FUNCTION, OUTPUT, INPUT
random_binaries(binary_pc, 10)

# With pipe:
#
#   OUTPUT, FUNCTION, INPUT
binary_pc
|> random_binaries(10)
The “push container” value (binary_pc in the example) we are passing is not an input to the operation in the logical sense. It is where the function should push its output. So using the pipe here breaks our expectation and looks confusing.
How to solve this?
Let’s correct the order from FUNCTION, OUTPUT, INPUT to FUNCTION, INPUT, OUTPUT by making the push container (binary_pc) the last argument to the function.
# formatting as single line
random_binaries(10, map(&Base.encode64/1, map(&String.upcase/1, binary_pc)))

# formatting as multiple lines
random_binaries(10,
  map(&Base.encode64/1,
    map(&String.upcase/1,
      binary_pc
    )
  )
)
When we visualise this:
This reads much better and nearly addresses our concern. Although formatting nested function calls across multiple lines makes them more readable, we could go one step further and create a new operator like |> for cases like this. Our new operator takes two expressions A and B, and appends B as the last argument to A. One important thing to note is that the operator must be right associative for this to work. Why it won’t work otherwise is left as an exercise for readers :)
Let’s pick the +++ operator, which is right associative, for demonstration purposes.
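A quick way to see what right associative means here is to inspect how the parser groups a chain. The sketch below only quotes the expressions (nothing is evaluated, and a, b and c are placeholder names), then pattern matches on the resulting AST, contrasting +++ with the left-associative - operator.

```elixir
# `quote` returns the parsed AST without evaluating it, so undefined names are fine here.
right_assoc = quote(do: a +++ b +++ c)
left_assoc = quote(do: a - b - c)

# Right associative: parsed as a +++ (b +++ c); the nested call is the RIGHT operand.
{:+++, _, [{:a, _, _}, {:+++, _, [{:b, _, _}, {:c, _, _}]}]} = right_assoc

# Left associative: parsed as (a - b) - c; the nested call is the LEFT operand.
{:-, _, [{:-, _, [{:a, _, _}, {:b, _, _}]}, {:c, _, _}]} = left_assoc

IO.puts("+++ groups to the right, - groups to the left")
```

With right-hand grouping, the outermost +++ sees the first call on its left and the whole rest of the chain on its right, so each call in turn receives the remainder of the chain as its last argument.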
defmodule PushOp do
  # appends the right-hand expression as the last argument of the left-hand call
  defmacro left +++ right do
    {func, meta, args} = left
    {func, meta, args ++ [right]}
  end
end

require PushOp
import PushOp

string =
  random_binaries(10)
  +++ map(&Base.encode64/1)
  +++ map(&String.upcase/1)
  +++ binary_pc

IO.puts("=> #{string}")
This is even better than the previous code, since we got back to our INPUT, FUNCTION, OUTPUT ordering, exactly like the pipe operator did for Enum. To recap, all we did was make the collectable param the last param and create a helper macro to improve the ordering of the expression.
Now that the readability side quest is complete, let’s get back to the main mission.
Implementing other basic helper functions like reduce, filter etc. is straightforward. Let’s bundle all the functions into a Contr module, similar to the Enum module for Enumerable.
defmodule Contr do
  def reduce(func, %PushContainer{} = pc) do
    PushContainer.build(
      fn ->
        PushContainer.init(pc)
      end,
      fn elem, pc ->
        func.(elem, pc)
      end,
      fn pc ->
        PushContainer.finish(pc)
      end
    )
  end

  # redefining map using reduce
  def map(func, %PushContainer{} = pc) do
    reduce(
      fn elem, pc ->
        PushContainer.push(pc, func.(elem))
      end,
      pc
    )
  end

  def filter(func, %PushContainer{} = pc) do
    reduce(
      fn elem, pc ->
        if func.(elem) do
          PushContainer.push(pc, elem)
        else
          pc
        end
      end,
      pc
    )
  end

  # routes each element to true_pc or false_pc depending on func
  def split(func, %PushContainer{} = true_pc, %PushContainer{} = false_pc) do
    PushContainer.build(
      fn ->
        {PushContainer.init(true_pc), PushContainer.init(false_pc)}
      end,
      fn elem, {t, f} ->
        if func.(elem) do
          {PushContainer.push(t, elem), f}
        else
          {t, PushContainer.push(f, elem)}
        end
      end,
      fn {t, f} ->
        {PushContainer.finish(t), PushContainer.finish(f)}
      end
    )
  end
end
Equipped with these, we can compose push containers in multiple ways similar to Enum.
import Example
import PushOp

_string =
  random_binaries(10)
  +++ Contr.map(fn bin -> Base.encode64(bin) end)
  +++ Contr.filter(fn str -> str =~ ~r/^[a-zA-Z=]+$/ end)
  +++ binary_pc

# => "olHQItQ=OWPYlAY="
The only thing pending now is to bridge PushContainer and the Collectable protocol for interoperability, so we can take advantage of modules which already implement the Collectable protocol.
defmodule PushContainer do
  defstruct [:acc, :init, :func, :finish]

  def build(init, func, finish) do
    %__MODULE__{init: init, func: func, finish: finish}
  end

  def init(%PushContainer{init: init} = pc) do
    %PushContainer{pc | acc: init.(), init: :done}
  end

  def push(%PushContainer{acc: acc, func: func} = pc, value) do
    %PushContainer{pc | acc: func.(value, acc)}
  end

  # `status` is :done or :halt; it defaults to :done so that finish/1 callers
  # (such as the Contr helpers) keep working
  def finish(%PushContainer{acc: acc, finish: finish}, status \\ :done) do
    if is_function(finish, 1) do
      finish.(acc)
    else
      finish.(acc, status)
    end
  end

  def from_collectable(collectable) do
    PushContainer.build(
      fn ->
        Collectable.into(collectable)
      end,
      # push/2 calls func.(value, acc), so the element comes first; the
      # collector function is carried along with the updated accumulator
      fn elem, {acc, collector_func} ->
        {collector_func.(acc, {:cont, elem}), collector_func}
      end,
      fn {acc, collector_func}, status ->
        collector_func.(acc, status)
      end
    )
  end
end

defimpl Collectable, for: PushContainer do
  def into(%PushContainer{} = pc) do
    collector_func = fn
      acc, {:cont, elem} ->
        PushContainer.push(acc, elem)

      acc, :halt ->
        PushContainer.finish(acc, :halt)

      acc, :done ->
        PushContainer.finish(acc, :done)
    end

    acc = PushContainer.init(pc)
    {acc, collector_func}
  end
end
import Example
import PushOp, only: [{:+++, 2}]

square_set_container =
  Contr.map(
    fn num -> num * num end,
    PushContainer.from_collectable(MapSet.new())
  )

square_set = Enum.into(1..10, square_set_container)
# => #MapSet<[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]>
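For reference, the contract that the bridge above relies on can be exercised by hand. This is a minimal sketch using MapSet from the standard library: Collectable.into/1 returns an initial accumulator together with a collector function, elements are pushed with {:cont, elem}, and :done returns the finished value.

```elixir
# Collectable.into/1 returns the initial accumulator and a collector function.
{acc, collector} = Collectable.into(MapSet.new())

# Each element is pushed with a {:cont, elem} command; the collector returns
# the new accumulator, which must be passed to the next call.
acc = collector.(acc, {:cont, 1})
acc = collector.(acc, {:cont, 4})
acc = collector.(acc, {:cont, 4})

# :done finalises and returns the finished collectable
# (:halt would signal early termination instead).
set = collector.(acc, :done)

IO.inspect(set, label: "collected")
```

The accumulator should be treated as opaque: its shape is an implementation detail of each collectable, which is why the bridge only threads it through and never inspects it.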
As for the original sample problem about stream of logs:
import Example
import PushOp, only: [{:+++, 2}]

defmodule Demo do
  # log_stream is passed in explicitly, since variables defined outside
  # the module are not visible inside it
  def collect_logs(log_stream, sink) do
    log_stream
    |> Stream.take(10)
    |> Enum.into(sink)
  end
end

error_sink =
  Contr.map(fn "error: " <> line -> line end)
  +++ Contr.map(fn line -> String.upcase(line) end)
  +++ list_pc

warn_sink =
  Contr.map(fn "warn: " <> line -> line end)
  +++ Contr.map(fn line -> line <> "\n" end)
  +++ binary_pc

Demo.collect_logs(
  log_stream,
  Contr.split(
    &String.starts_with?(&1, "warn: "),
    warn_sink,
    error_sink
  )
)
More about streams: