We're planting a tree for every job application! Click here to learn more

Architecting Flow in Elixir Programs: An Introduction + Advanced Techniques

Ovidiu Deac

18 Apr 2018

11 min read

Architecting Flow in Elixir Programs: An Introduction + Advanced Techniques
  • Erlang

Getting the “flow” of a program is one of the first things I do when looking at open source software or joining a new project at work.

The easier it is to grasp how data flows through a program, the easier it is for me as a developer to estimate the impact of changes (and the business of software is all about changes; while “writing hot new cool stuff” is most certainly fun, most professional work I’ve done falls into the “maintaining once-hot-new-cool-stuff and adapt it to changed requirements” category).

Now, writing CLIs (command line interfaces) is a personal pet peeve of mine. These are little, mostly straight-forward programs, sometimes fulfilling a single purpose (like cat or touch) and sometimes being the fascade for a host of features (likegit, docker or mix).

In any case, CLIs are a perfect example to demonstrate what I would consider good “flow” (and, naturally, the ideas presented in this post are just as applicable to embedded software, web applications or any other program).

Let’s see some code Imagine a CLI for converting images. The flow might look something like this:

process.svg

Implemented as a Mix task, it could look something like this:

defmodule Mix.Tasks.ConvertImages do
  use Mix.Task

  def run(argv) do
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    glob = List.first(args) || "./image_uploads/*"
    filenames = Path.wildcard(glob)

    target_dir = opts[:target_dir] || "./tmp"
    File.mkdir_p!(target_dir)

    format = opts[:format] || "jpg"
    # TODO: ensure valid format

    results =
      Enum.map(filenames, fn filename ->
        Converter.convert_image(filename, target_dir, format)
      end)

    IO.puts("Wrote #{Enum.count(results)} images to #{target_dir}.")
  end
end

# NOTE: we will omit the definition of `Converter.convert_image/3` for now and
#       assume it works as one would expect (take a filename, convert its
#       contents to the given format and write the result to the given target
#       directory).

I have written this kind of program a dozen times before. There’s nothing really wrong with it, except that writing these kinds of throw-away scripts is much more fun than inheriting them. So let’s do our successor (or our future-self) a favor …

We could group related work inside the function if we want to illustrate the flow of our program:

defmodule Mix.Tasks.ConvertImages2 do
  use Mix.Task

  def run(argv) do
    # 1 - parse options
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    glob = List.first(args) || "./image_uploads/*"
    target_dir = opts[:target_dir] || "./tmp"
    format = opts[:format] || "jpg"

    # 2 - validate options
    filenames = Path.wildcard(glob)

    if Enum.empty?(filenames) do
      raise "No images found."
    end

    unless Enum.member?(~w[jpg png], format) do
      raise "Unrecognized format: #{format}"
    end

    # 3 - prepare conversion
    File.mkdir_p!(target_dir)

    # 4 - convert images and write them to target directory
    results =
      Enum.map(filenames, fn filename ->
        Converter.convert_image(filename, target_dir, format)
      end)

    # 5 - report results to STDOUT
    IO.puts("Wrote #{Enum.count(results)} images to #{target_dir}.")
  end
end

But this might just be a case of “commenting not-so-ideal code”, so let’s put these sections into separate functions:

defmodule Mix.Tasks.ConvertImages3 do
  use Mix.Task

  @default_glob "./image_uploads/*"
  @default_target_dir "./tmp"
  @default_format "jpg"

  def run(argv) do
    {glob, target_dir, format} = parse_options(argv)

    validate_options(glob, format)

    filenames = prepare_conversion(glob, target_dir)
    results = convert_images(filenames, target_dir, format)

    report_results(results, target_dir)
  end

  defp parse_options(argv) do
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    glob = List.first(args) || @default_glob
    target_dir = opts[:target_dir] || @default_target_dir
    format = opts[:format] || @default_format

    {glob, target_dir, format}
  end

  defp validate_options(glob, format) do
    filenames = Path.wildcard(glob)

    if Enum.empty?(filenames) do
      raise "No images found."
    end

    unless Enum.member?(~w[jpg png], format) do
      raise "Unrecognized format: #{format}"
    end
  end

  defp prepare_conversion(glob, target_dir) do
    File.mkdir_p!(target_dir)

    Path.wildcard(glob)
  end

  defp convert_images(filenames, target_dir, format) do
    Enum.map(filenames, fn filename ->
      Converter.convert_image(filename, target_dir, format)
    end)
  end

  defp report_results(results, target_dir) do
    IO.puts("Wrote #{Enum.count(results)} images to #{target_dir}.")
  end
end

It’s getting easier to see what is happening and what phases the program walks through to reach its goal.

If we revisit our diagram from the top, we start to see that we added the activity of validating our inputs:

process_revisited.svg

See below how we can adapt this diagram into code using Elixir’s pipe operator (|>).

defmodule Mix.Tasks.ConvertImages4 do
  use Mix.Task

  @default_glob "./image_uploads/*"
  @default_target_dir "./tmp"
  @default_format "jpg"

  # NOTE: we could also have refactored this using `with`, but
  #       it doesn't really matter for the point I'm trying to make ^_^

  def run(argv) do
    argv
    |> parse_options()
    |> validate_options()
    |> prepare_conversion()
    |> convert_images()
    |> report_results()
  end

  defp parse_options(argv) do
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    glob = List.first(args) || @default_glob
    target_dir = opts[:target_dir] || @default_target_dir
    format = opts[:format] || @default_format

    {glob, target_dir, format}
  end

  defp validate_options({glob, target_dir, format}) do
    filenames = Path.wildcard(glob)

    if Enum.empty?(filenames) do
      raise "No images found."
    end

    unless Enum.member?(~w[jpg png], format) do
      raise "Unrecognized format: #{format}"
    end

    {glob, target_dir, format}
  end

  defp prepare_conversion({glob, target_dir, format}) do
    File.mkdir_p!(target_dir)

    filenames = Path.wildcard(glob)

    {filenames, target_dir, format}
  end

  defp convert_images({filenames, target_dir, format}) do
    results =
      Enum.map(filenames, fn filename ->
        Converter.convert_image(filename, target_dir, format)
      end)

    {results, target_dir}
  end

  defp report_results({results, target_dir}) do
    IO.puts("Wrote #{Enum.count(results)} images to #{target_dir}.")
  end
end

Use Your Imagination

The example is meant to be easily accessible and relatable.

Please imagine the presented solution for a complex app or just a non-trivial use-case for the program above, like converting the images to match certain dimensions based on their filenames, reporting errors, adding a verbose flag to give more information to the user during runtime, optionally writing the EXIF information of the original images to an external datastore and/or serving the collected image metadata through the same CLI while lazily converting new images any time the program is run.

Now we’re talking.

Requirements change, software needs to be maintained

One thing that immediately stands out in the examples above: the last version is the most verbose one (the first version was 24 lines, the last one clocks in at 62 lines).

If we assume a more complex example, this difference in lines will become less significant. In these cases, we will reap the benefits of having a more approachable codebase, cleaner stacktraces and an easier time to delete old and add new code.

The last point is paramount because requirements for software change all the time. So we want to make our programs as adaptable to change as possible.

A clear flow can enable just that. 👍

Advanced Techniques for Architecting Flow in Elixir

In the last post we explored how we can use either |> or with to model how data flows through our program.

There is, however, a third concept to model flow in your applications: to hand down a “token” during the execution of your program. This token contains all the information necessary for your program to fulfil its use-case.

In Elixir, this token is usually a struct. Let’s look at two examples.

Plug.Conn

The most famous example for this in the Elixir ecosystem can be found in Plug.

A “plug” is basically a function that takes a Plug.Conn struct as a first argument and returns a (modified) Plug.Conn struct. Each web request is processed by a Plug pipeline, a series of plugs that get invoked one after another. The Plug.Conn struct contains all information received in the web request and all information to be sent in the server’s response.

defmodule MyPlugPipeline do
  use Plug.Builder

  # You can plug modules, which implement the Plug behaviour
  plug Plug.Logger

  # You can plug local functions, which implement the Plug behaviour
  plug :hello, my_param: 42

  def hello(conn, opts) do
    if opts[:my_param] == 42 do
      send_resp(conn, 200, "The answer to all questions!")
    else
      send_resp(conn, 200, "Options are optional!")
    end
  end

  # You can even plug functions from other modules,
  # as long as they are imported into the current module
  import SomeOtherModule, only: [my_other_plug: 2]

  plug :my_other_plug
end

In each Plug, we can modify the Plug.Conn struct, e.g. set the reponse’s content, add additional response HTTP headers or halt the connection, which causes all the remaining plugs in the pipeline to be skipped.

def hello(conn, opts) do
  case prepare_response() do
    {timing_in_ms, body} ->
      conn
      |> put_resp_content_type("text/plain")
      |> put_resp_header("Server-Timing", "total;dur=#{timing_in_ms}")
      |> send_resp(200, body)

    _ ->
      halt(conn)
  end
end

Another example for a “token”, which is handed down in a business process, are changesets in Ecto.

Ecto.Changeset

Ecto.Changesets are structs used to apply filters, validations and other constraints during the manipulation of structs.

import Ecto.Changeset

user = %User{}

user
|> cast(params, [:name, :email, :age])
|> validate_required([:name, :email])
|> validate_format(:email, ~r/@/)
|> validate_inclusion(:age, 18..100)
|> unique_constraint(:email)

Just like Plug.Conn before, the Ecto.Changeset struct in this example flows through a pipeline of transformations and provides a binding interface for all functions involved in its use-case, i.e. filtering, casting, validating and constraining the manipulation of data.

Contrary to Plug.Conn, the scope of a changeset is not necessarily tied to any kind of request life cycle.

Let’s build our own!

Let’s use these insights to adapt this concept to an application of our own. We will stick with our example of converting images with a Mix task:

process_revisited_options.svg

First, we introduce a struct to help us with handling given command-line arguments (green tasks in the image above).

We will call this struct Options:

defmodule Converter.Options do
  defstruct argv: nil,
            glob: nil,
            target_dir: nil,
            format: nil
end

We will use this to convert the given command-line arguments into a structured form and validate them. Later, we will prepare the conversion process using Options.

defmodule Mix.Tasks.ConvertImages do
  use Mix.Task

  alias Converter.Options

  @default_glob "./image_uploads/*"
  @default_target_dir "./tmp"
  @default_format "jpg"

  def run(argv) do
    validation =
      %Options{argv: argv}
      |> parse_options()
      |> validate_options()

    case validation do
      {:ok, options} ->
        filenames = prepare_conversion(options)

        results = convert_images(filenames, options.target_dir, options.format)

        report_results(options.target_dir, results)

      {:error, error} ->
        report_error(error)
    end
  end

  # Each stage of the conversion process takes the `Options` as argument ...
  defp parse_options(%Options{argv: argv} = options) do
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    glob = List.first(args) || @default_glob
    target_dir = opts[:target_dir] || @default_target_dir
    format = opts[:format] || @default_format

    %Options{options | glob: glob, target_dir: target_dir, format: format}
  end

  # ... we pattern match on the fields that are relevant to the current step!
  defp validate_options(%Options{glob: glob, format: format} = options) do
    filenames = Path.wildcard(glob)

    cond do
      Enum.empty?(filenames) ->
        {:error, "No images found."}

      !Enum.member?(~w[jpg png], format) ->
        {:error, "Unrecognized format: #{format}"}

      true ->
        {:ok, options}
    end
  end

  defp prepare_conversion(%Options{glob: glob, target_dir: target_dir}) do
    File.mkdir_p!(target_dir)

    Path.wildcard(glob)
  end

  defp convert_images(filenames, target_dir, format) do
    results =
      Enum.map(filenames, fn filename ->
        Converter.convert_image(filename, target_dir, format)
      end)

    results
  end

  defp report_results(target_dir, results) do
    IO.puts("Wrote #{Enum.count(results)} images to #{target_dir}.")
  end

  defp report_error(error) do
    IO.puts("[error] #{error}")
  end
end

Our new Options struct takes a role similar to the one Ecto.Changeset plays: It helps us to fulfil a specific task (parsing and validating options for the conversion process).

To achieve this, our run/1 function had to be restructured:

validation =
  %Options{argv: argv}
  |> parse_options()
  |> validate_options()

case validation do
  {:ok, options} ->
    filenames = prepare_conversion(options)

    results = convert_images(filenames, options.target_dir, options.format)

    report_results(options.target_dir, results)

  {:error, error} ->
    report_error(error)
end

While that does not look overly complex, the same function from our first article looked like this:

argv
|> parse_options()
|> validate_options()
|> prepare_conversion()
|> convert_images()
|> report_results()

Let’s try to gain back some of that clarity …

One Token to Rule Them All!

We can gain back clarity by using a single token for the fulfilment of our use-case from start to finish. This is what Plug does with Plug.Conn: each request and its response are represented as a single token, which accompanies the whole business process of answering a web request, from getting the original request all the way to sending out the response.

What would this look like for our example?

We are getting a “request” to convert images in a given directory to a given format and answer this “request” by returning the converted images and printing their names on the terminal (or presenting an error message if the “request” was malformed).

We will call our new struct Token and let it flow through our program (green tasks):

process_revisited_token.svg

defmodule Converter.Token do
  defstruct argv: nil,
            glob: nil,
            target_dir: nil,
            format: nil,
            errors: nil,
            halted: nil,
            results: nil
end

Now we can pass a Token in at the “top of the pipe” in our run/1 function.

defmodule Mix.Tasks.ConvertImages do
  use Mix.Task

  alias Converter.Token

  @default_glob "./image_uploads/*"
  @default_target_dir "./tmp"
  @default_format "jpg"

  def run(argv) do
    %Token{argv: argv}
    |> parse_options()
    |> validate_options()
    |> prepare_conversion()
    |> convert_images()
    |> report_results()
  end

  # Each stage of the conversion process takes the `Token` as argument ...
  defp parse_options(%Token{argv: argv} = token) do
    {opts, args, _invalid} =
      OptionParser.parse(argv, switches: [target_dir: :string, format: :string])

    glob = List.first(args) || @default_glob
    target_dir = opts[:target_dir] || @default_target_dir
    format = opts[:format] || @default_format

    %Token{token | glob: glob, target_dir: target_dir, format: format}
  end

  # ... we pattern match on the fields that are relevant to the current step  ...
  defp validate_options(%Token{filenames: filenames, format: format} = token) do
    errors = [
      if(Enum.empty?(filenames), do: "No images found."),
      if(!Enum.member?(~w[jpg png], format), do: "Unrecognized format: #{format}")
    ]

    %Token{token | errors: errors, halted: Enum.any?(errors)}
  end

  # ... we skip steps by matching on `halted: true`  ...
  defp prepare_conversion(%Token{halted: true} = token), do: token

  # ... we put in new information gathered at the current stage  ...
  defp prepare_conversion(%Token{target_dir: target_dir} = token) do
    File.mkdir_p!(target_dir)
    filenames = Path.wildcard(glob)

    %Token{token | filenames: filenames}
  end

  # ... we can skip steps by matching on `halted: true` ...
  defp convert_images(%Token{halted: true} = token), do: token

  # ... also, we don't have to pattern match on the `Token` necessarily ...
  defp convert_images(token) do
    results =
      Enum.map(token.filenames, fn filename ->
        Converter.convert_image(filename, token.target_dir, token.format)
      end)

    %Token{token | results: results}
  end

  # ... and at the end we can report errors by matching on `halted: true`  ...
  defp report_results(%Token{halted: true, errors: errors} = token) do
    Enum.each(errors, fn error ->
      IO.puts("- #{error}")
    end)

    token
  end

  # ... or report the results of the success program execution.
  defp report_results(token) do
    IO.puts("Wrote #{Enum.count(token.results)} images to #{token.target_dir}.")

    token
  end
end

Our new Token interacts very much like Plug.Conn does: It is handed down from function to function during the execution of our business process.

Let’s summarize the properties of this approach:

  1. Each step of the program’s execution is a function, which takes the Token struct as a first argument and we pattern match on the fields that are relevant to the current step.
  2. We can easily skip steps by matching on halted: true.
  3. We can put in new information gathered at the current stage by modifying the Token before returning it. At the end, we can report errors by matching on halted: trueor report the results of the successful program execution.

Comparison of the three approaches

Let’s look at the properties of each approach once again:

# Approach 1: using `|>`
def run(argv) do
  argv
  |> parse_options()
  |> validate_options()
  |> prepare_conversion()
  |> convert_images()
  |> report_results()
end

The |> approach forces the code to adopt Elixir’s idiomatic style of putting the to-be-transformed data as the first argument in any function. This provides a kind of natural “interface” or “contract”.

# Approach 2: using `with`
def run(argv) do
  with {glob, target_dir, format} <- parse_options(argv),
        :ok <- validate_options(glob, format),
        filenames <- prepare_conversion(glob, target_dir),
        results <- convert_images(filenames, target_dir, format) do
    report_results(results, target_dir)
  end
end

The with approach shines when collaborating in a fast-moving environment: you might not want to be that dependent on another programmer’s return values early on and with provides more flexibility in dealing with the called function’s result.

# Approach 3: using a `Token`
def run(argv) do
  %Token{argv: argv}
  |> parse_options()
  |> validate_options()
  |> prepare_conversion()
  |> convert_images()
  |> report_results()
end

Finally, the Token approach can combine the benefits of both worlds, if applied in the right place: If you have a well defined, narrow use-case, where you want to provide a unified interface and data structure (like Ecto.Changeset), you might find this approach benefical. It is also a great idea for the most top-level flow of your program, where your use-case is the very purpose of your program and you might want to establish an explicit data contract between different interacting parts of your app (like Plug.Conn does for web apps).

In these situations, we can can combine the benefits of the first two approaches, since it provides a binding contract between all parts of your app, but also allows teams to work independently.

We can also do “flowy things” like skipping steps by matching on the Token:

# skip this step since execution is halted
defp prepare_conversion(%Token{halted: true} = token) do
  token
end

# fulfil this step since execution is not halted
defp prepare_conversion(%Token{target_dir: target_dir} = token) do
  File.mkdir_p!(target_dir)

  token
end

We will take a more detailed look at these “flowy things” as well as the Pros and Cons of the Token approach in future articles.

Did you like this article?

Ovidiu Deac

See other articles by Ovidiu

Related jobs

See all

Title

The company

  • Remote

Title

The company

  • Remote

Title

The company

  • Remote

Title

The company

  • Remote

Related articles

JavaScript Functional Style Made Simple

JavaScript Functional Style Made Simple

Daniel Boros

12 Sep 2021

JavaScript Functional Style Made Simple

JavaScript Functional Style Made Simple

Daniel Boros

12 Sep 2021

WorksHub

CareersCompaniesSitemapFunctional WorksBlockchain WorksJavaScript WorksAI WorksGolang WorksJava WorksPython WorksRemote Works
hello@works-hub.com

Ground Floor, Verse Building, 18 Brunswick Place, London, N1 6DZ

108 E 16th Street, New York, NY 10003

Subscribe to our newsletter

Join over 111,000 others and get access to exclusive content, job opportunities and more!

© 2025 WorksHub

Privacy PolicyDeveloped by WorksHub