Composable Collectable in Elixir

Collectable

If you are not familiar with Collectable, check out the Hex Docs for a more detailed introduction.

If the functions in Enumerable are about taking values out, then Collectable is about collecting those values into a structure.

Enumerable is about the act of pulling values out of something, while Collectable is about the act of pushing values into something. The emphasis is on the act; the something itself is not important. It might be a list, a map, a TCP connection, or a file on disk. Elixir’s Enumerable and Collectable protocols are an implementation of this abstraction.

In this article we will see how to make Collectable (the act of pushing something) composable, and why we should care about it. We also address a unique readability issue related to Collectable, and make sure back-pressure and error handling are kept in check while building it.

Why do we need Collectable to be composable?

We need it to be composable for the same reason we need the Enum and Stream modules to work with Enumerable. Currently there are no equivalent modules for composing Collectable values.

Let’s take an example to illustrate this better.

You have an infinite stream of some kind of logs. Each line in the stream can be of a different level – info, warn, or error. Your task is to push the logs somewhere, and this somewhere is unknown at this point. Not only does the destination need to be configurable, you also need complete control over how the logs are pushed. You might want to push logs to AWS S3 or to a file on disk, push error logs to the console and skip the remaining logs, or buffer logs before making expensive network calls.

The log stream might look like this:

# Infinite, lazily evaluated stream of fake log lines. Each element is a
# random base64 payload prefixed with either "warn: " or "error: ", chosen
# with roughly equal probability.
log_stream =
  Stream.repeatedly(fn ->
    payload = 5 |> :crypto.strong_rand_bytes() |> Base.encode64()
    level = if :rand.uniform_real() < 0.5, do: "warn", else: "error"

    "#{level}: #{payload}"
  end)


## `sink` can be aws S3, File on disk, list, string
# Stream.into(log_stream, sink)

How would you model this keeping the flexibility in mind?

Cases like these are the reason why we need collectable to be composable. Hopefully at the end of the post we solve this better using composable collectable.

Push Container

Elixir’s Collectable protocol is an implementation of this idea. To avoid getting lost in the details of the protocol, let’s build our own push container bottom-up. We will get back to the Collectable protocol after a while.

So what we want is something we can push stuff into. We don’t care about the implementation details of this something.

Let’s represent this aspect using a function called push. We call push with “where we want to push” as the first argument and “the thing we want to push” as the second argument. So if something wants to be a push container, it only has to implement the push(something, anything) function.

defmodule PushContainer do
  @moduledoc """
  A minimal "push container": anything a value can be pushed into.

  Three kinds of container are supported, selected by the shape of the first
  argument: a binary, a list, or `nil` (a sink that discards everything).
  """

  @doc """
  Pushes `value` into `container` and returns the updated container.

    * binary container - `value` is concatenated to the end of the binary
    * list container - `value` is appended as the last element
    * `nil` container - `value` is discarded and `nil` is returned

  Any other container raises `FunctionClauseError`.
  """
  def push(container, value)

  # void container: swallow the value
  def push(nil, _value), do: nil

  # string container: grow the binary
  def push(acc, value) when is_binary(acc), do: acc <> value

  # list container: keep insertion order by appending
  def push(items, value) when is_list(items), do: items ++ [value]
end

defmodule Example do
  @moduledoc """
  Demo producer that pushes random binaries into any push container.
  """

  import PushContainer

  @doc """
  Pushes one 5-byte random binary into `container` for each element of
  `1..count` and returns the container produced by the final push.
  """
  def random_binaries(container, count) do
    for _step <- 1..count, reduce: container do
      acc -> push(acc, :crypto.strong_rand_bytes(5))
    end
  end
end

import Example

# The same producer is run against three different containers.

# binary container: every chunk is concatenated into a single binary
binary_result = random_binaries(<<>>, 10)
IO.inspect(binary_result, label: "binary")

# list container: chunks are returned as a list of binaries
list_result = random_binaries([], 10)
IO.inspect(list_result, label: "list")

# nil container: values are discarded and nil comes back (similar to /dev/null)
nil_result = random_binaries(nil, 10)
IO.inspect(nil_result, label: "nil")

A container here does not have to be a data structure; it can be a process, a GenServer, a socket, or a file. While a data structure only needs a push function to work, for resources we also need a way to initialise and release them. For example, to create a file container, the first step (initialise) is to open the file, then we push values, and at the end we close it (release). So let’s add support for these stages. While at it, let’s group all three functions into a module so it is easier to build containers for different types.

defmodule PushContainer do
  @moduledoc """
  A push container described by three functions:

    * `init` - zero-arity function that acquires/creates the initial accumulator
    * `func` - `(value, acc -> acc)` function that pushes one value
    * `finish` - `(acc -> result)` function that releases the accumulator and
      produces the final result

  The lifecycle is `build/3` -> `init/1` -> `push/2` (any number of times) ->
  `finish/1`.
  """

  defstruct [:acc, :init, :func, :finish]

  @type acc :: any

  @type t :: %__MODULE__{
          acc: acc,
          init: (() -> acc) | :done | nil,
          func: (any, acc -> acc) | nil,
          finish: (acc -> any) | nil
        }

  @doc """
  Builds a container from its three lifecycle functions. Nothing is executed
  yet; `init` only runs when `init/1` is called.
  """
  @spec build((() -> acc), (any, acc -> acc), (acc -> any)) :: t()
  def build(init, func, finish) do
    %__MODULE__{init: init, func: func, finish: finish}
  end

  @doc """
  Runs the container's `init` function and stores its result as the
  accumulator. The `init` field is replaced by `:done` afterwards.
  """
  @spec init(t()) :: t()
  def init(%__MODULE__{init: init} = pc) do
    %__MODULE__{pc | acc: init.(), init: :done}
  end

  @doc """
  Pushes `value` by calling `func.(value, acc)` and storing the returned
  accumulator.
  """
  @spec push(t(), any) :: t()
  def push(%__MODULE__{acc: acc, func: func} = pc, value) do
    %__MODULE__{pc | acc: func.(value, acc)}
  end

  @doc """
  Finalises the container by calling `finish.(acc)` and returns whatever that
  function returns.
  """
  @spec finish(t()) :: any
  def finish(%__MODULE__{acc: acc, finish: finish}) do
    finish.(acc)
  end
end

# change the example to adopt this new interface

defmodule Example do
  @moduledoc """
  Demo producer that drives a `PushContainer` through its full lifecycle.
  """

  @doc """
  Initialises `pc`, pushes one 5-byte random binary for each element of
  `1..count`, then finishes the container and returns its final result.
  """
  def random_binaries(%PushContainer{} = pc, count) do
    started = PushContainer.init(pc)

    1..count
    |> Enum.reduce(started, fn _step, container ->
      PushContainer.push(container, :crypto.strong_rand_bytes(5))
    end)
    |> PushContainer.finish()
  end
end

Let’s build a few push containers using the new interface:

# Collects pushed binaries by concatenating them into one binary.
binary_pc =
  PushContainer.build(
    fn -> <<>> end,
    fn chunk, so_far -> so_far <> chunk end,
    & &1
  )

# Collects pushed values into a list, preserving push order.
list_pc =
  PushContainer.build(
    fn -> [] end,
    fn item, items -> items ++ [item] end,
    & &1
  )

# Resource-backed container: the file is opened on init, written to on every
# push, and closed on finish (so the final result is File.close/1's return).
file_pc =
  PushContainer.build(
    fn -> File.open!("somefile.txt", [:write, :binary]) end,
    fn chunk, device ->
      :ok = IO.binwrite(device, chunk)
      device
    end,
    &File.close/1
  )

import Example

binary_result = random_binaries(binary_pc, 10)
IO.inspect(binary_result, label: :binary)

list_result = random_binaries(list_pc, 10)
IO.inspect(list_result, label: :list)

# The file container returns :ok once the file has been closed.
:ok = random_binaries(file_pc, 10)
File.read!("somefile.txt")

Now that we are familiar with push containers as building blocks, let’s head back to the composability aspect.

Composing push containers

In the above example the function call random_binaries(..., 10) pushes binaries into the container. Now say we want to encode the binaries using base64 before we push them. How would we support this?

As before, let’s create a function called map which wraps the container’s original push with a given function and returns the result as a new push container. It is the same as Enum.map, but for push containers.

defmodule PushContainerHelpers do
  @moduledoc """
  Combinators that wrap a `PushContainer` in a new `PushContainer`.
  """

  @doc """
  Returns a new container that applies `func` to every pushed element and
  pushes the result into `inner_pc`.

  The wrapped container is only initialised when the returned container is
  initialised, and finishing the returned container finishes the wrapped one.
  """
  @spec map(PushContainer.t(), (any -> any)) :: PushContainer.t()
  def map(%PushContainer{} = inner_pc, func) do
    start = fn -> PushContainer.init(inner_pc) end
    push_mapped = fn elem, started_pc -> PushContainer.push(started_pc, func.(elem)) end
    stop = &PushContainer.finish/1

    PushContainer.build(start, push_mapped, stop)
  end
end

Notice that map takes a push container and returns a push container.

import PushContainerHelpers

# Every random binary is base64-encoded before it reaches binary_pc.
encoded =
  random_binaries(
    map(binary_pc, &Base.encode64/1),
    10
  )

IO.puts("Base64 encoded binaries: #{encoded}")

# string -> encode64 -> upcase -> collect
# The OUTER map runs first on each value, so values are encoded, then
# upcased by the inner map, and only then pushed into binary_pc.
upcased_encoded =
  random_binaries(
    map(
      map(
        binary_pc,
        &String.upcase/1
      ),
      &Base.encode64/1
    ),
    10
  )

IO.puts("Base64 encoded binaries in upcase: #{upcased_encoded}")

We can compose push containers like this by plugging in functions dynamically and building new push containers on the fly. This works, but as you can see in the last example it is not easy to read. Let’s see if it improves if we format it differently or use the pipe.

Readability

Does formatting improve the readability of the above examples?

import PushContainerHelpers

# string -> encode64 -> upcase -> collect-as-string
#
# Both forms below build the same nested container,
# map(map(binary_pc, upcase), encode64). Because each map wraps the container
# it is given, the outermost function (encode64) is applied to a value first.

# formatting as single line
random_binaries(map(map(binary_pc, &String.upcase/1), &Base.encode64/1), 10)

# formatting using pipe
# Reads top-to-bottom as upcase, encode64, producer - the reverse of the
# order in which values actually flow.
binary_pc
|> map(&String.upcase/1)
|> map(&Base.encode64/1)
|> random_binaries(10)

As you can see formatting is not helping. Using our beloved pipe makes reading even worse! We have to read the expressions from bottom-to-top to understand what is going on. We can visualise it to make the issue more clear.

Push without pipe
Without pipe

Sort of works for small chains, but we still need to address it for long expressions.

With pipe
With pipe

Using pipes does not work at all with push containers!

What is going on here?

The issue is that we prefer to read an expression as INPUT, FUNCTION, OUTPUT, in that exact order. This is especially important when we have nested function calls with multiple parameters. Why do we like expressions to be in this order? I don’t know for sure, but my guess is that it is because that’s how most programming languages are modelled. We are trained to read code like that and expect the order of reading to match the logical order of execution. It might also be because most of our languages are written left-to-right, top-to-bottom (there are a few exceptions to this). Or, more fundamentally, it is the natural order of cause and effect. If the order of an expression is not INPUT, FUNCTION, OUTPUT, it will be counterintuitive. This is also the reason why we have the pipe operator |> in the first place. The pipe is basically macro magic we use to make function chaining more readable by changing the order of expressions.

# Enum functions without pipe:
#
# Enum.map(...)
# FUNCTION, INPUT
#
# OUTPUT is implicit here as return value
# NOTE(review): random_binaries(10) is used here as a one-argument producer
# that returns an enumerable; no such arity is defined in the modules shown
# so far - treat it as illustrative.
Enum.map(Enum.map(random_binaries(10), &String.upcase/1), &Base.encode64/1)


# Enum functions with pipe
#
# ... |> Enum.map()
# INPUT, FUNCTION
# Values flow in reading order: produce, upcase, then encode.
random_binaries(10)
|> Enum.map(&String.upcase/1)
|> Enum.map(&Base.encode64/1)

Pipe works for Enum because first argument we pass to the Enum function is the “input”. So moving input expression a line before the function call makes the movement of values appear natural.

It is more obvious if we visualise this:

Without pipe
Without pipe
With pipe
With pipe

As we can see, using the pipe when multiple functions are involved is pleasant to read. By using the pipe we turned FUNCTION, INPUT into INPUT, FUNCTION, which matches perfectly with how we read code.

Why pipe is not working for push container?

# Without pipe:
#
# FUNCTION, OUTPUT, INPUT
# binary_pc is the destination (where results are pushed), 10 is the input.
random_binaries(binary_pc, 10)

# With pipe:
#
# OUTPUT, FUNCTION, INPUT
# The pipe moves the first argument - here the destination - to the front.
binary_pc
|> random_binaries(10)

The “push container” value (binary_pc in the example) we are passing is not an input we pass to the operation in logical sense. It is something where the function should push the output. So using pipe here breaks our expectation and looks confusing.

How to solve this?

Let’s correct the order FUNCTION, OUTPUT, INPUT to FUNCTION, INPUT, OUTPUT by making push-container (binary_pc) value as last argument to the function.

# NOTE(review): these calls assume container-last variants,
# random_binaries(count, container) and map(func, container); the Example and
# PushContainerHelpers modules shown earlier take the container first.

# formatting as single line
random_binaries(10, map(&Base.encode64/1, map(&String.upcase/1, binary_pc)))

# formatting as multiple lines
# Read top-to-bottom: produce 10 binaries, encode64, upcase, collect in binary_pc.
random_binaries(
  10,
  map(
    &Base.encode64/1,
    map(
      &String.upcase/1,
      binary_pc
    )
  )
)

When we visualise this:

Container as last param
Container as last param
Container as last param with multi line formatting
Container as last param with multi line formatting

This reads much better and nearly addresses our concern. Although formatting nested function calls across multiple lines makes them more readable, we could go one step further and create a new operator like |> for cases like this. Our new operator takes two expressions A and B, and appends B as the last argument to A. One important thing to note is that the operator must be right-associative for this to work. Why it won’t work otherwise is left as an exercise for the reader :)

Let’s pick the +++ operator, which is right-associative, for demonstration purposes.

defmodule PushOp do
  @moduledoc """
  Defines the `+++` operator, a "pipe into the last argument".

  `call(a) +++ b` is rewritten at compile time to `call(a, b)`. Because `+++`
  is right-associative, `f(x) +++ g(y) +++ z` becomes `f(x, g(y, z))`.
  """

  @doc """
  Appends the `right` expression as the last argument of the function call on
  the `left`.

  Raises `ArgumentError` at compile time when `left` is not a function call
  (for example a bare variable or a literal), since there is no argument list
  to append to.
  """
  defmacro left +++ right do
    case left do
      # A call is represented as {name_or_remote, meta, args} with a list of args.
      {func, meta, args} when is_list(args) ->
        {func, meta, args ++ [right]}

      _not_a_call ->
        raise ArgumentError,
              "left-hand side of +++ must be a function call, got: " <>
                Macro.to_string(left)
    end
  end
end

require PushOp
import PushOp

# With +++ appending its right side as the last argument of the call on its
# left, and grouping to the right, this is meant to read as:
#   random_binaries(10, map(&Base.encode64/1, map(&String.upcase/1, binary_pc)))
# NOTE(review): assumes container-last random_binaries/2 and map/2 are in scope.
string =
  random_binaries(10)
  +++ map(&Base.encode64/1)
  +++ map(&String.upcase/1)
  +++ binary_pc

IO.puts("=> #{string}")

This is even better than the previous code, since we got back to our INPUT, FUNCTION, OUTPUT ordering, exactly like how the pipe operator worked for Enum. To recap, all we did was make the collectable param the last param and create a helper macro to improve the ordering of the expression.

Now that the readability side quest is complete, let’s get back to the main mission.

Remaining push container functions

Implementing other basic helper functions like reduce, filter, etc. is straightforward. Let’s bundle all the functions into a Contr module, similar to the Enum module for Enumerable.

defmodule Contr do
  @moduledoc """
  Combinators for `PushContainer`, in the spirit of `Enum` for enumerables.

  Every function takes the destination container as its LAST argument and
  returns a new `PushContainer` wrapping it, so calls can be chained with a
  "append as last argument" operator.
  """

  @doc """
  Wraps `pc` in a new container whose push step is `func`.

  `func` receives `(elem, inner)` where `inner` is the initialised wrapped
  container, and must return the (possibly updated) wrapped container.
  Initialising and finishing the result initialises and finishes `pc`.
  """
  def reduce(func, %PushContainer{} = pc) do
    PushContainer.build(
      fn -> PushContainer.init(pc) end,
      fn elem, inner -> func.(elem, inner) end,
      fn inner -> PushContainer.finish(inner) end
    )
  end

  @doc """
  Returns a container that pushes `func.(elem)` into `pc` for every element.
  """
  def map(func, %PushContainer{} = pc) do
    reduce(
      fn elem, inner -> PushContainer.push(inner, func.(elem)) end,
      pc
    )
  end

  @doc """
  Returns a container that pushes into `pc` only the elements for which
  `func.(elem)` is truthy; other elements are dropped.
  """
  def filter(func, %PushContainer{} = pc) do
    reduce(
      fn elem, inner ->
        if func.(elem) do
          PushContainer.push(inner, elem)
        else
          inner
        end
      end,
      pc
    )
  end

  @doc """
  Returns a container that routes each element to `true_pc` when
  `func.(elem)` is truthy and to `false_pc` otherwise.

  Both containers are initialised together, and finishing yields the tuple
  `{true_result, false_result}`.
  """
  def split(func, %PushContainer{} = true_pc, %PushContainer{} = false_pc) do
    PushContainer.build(
      fn ->
        {PushContainer.init(true_pc), PushContainer.init(false_pc)}
      end,
      fn elem, {t, f} ->
        if func.(elem) do
          {PushContainer.push(t, elem), f}
        else
          {t, PushContainer.push(f, elem)}
        end
      end,
      fn {t, f} ->
        {PushContainer.finish(t), PushContainer.finish(f)}
      end
    )
  end
end

Equipped with these, we can compose push containers in multiple ways similar to Enum.

import Example
import PushOp

# Encode every binary, then keep only the encoded strings made up solely of
# ASCII letters and "=" (anything containing a digit, "+" or "/" is dropped),
# and concatenate the survivors into one binary.
# NOTE(review): assumes a container-last random_binaries(count, container);
# the Example module shown earlier takes the container first.
_string =
  random_binaries(10)
  +++ Contr.map(fn bin -> Base.encode64(bin) end)
  +++ Contr.filter(fn str -> str =~ ~r/^[a-zA-Z=]+$/ end)
  +++ binary_pc

# => "olHQItQ=OWPYlAY="

The only thing pending now is to bridge PushContainer and the Collectable protocol for interoperability, so we can take advantage of modules which already implement the Collectable protocol.

defmodule PushContainer do
  @moduledoc """
  A push container described by three functions (`init`, `func`, `finish`),
  with a bridge from any `Collectable`.

  `finish` may have arity 1 (`acc -> result`) or arity 2
  (`acc, status -> result`), where `status` is `:done` or `:halt`.
  """

  defstruct [:acc, :init, :func, :finish]

  @type acc :: any
  @type status :: :done | :halt

  @type t :: %__MODULE__{
          acc: acc,
          init: (() -> acc) | :done | nil,
          func: (any, acc -> acc) | nil,
          finish: (acc -> any) | (acc, status -> any) | nil
        }

  @doc """
  Builds a container from its lifecycle functions. Nothing runs until
  `init/1` is called.
  """
  @spec build((() -> acc), (any, acc -> acc), (acc -> any) | (acc, status -> any)) :: t()
  def build(init, func, finish) do
    %__MODULE__{init: init, func: func, finish: finish}
  end

  @doc """
  Runs the `init` function and stores its result as the accumulator.
  """
  @spec init(t()) :: t()
  def init(%__MODULE__{init: init} = pc) do
    %__MODULE__{pc | acc: init.(), init: :done}
  end

  @doc """
  Pushes `value` by calling `func.(value, acc)` and storing the result.
  """
  @spec push(t(), any) :: t()
  def push(%__MODULE__{acc: acc, func: func} = pc, value) do
    %__MODULE__{pc | acc: func.(value, acc)}
  end

  @doc """
  Finalises the container and returns the result of its `finish` function.

  `status` defaults to `:done`, so one-argument callers keep working. It is
  only forwarded when the `finish` function has arity 2.
  """
  @spec finish(t(), status) :: any
  def finish(%__MODULE__{acc: acc, finish: finish}, status \\ :done) do
    if is_function(finish, 1) do
      finish.(acc)
    else
      finish.(acc, status)
    end
  end

  @doc """
  Wraps any `Collectable` as a push container.

  The accumulator is the `{acc, collector_fun}` pair returned by
  `Collectable.into/1`. It has to stay a pair across pushes, because the
  collector function is needed again for every push and for finishing.
  """
  @spec from_collectable(Collectable.t()) :: t()
  def from_collectable(collectable) do
    build(
      fn -> Collectable.into(collectable) end,
      # push/2 calls func.(value, acc): the element comes first, the state second.
      fn elem, {acc, collector_fun} ->
        {collector_fun.(acc, {:cont, elem}), collector_fun}
      end,
      fn {acc, collector_fun}, status -> collector_fun.(acc, status) end
    )
  end
end

defimpl Collectable, for: PushContainer do
  # Lets a PushContainer be the target of Enum.into/2, Stream.into/2 and
  # `for ... into:`. The initialised container itself is the accumulator.
  def into(%PushContainer{} = pc) do
    {PushContainer.init(pc), &collect/2}
  end

  # One clause per collector command.
  defp collect(container, {:cont, elem}), do: PushContainer.push(container, elem)
  defp collect(container, :halt), do: PushContainer.finish(container, :halt)
  defp collect(container, :done), do: PushContainer.finish(container, :done)
end
import Example
import PushOp, only: [{:+++, 2}]

# A MapSet (an existing Collectable) is wrapped as a push container, and
# Contr.map squares every number before it is pushed into it.
square_set_container =
  Contr.map(
    fn num -> num * num end,
    PushContainer.from_collectable(MapSet.new())
  )

# Enum.into/2 drives the container through its Collectable implementation.
square_set = Enum.into(1..10, square_set_container)
# => #MapSet<[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]>

As for the original sample problem about stream of logs:

import Example
import PushOp, only: [{:+++, 2}]

defmodule Demo do
  @moduledoc """
  Pushes a sample of generated log lines into an arbitrary sink.
  """

  @doc """
  Takes the first 10 lines of the generated log stream and collects them into
  `sink`, which can be any `Collectable`. Returns whatever the sink produces
  when it is done.
  """
  def collect_logs(sink) do
    log_stream()
    |> Stream.take(10)
    |> Enum.into(sink)
  end

  # A variable bound outside the module is not visible inside `def`, so the
  # log source is defined here as a function instead.
  # Infinite stream of "warn: <base64>" / "error: <base64>" lines.
  defp log_stream do
    Stream.repeatedly(fn ->
      random_str = Base.encode64(:crypto.strong_rand_bytes(5))

      if :rand.uniform_real() < 0.5 do
        "warn: #{random_str}"
      else
        "error: #{random_str}"
      end
    end)
  end
end

# Error lines: strip the "error: " prefix, upcase the rest, collect in a list.
# +++ groups to the right, so this builds
#   Contr.map(strip_prefix, Contr.map(upcase, list_pc))
# The pattern in the first function only matches lines starting with "error: ".
error_sink =
  Contr.map(fn "error: " <> line -> line end)
  +++ Contr.map(fn line -> String.upcase(line) end)
  +++ list_pc

# Warn lines: strip the "warn: " prefix, append a newline, and concatenate
# everything into one binary.
warn_sink =
  Contr.map(fn "warn: " <> line -> line end)
  +++ Contr.map(fn line -> line <> "\n" end)
  +++ binary_pc

# Lines starting with "warn: " go to warn_sink, all others to error_sink.
# Contr.split finishes both containers and returns their results as a
# {warn_result, error_result} tuple.
Demo.collect_logs(
  Contr.split(
    &String.starts_with?(&1, "warn: "),
    warn_sink,
    error_sink
  )
)

To read more about streams in Elixir: