Testing multi-node pubsub Phoenix apps with ExUnit

Published April 16, 2019 by Toran Billups

In February I attended the "Building resilient, distributed systems with Elixir" training from Ben Marx and Chris Keathley at Lonestar Elixir. I really enjoyed the intensive workshop they put together, and because I was working in Phoenix at the time, I decided to learn what would be required to test drive a pubsub example with pg2. In the weeks that followed I found a handful of gotchas and learned a few pro-tips worth sharing with the wider community.

TL;DR: If you prefer to skip the story and see the code, you can find everything on GitHub.

Disclaimer

It was important that I present a simple distributed state problem so I could focus my efforts on testing the cluster. As a result, the production code I'm using to put the problem front and center was intended for learning only and should not be used as-is in application code.

The problem

When the application boots up we fetch all users from the database and store them locally in ETS. Later this cache is used to verify the user's session as part of the authentication pipeline.

    def authentication(conn, _opts) do
      case get_session(conn, :user_id) do
        nil ->
          conn
        id ->
          username = Example.Logon.get(:logon, "#{id}")
          assign(conn, :current_user, %{id: id, username: username})
      end
    end
  

The final plug in our authentication pipeline will redirect the user back to login if the session is without a current user.

    def redirect_unauthorized(conn, _opts) do
      current_user = Map.get(conn.assigns, :current_user)
      if current_user != nil and current_user.username != nil do
        conn
      else
        conn
          |> put_session(:return_to, conn.request_path)
          |> redirect(to: Routes.login_path(conn, :index))
          |> halt()
      end
    end
  

In a single-node deployment this works because the registration process inserts any new user record into ETS after Ecto completes the database insert.

    def handle_call({:put, username, password}, _from, state) do
      # `id` and `hash` are derived from `username` and `password`; that code is omitted here
      changeset = User.changeset(%User{}, %{id: id, username: username})
      case Users.insert(changeset) do
        {:ok, _result} ->
          UserCache.insert(id, username, hash)
          {:reply, {:ok, {id, username}}, state}
        {:error, changeset} ->
          ...
      end
    end
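
The `UserCache` module itself isn't shown in this post. As a rough idea of what an ETS-backed cache with that shape could look like, here is a minimal sketch. The module name `UserCacheSketch`, the table name, and the `find/1` function are assumptions made for illustration; only the `insert/3` and `all/0` shapes come from the calls used elsewhere in this post.

```elixir
defmodule UserCacheSketch do
  # Hypothetical stand-in for the UserCache module used in this post.
  @table :user_cache_sketch

  # Create the named table. The calling process owns it, so in a real
  # application this would run inside a long-lived process.
  def init do
    :ets.new(@table, [:named_table, :set, :public, read_concurrency: true])
    :ok
  end

  # Store one user, keyed by id.
  def insert(id, username, hash) do
    true = :ets.insert(@table, {id, {username, hash}})
    :ok
  end

  # Return {username, hash} for a known id, or nil.
  def find(id) do
    case :ets.lookup(@table, id) do
      [{^id, {username, hash}}] -> {username, hash}
      [] -> nil
    end
  end

  # Return every row as {id, {username, hash}}.
  def all, do: :ets.tab2list(@table)
end
```

Because the table lives in the memory of one node, nothing written here is visible to any other node, which is exactly the limitation the rest of this post works around.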
  

The problem arises when we decide to cluster the application because after the user creates a new account, only the node that served our create request will have the proper state to validate subsequent requests. As we progress I will show how we can solve this by distributing state across the cluster with pg2 so hang tight!

Local Cluster

The first tool I pulled in for this test case was the Elixir library local cluster. You can use local cluster to spawn nodes for distributed state testing without all the moving parts of a full-on deployment (e.g. Docker or Kubernetes).

To start using local cluster you open the test helper and add the line `LocalCluster.start()` before `ExUnit.start()` at the top of this file.

    :ok = LocalCluster.start()
    ExUnit.start()
    Ecto.Adapters.SQL.Sandbox.mode(Example.Repo, :manual)
  

It was at this point that I took my first stumble. In both the training at Lonestar and my personal pursuit weeks later, I found that from a cold boot the command `mix test` would throw an error stating `no distribution` until I ran `iex -S mix phx.server` or something equivalent one time.

    (MatchError) no match of right hand side value: {:error, }, {:child, :undefined, :net_sup_dynamic, {:erl_distribution, :start_link, [[:"[email protected]"], false]}, :permanent, 1000, :supervisor, [:erl_distribution]}}}
    test/test_helper.exs:1: (file)
    (elixir) lib/code.ex:767: Code.require_file/2
    (elixir) lib/enum.ex:769: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir) lib/enum.ex:769: Enum.each/2
  

I never found a solution or a more practical workaround, but after I post this I plan to follow up with the open source maintainer to learn why this occurs and how we can fix it to unblock others who might follow.

Update: I got word from Chris shortly after posting this blog that running `epmd -daemon` from a cold boot will unlock distribution without the need for iex as mentioned above.
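
If you would rather not remember to run that command by hand, one option is to start epmd from the test helper before `LocalCluster.start()`. This is a minimal sketch, not something taken from the training or from local cluster itself; the module name `EpmdSketch` is hypothetical and it assumes `epmd` is on the PATH.

```elixir
defmodule EpmdSketch do
  # Hypothetical helper: make sure the Erlang port mapper daemon is running
  # so that distribution can start from a cold boot.
  def ensure_started do
    case System.find_executable("epmd") do
      nil ->
        {:error, :epmd_not_found}

      path ->
        # `epmd -daemon` detaches and returns right away.
        {_output, status} = System.cmd(path, ["-daemon"])
        if status == 0, do: :ok, else: {:error, {:exit_status, status}}
    end
  end
end
```

In `test/test_helper.exs` you would call `EpmdSketch.ensure_started()` on the line before `LocalCluster.start()`.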

Configure ports

With local cluster functional I started to write the first test case that I hoped would offer fast feedback while I solved the distributed state issue. I borrowed heavily from an existing controller style test in the Phoenix application so I knew it should work without much hassle.

    defmodule ExampleWeb.ClusterTest do
      use ExampleWeb.ConnCase, async: false
    
      test "login will authenticate the user regardless of node", %{conn: conn} do
        name = "toran"
        password = "abcd1234"
        login = %{username: name, password: password}
    
        LocalCluster.start_nodes("example", 2)
    
        created = post(conn, Routes.registration_path(conn, :create, login))
        assert Map.get(created.assigns, :current_user) == nil
    
        denied = get(created, Routes.budget_path(conn, :index))
        assert redirected_to(denied, 302) =~ "/"
        assert Map.get(denied.assigns, :current_user) == nil
    
        authenticated = post(denied, Routes.login_path(conn, :create, login))
        assert html_response(authenticated, 302) =~ "redirected"
        assert Map.get(authenticated.assigns, :current_user) == nil
    
        authorized = get(authenticated, Routes.budget_path(conn, :index))
        assert String.match?(html_response(authorized, 200), ~r/.*main.js.*/)
      end
    end
  

But just as I started writing out this test I realized that the built-in ConnCase helpers like `post` and `get` didn't specify a particular host or port. I then looked back at the source code from the training and learned that in those multi-node test cases a specific environment variable was set for each node local cluster generated. But knowing how to dynamically set the port for a Phoenix application was a mystery I'd need to solve for myself because the example in that workshop wasn't Phoenix based.

So next I opened the config file for test to confirm the port was indeed hard coded to 4002. Then I did some searching around the internet in hopes that someone had blogged about a similar trial, but sadly the result set was limited. I did however find a Stack Overflow post that got me looking closer at the endpoint file, and eventually I landed on a clever extension point.

    defmodule ExampleWeb.Endpoint do
      use Phoenix.Endpoint, otp_app: :example
    
      defp port do
        name = Node.self()
        env =
          name
          |> Atom.to_string()
          |> String.replace(~r/@.*$/, "")
          |> String.upcase()
    
        String.to_integer(System.get_env("#{env}_PORT") || "4000")
      end
    
      def init(_key, config) do
        {:ok, Keyword.put(config, :http, [:inet6, port: port()])}
      end
    end
  

This init function gave me the hook I needed to get the name of the running node and derive from it the environment variable that holds the port. To verify this manually I was able to spin up Phoenix from the command line with a specific name and port.

    EXAMPLE1_PORT=4001 elixir --sname "example1" -S mix phx.server
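
To make the name-to-variable mapping easier to see in isolation, here is the same idea pulled out into a small standalone module. It is only a sketch: `PortSketch`, `env_key/1` and `port_for/2` are hypothetical names, and the node name `:"example1@127.0.0.1"` in the usage note is an assumed example of what a spawned node might be called.

```elixir
defmodule PortSketch do
  # Turn a node name into the environment variable that holds its port:
  # everything after "@" is dropped and the rest is upcased.
  def env_key(node) when is_atom(node) do
    prefix =
      node
      |> Atom.to_string()
      |> String.replace(~r/@.*$/, "")
      |> String.upcase()

    prefix <> "_PORT"
  end

  # Read the port for a node, falling back to a default when unset.
  def port_for(node, default \\ 4000) do
    case System.get_env(env_key(node)) do
      nil -> default
      value -> String.to_integer(value)
    end
  end
end
```

With `EXAMPLE1_PORT=4001` set, `PortSketch.port_for(:"example1@127.0.0.1")` reads that variable, while a node with no matching variable falls back to 4000.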
  

I made my way back to the test so I could properly configure the environment variable for the port and give local cluster a name that would match.

    defmodule ExampleWeb.ClusterTest do
      use ExampleWeb.ConnCase, async: false
    
      @node1 "http://localhost:4001"
      @node2 "http://localhost:4002"
    
      setup do
        System.put_env("EXAMPLE1_PORT", "4001")
        System.put_env("EXAMPLE2_PORT", "4002")
    
        {:ok, conn: Phoenix.ConnTest.build_conn()}
      end
    
      test "login will authenticate the user regardless of node", %{conn: conn} do
        LocalCluster.start_nodes("example", 2)
        ...
      end
    end
  

Phoenix Server

I now had 2 nodes spun up and ports configured, but what about the `post` and `get` helpers? I spent a good deal of time trying to crack open the connection `conn` and alter the host to match as I constructed each request. But as I did this I noticed that my logs printed the name of the node running the tests instead of the example1 node, which told me I wasn't yet emulating the HTTP request as nginx would in my Docker environment.

It was at this point I decided to make the switch from Phoenix helpers to using HTTPoison. The basic helpers aren't much to write home about but I figured I would show the basics for anyone interested. One tradeoff worth mentioning is that I failed to monkey patch the CSRF token verification inside of Phoenix so I had to extract this from the form html by hand.

    def post(response, url, params) do
      %HTTPoison.Response{body: body, headers: headers} = response
      # pull the CSRF token out of the form rendered by the previous response
      [csrf_token] = Floki.find(body, "input[name=_csrf_token]") |> Floki.attribute("value")
      payload = params |> Map.put(:_csrf_token, csrf_token)
      # carry the session cookie forward so this request stays in the same session
      cookie = set_cookie(headers)
    
      headers = [
        {"Content-Type", "application/x-www-form-urlencoded"}
      ]
    
      HTTPoison.post!(url, URI.encode_query(payload), headers, hackney: [cookie: [cookie]])
    end
    
    def get(response, url) do
      %HTTPoison.Response{headers: headers} = response
      cookie = set_cookie(headers)
    
      HTTPoison.get!(url, %{}, hackney: [cookie: [cookie]])
    end
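
The `set_cookie` helper used above isn't shown. One plausible shape for it, as a sketch only, is to collect the `set-cookie` response headers and keep just the `name=value` part of each. The module name `CookieSketch` and the exact return format are assumptions; the real helper may differ.

```elixir
defmodule CookieSketch do
  # Hypothetical version of the set_cookie/1 helper: build a cookie string
  # for the next request from the previous response's headers.
  def set_cookie(headers) do
    headers
    # header names are case-insensitive, so normalise before comparing
    |> Enum.filter(fn {name, _value} -> String.downcase(name) == "set-cookie" end)
    # drop attributes such as "path=/" and "HttpOnly", keep "name=value"
    |> Enum.map(fn {_name, value} -> value |> String.split(";", parts: 2) |> hd() end)
    |> Enum.join("; ")
  end
end
```

For a fresh `%HTTPoison.Response{}` the header list is empty, so this version returns an empty string on the very first request.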
  

After I altered my test case to use the above helpers I got an error running `mix test`

    ** (HTTPoison.Error) :econnrefused
  

It turns out Phoenix doesn't start up a real server when you run `mix test`, but after some hunting around for options I discovered you can opt in when you've got a test case that requires it.

    def launch_server do
      endpoint_config =
        Application.get_env(:example, ExampleWeb.Endpoint)
        |> Keyword.put(:server, true)
      :ok = Application.put_env(:example, ExampleWeb.Endpoint, endpoint_config)
    
      :ok = Application.stop(:example)
      :ok = Application.start(:example)
    end
  

Introducing PG2

With a running server, dynamic port configuration and a few http helper functions we finally had everything required to test drive that distributed state problem.

    defmodule ExampleWeb.ClusterTest do
      use Example.DataCase, async: false
    
      @node1 "http://localhost:4001"
      @node2 "http://localhost:4002"
      @password "abcd1234"
    
      setup do
        System.put_env("EXAMPLE1_PORT", "4001")
        System.put_env("EXAMPLE2_PORT", "4002")
    
        launch_server()
    
        {:ok, response: %HTTPoison.Response{}, login: %{username: random_string(), password: @password}}
      end
    
      test "login will authenticate the user regardless of node", %{response: response, login: login} do
        LocalCluster.start_nodes("example", 2)
    
        # create the user on the example1 node
        response = get(response, "#{@node1}/signup")
        assert response.status_code == 200
        response = post(response, "#{@node1}/signup", login)
        assert response.status_code == 302
    
        # authenticate w/ the example1 node
        response = get(response, "#{@node1}/")
        assert response.status_code == 200
        response = post(response, "#{@node1}/", login)
        assert response.status_code == 302
        response = get(response, "#{@node1}/budget")
        assert response.status_code == 200
    
        # authenticate w/ the example2 node
        eventually(fn ->
          response = get(%HTTPoison.Response{}, "#{@node2}/budget")
          assert response.status_code == 302
          response = get(response, "#{@node2}/")
          assert response.status_code == 200
          response = post(response, "#{@node2}/", login)
          assert response.status_code == 302
          response = get(response, "#{@node2}/budget")
          assert response.status_code == 200
        end)
      end
    end
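
The `eventually` helper wraps the assertions that depend on the cast having reached the second node. Its source isn't shown here, so the following is a minimal sketch of how such a helper could be written: run the function, and if it raises (for example a failed `assert`), sleep and try again a limited number of times. The module name, the retry count and the delay are assumptions.

```elixir
defmodule EventuallySketch do
  # Hypothetical retry helper: re-run `fun` until it stops raising or the
  # retries are used up, then re-raise the last error.
  def eventually(fun, retries \\ 20, delay_ms \\ 100) do
    fun.()
  rescue
    error ->
      if retries > 0 do
        Process.sleep(delay_ms)
        eventually(fun, retries - 1, delay_ms)
      else
        reraise error, __STACKTRACE__
      end
  end
end
```

The point of the design is that the test does not need to know how long replication takes; it only puts an upper bound on how long it is willing to wait.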
  

Before we can connect up the nodes we need to create the process group. I typically do this step in the application.ex file myself as I only want this to happen once as the software boots up.

    defmodule Example.Application do
      use Application
    
      def start(_type, _args) do
        children = [
        ]
    
        :pg2.create(:example)
    
        opts = [strategy: :one_for_one, name: Example.Supervisor]
        Supervisor.start_link(children, opts)
      end
    end
  

Next we need to join that process group from the logon GenServer we use to insert user data.

    defmodule Example.Logon do
      use GenServer
    
      @impl GenServer
      def init(:ok) do
        :pg2.join(:example, self())
    
        ... 
      end
    end
  

Publishing a message to the other nodes during user creation is a two-step process. First we pull the insert from the block of code we have today and replace it with a `GenServer.cast` for each member in the process group.

    @impl GenServer
    def handle_call({:put, username, password}, _from, state) do
      changeset = User.changeset(%User{}, %{id: id, username: username})
      case Users.insert(changeset) do
        {:ok, _result} ->
    
          members = :pg2.get_members(:example)
          Enum.each(members, fn pid ->
            GenServer.cast(pid, {:merge, id, username, hash})
          end)
    
          {:reply, {:ok, {id, username}}, state}
        {:error, changeset} ->
          ...
      end
    end
  

Next we need to implement the handler for that cast to insert the user into the cache on each node. After this callback is functional the test should pass because each node will eventually be consistent.

    @impl GenServer
    def handle_cast({:merge, id, username, hash}, state) do
      UserCache.insert(id, username, hash)
      {:noreply, state}
    end
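
One caveat for anyone following along on a newer runtime: `:pg2` was deprecated in OTP 23 and removed in OTP 24, with `:pg` as its replacement. The sketch below shows the same join, broadcast and merge flow using `:pg` instead, with a plain map standing in for the ETS cache so it runs on a single node. It is not the code from this post; `PgBroadcastSketch` and its `put/3` and `get/2` functions are hypothetical, and it assumes OTP 23 or later.

```elixir
defmodule PgBroadcastSketch do
  use GenServer

  # Hypothetical group name, mirroring the :example group used with pg2.
  @group :example

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def put(server, id, username), do: GenServer.call(server, {:put, id, username})
  def get(server, id), do: GenServer.call(server, {:get, id})

  @impl GenServer
  def init(:ok) do
    # With :pg there is no create step; joining is enough.
    :ok = :pg.join(@group, self())
    {:ok, %{}}
  end

  @impl GenServer
  def handle_call({:put, id, username}, _from, state) do
    # Publish to every other member of the group, then update local state.
    for pid <- :pg.get_members(@group), pid != self() do
      GenServer.cast(pid, {:merge, id, username})
    end

    {:reply, :ok, Map.put(state, id, username)}
  end

  def handle_call({:get, id}, _from, state), do: {:reply, Map.get(state, id), state}

  @impl GenServer
  def handle_cast({:merge, id, username}, state) do
    {:noreply, Map.put(state, id, username)}
  end
end
```

The default `:pg` scope has to be running first, for example by calling `:pg.start_link()` once or by adding it to the supervision tree. After that, a `put` on one process becomes visible through `get` on any other member of the group.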
  

Netsplits

What about testing when a node loses connection with other nodes in the cluster? It turns out Chris Keathley wrote a library for exactly this type of distributed failure testing. Schism allows you to both partition and heal to validate the nodes behave themselves.

    defmodule ExampleWeb.ClusterTest do
      use Example.DataCase, async: false
    
      @node1 "http://localhost:4001"
      @node2 "http://localhost:4002"
    
      test "authenticate works after network partition heals", %{response: response, login: login} do
        [n1, n2] = LocalCluster.start_nodes("example", 2)
    
        Schism.partition([n1])
    
        # create the user on the example1 node
        response = get(response, "#{@node1}/signup")
        assert response.status_code == 200
        response = post(response, "#{@node1}/signup", login)
        assert response.status_code == 302
    
        # authenticate w/ the example1 node
        response = get(response, "#{@node1}/")
        assert response.status_code == 200
        response = post(response, "#{@node1}/", login)
        assert response.status_code == 302
        response = get(response, "#{@node1}/budget")
        assert response.status_code == 200
    
        # the example2 node is split so authenticate fails
        response = get(%HTTPoison.Response{}, "#{@node2}/budget")
        assert response.status_code == 302
        response = get(response, "#{@node2}/")
        assert response.status_code == 200
        response = post(response, "#{@node2}/", login)
        assert response.status_code == 200
    
        Schism.heal([n1, n2])
    
        # the example2 node is healed so authenticate works
        eventually(fn ->
          response = get(%HTTPoison.Response{}, "#{@node2}/budget")
          response = get(response, "#{@node2}/")
          response = post(response, "#{@node2}/", login)
          assert response.status_code == 302
          response = get(response, "#{@node2}/budget")
          assert response.status_code == 200
        end)
      end
    end
  

In this scenario we start with a network partition to test drive the healing required to get each node in sync after connectivity is restored in the cluster. The test fails after the heal because right now we don't handle the up or down messages sent when a node becomes connected or disconnected.

To solve this we first explicitly monitor the nodes by altering the logon GenServer.

    defmodule Example.Logon do
      use GenServer
    
      @impl GenServer
      def init(:ok) do
        :net_kernel.monitor_nodes(true)
    
        :pg2.join(:example, self())
    
        ... 
      end
    end
  

Now that we are monitoring, the logon GenServer will get both `:nodeup` and `:nodedown` messages. For the baseline implementation today we will add `handle_info` and pattern match on `:nodeup` so we can begin the healing process after we are notified that connectivity was restored. Note: this exact implementation will publish a message for each user and should be considered naive.

    @impl GenServer
    def handle_info({:nodeup, node}, state) do
      for {id, {username, hash}} <- UserCache.all() do
        GenServer.cast({__MODULE__, node}, {:merge, id, username, hash})
      end
      {:noreply, state}
    end
  

When the node goes down this GenServer will receive a `:nodedown` message, so to avoid crashing we need to implement `handle_info` with a generic argument. This catch-all clause must be defined after the `:nodeup` clause, otherwise it would match first. Alternatively you could match on `:nodedown` and just not do anything at the moment.

    def handle_info(_msg, state) do
      {:noreply, state}
    end
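
To show the alternative of matching `:nodedown` explicitly, here is a small standalone sketch that keeps a set of the nodes it believes are connected. It is not the logon GenServer from this post; `NodeWatcherSketch` and `known_nodes/1` are hypothetical names. The return value of `:net_kernel.monitor_nodes/1` is deliberately ignored so the sketch also starts on a node that was launched without distribution.

```elixir
defmodule NodeWatcherSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def known_nodes(server), do: GenServer.call(server, :known_nodes)

  @impl GenServer
  def init(:ok) do
    # Subscribe to {:nodeup, node} and {:nodedown, node} messages.
    _ = :net_kernel.monitor_nodes(true)
    {:ok, MapSet.new()}
  end

  @impl GenServer
  def handle_call(:known_nodes, _from, nodes) do
    {:reply, nodes |> MapSet.to_list() |> Enum.sort(), nodes}
  end

  @impl GenServer
  def handle_info({:nodeup, node}, nodes), do: {:noreply, MapSet.put(nodes, node)}
  def handle_info({:nodedown, node}, nodes), do: {:noreply, MapSet.delete(nodes, node)}
  # Catch-all last, so unexpected messages never crash the process.
  def handle_info(_msg, nodes), do: {:noreply, nodes}
end
```

Because the callbacks only react to plain messages, they can be exercised in a unit test by sending `{:nodeup, name}` and `{:nodedown, name}` to the process directly, without spawning real nodes.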
  

Registry Incompatible

When I first hacked together the `GenServer.cast` above, the logon GenServer itself was registered using my local Registry and a via tuple. Elixir's Registry is local to a single node, and after some searching on GitHub it seems you cannot send a message to a process on another node this way, so I updated my `start_link` to instead register under the module name. That lets other nodes address the process as `{Example.Logon, node}`.

    defmodule Example.Logon do
      use GenServer
    
      def start_link(_args) do
        GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
      end
    end
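
The `{name, node}` form of addressing is what the `:nodeup` handler relies on. Here is a minimal sketch of it, using the local node as the target so it runs without a cluster. The module `NamedSketch` and its messages are hypothetical and exist only to illustrate the addressing.

```elixir
defmodule NamedSketch do
  use GenServer

  # Register under the module name, like the updated start_link above.
  def start_link(_args \\ []) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl GenServer
  def init(:ok), do: {:ok, []}

  @impl GenServer
  def handle_cast({:merge, item}, items), do: {:noreply, [item | items]}

  @impl GenServer
  def handle_call(:items, _from, items), do: {:reply, Enum.reverse(items), items}
end
```

With the process started, `GenServer.cast({NamedSketch, Node.self()}, {:merge, "toran"})` delivers the message by registered name on the given node. Swapping `Node.self()` for a remote node name is all it takes to reach the same process on another member of the cluster.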
  

Docker

So after all this hard work we've got 2 passing tests and some pubsub, but can we deploy this and have it be fully operational? Unfortunately these tests don't drive out the docker-compose changes necessary, but seeing this work end to end feels like the goal, so let's keep going, shall we?

First we need to provide a name to the node like our tests did. In my specific setup I'm using a simple bash script called entrypoint.sh. In this file I'm updating the `phx.server` command to take a random string for a name. Note: This deployment setup isn't (yet) using releases but it does work.

    #!/bin/bash
    
    str=`date -Ins | md5sum`
    name=${str:0:10}
    
    mix run transform.exs
    mix phx.digest
    mix ecto.create
    mix ecto.migrate
    mix run priv/repo/seeds.exs
    
    elixir --sname $name --cookie monster -S mix phx.server
  

Next we need some code that allows each node to communicate "hello, I'm online! Please connect with me". My first attempt was successful using the host filesystem but the database makes for another fine alternative so I'll show that approach instead.

I created a simple database table called `clusters` that each node writes its name to and queries to discover the other nodes. Start by creating the schema file for cluster.

    defmodule Example.Cluster do
      use Ecto.Schema
    
      import Ecto.Changeset
    
      @primary_key {:id, :string, []}
      schema "clusters" do
    
        timestamps()
      end
    
      def changeset(cluster, attrs) do
        cluster
          |> cast(attrs, [:id])
      end
    
    end
  

Next create the database migration to create the new table.

    defmodule Example.Repo.Migrations.CreateClusters do
      use Ecto.Migration
    
      def change do
        create table(:clusters, primary_key: false) do
          add :id, :string, primary_key: true
    
          timestamps()
        end
      end
    end
  

We also need a simple `upsert` function to insert the node name, so create the file clusters.ex.

    defmodule Example.Clusters do
      import Ecto.Query, warn: false
    
      alias Example.Repo
      alias Example.Cluster
    
      def all do
        Repo.all(Cluster)
      end
    
      def upsert_by(attrs) do
        case Repo.get_by(Cluster, attrs) do
          nil -> %Cluster{}
          cluster -> cluster
        end
        |> Cluster.changeset(attrs)
        |> Repo.insert_or_update
      end
    end
  

And finally, the cluster sync GenServer that will do the heavy lifting to establish connectivity by way of `Node.ping`

    defmodule Example.ClusterSync do
      use GenServer
    
      @one_second :timer.seconds(1)
      @two_seconds :timer.seconds(2)
      @ten_seconds :timer.seconds(10)
    
      alias Example.Clusters
    
      def start_link(_args) do
        GenServer.start_link(__MODULE__, :ok, name: via(:sync))
      end
    
      defp via(name), do: Example.Registry.via(name)
    
      @impl GenServer
      def init(:ok) do
        Process.send_after(self(), :write, @one_second)
        Process.send_after(self(), :query, @two_seconds)
    
        id = Atom.to_string(Node.self)
        {:ok, %{id: id}}
      end
    
      @impl GenServer
      def handle_info(:write, state) do
        Clusters.upsert_by(state)
        {:noreply, state}
      end
    
      @impl GenServer
      def handle_info(:query, state) do
        %{:id => node} = state
    
        Clusters.all()
          |> Enum.map(&Map.from_struct(&1))
          |> Enum.filter(fn (%{:id => id}) -> id != node end)
          |> Enum.map(fn (%{:id => id}) -> String.to_atom(id) end)
          |> Enum.map(&({&1, Node.ping(&1) == :pong}))
    
        Process.send_after(self(), :query, @ten_seconds)
    
        {:noreply, state}
      end
    
    end
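
The pipeline inside the `:query` handler does two things: it works out which rows are peers, then pings each one. Pulled out into pure functions it is easier to test without a database or a cluster. This is only a sketch; `PeerSketch`, `peers/2` and `connect_all/2` are hypothetical names, and the injectable `ping` argument is an assumption added for testing.

```elixir
defmodule PeerSketch do
  # Given every node id stored in the table and our own id, return the
  # other nodes as atoms ready for Node.ping/1.
  def peers(ids, self_id) do
    ids
    |> Enum.reject(&(&1 == self_id))
    |> Enum.map(&String.to_atom/1)
  end

  # Ping each peer and record whether it answered with :pong.
  # `ping` defaults to Node.ping/1 but can be replaced in tests.
  def connect_all(peers, ping \\ &Node.ping/1) do
    Map.new(peers, fn peer -> {peer, ping.(peer) == :pong} end)
  end
end
```

A successful `Node.ping/1` also connects the two nodes, which is why polling the table on a timer is enough to form the cluster.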
  

Now when you run `docker-compose up --scale app=2` you should see 2 connected nodes that publish messages to each other and keep the cluster eventually consistent. Thanks for joining me on this epic adventure! Huge thanks to Chris Keathley and Ben Marx for the inspiration at Lonestar Elixir earlier this year!

You can find the source code from my adventure on GitHub.

Update: I also put together a Redis implementation that solves the cluster registration problem.

