In February I attended the "Building resilient, distributed systems with Elixir" training from Ben Marx and Chris Keathley at Lonestar Elixir. I really enjoyed the intensive workshop they put together and because I was working in Phoenix at the time I decided to learn what would be required to test drive a pubsub example with pg2. In the weeks that followed I found a handful of gotchas and learned a few pro-tips worth sharing with the wider community.
TL;DR: If you prefer to skip the story and see the code, you can find everything on GitHub
Disclaimer
It was important that I present a simple distributed state problem so I could focus my efforts on testing the cluster. As a result, the production code I'm using to put the problem front and center is intended for learning only and should not be used as-is in application code.
The problem
When the application boots up we fetch all users from the database and store them locally in ETS. Later this cache is used to verify the user's session as part of the authentication pipeline.
def authentication(conn, _opts) do
  case get_session(conn, :user_id) do
    nil ->
      conn

    id ->
      username = Example.Logon.get(:logon, "#{id}")
      assign(conn, :current_user, %{id: id, username: username})
  end
end
The final plug in our authentication pipeline redirects the user back to login if the connection has no current user.
def redirect_unauthorized(conn, _opts) do
  current_user = Map.get(conn.assigns, :current_user)

  if current_user != nil and current_user.username != nil do
    conn
  else
    conn
    |> put_session(:return_to, conn.request_path)
    |> redirect(to: Routes.login_path(conn, :index))
    |> halt()
  end
end
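For context, these plugs get wired into the router. Here's a sketch of what that might look like; the pipeline name, route, and import are assumptions since the router itself isn't shown in this post.
# In ExampleWeb.Router — hypothetical wiring, assuming the plug
# functions are imported so they can be referenced by atom.
import ExampleWeb.Plugs

pipeline :protected do
  plug :authentication
  plug :redirect_unauthorized
end

scope "/", ExampleWeb do
  pipe_through [:browser, :protected]

  get "/budget", BudgetController, :index
end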
In a single node deployment this works because the registration process inserts any new user record into ETS after Ecto completes the database insert.
def handle_call({:put, username, password}, _from, state) do
  # id generation and password hashing happen in code elided from this post
  changeset = User.changeset(%User{}, %{id: id, username: username})

  case Users.insert(changeset) do
    {:ok, _result} ->
      UserCache.insert(id, username, hash)
      {:reply, {:ok, {id, username}}, state}

    {:error, changeset} ->
      ...
  end
end
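`UserCache` itself never appears in this post, so for context here's a minimal ETS-backed sketch consistent with how it's called; treat the module shape and table options as assumptions.
defmodule Example.UserCache do
  # Assumed implementation — a thin wrapper around a public, named ETS table.
  @table :user_cache

  def create_table do
    :ets.new(@table, [:set, :public, :named_table])
  end

  def insert(id, username, hash) do
    :ets.insert(@table, {id, {username, hash}})
  end

  # Returns [{id, {username, hash}}, ...] — the shape consumed by the
  # :nodeup handler later in this post.
  def all do
    :ets.tab2list(@table)
  end
end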
The problem arises when we decide to cluster the application: after a user creates a new account, only the node that served the create request has the state needed to validate subsequent requests. As we progress I will show how we can solve this by distributing state across the cluster with pg2, so hang tight!
Local Cluster
The first tool I pulled in for this test case was the Elixir library local_cluster. You can use local_cluster to spawn nodes for distributed state testing without all the moving parts of a full-on deployment (i.e., Docker or Kubernetes).
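Both local_cluster and Schism (which shows up later in this post) come in as test-only dependencies in mix.exs; the version constraints below are illustrative, so check hex.pm for current releases.
# mix.exs — versions are illustrative.
defp deps do
  [
    {:local_cluster, "~> 1.0", only: [:test]},
    {:schism, "~> 1.0", only: [:test]}
  ]
end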
To start using local_cluster, open test/test_helper.exs and add the line `:ok = LocalCluster.start()` above `ExUnit.start()` at the top of the file.
:ok = LocalCluster.start()
ExUnit.start()
Ecto.Adapters.SQL.Sandbox.mode(Example.Repo, :manual)
It was at this point that I took my first stumble. In both the training at Lonestar and my personal pursuit weeks later, I found that from a cold boot the command `mix test` would throw a `no distribution` error until I ran `iex -S mix phx.server` (or something equivalent) at least once.
(MatchError) no match of right hand side value: {:error, {{:shutdown, {:failed_to_start_child, :net_kernel, {:EXIT, :nodistribution}}}, {:child, :undefined, :net_sup_dynamic, {:erl_distribution, :start_link, [[:"manager@127.0.0.1"], false]}, :permanent, 1000, :supervisor, [:erl_distribution]}}}
    test/test_helper.exs:1: (file)
    (elixir) lib/code.ex:767: Code.require_file/2
    (elixir) lib/enum.ex:769: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir) lib/enum.ex:769: Enum.each/2
I never found a root-cause fix or a more practical workaround, but now that this is posted I plan to follow up with the open source maintainer to learn why this occurs and how we can fix it to unblock others who might follow.
Update: I got word from Chris shortly after posting this blog that running `epmd -daemon` from a cold boot will unlock distribution without the need for iex as mentioned above.
Configure Ports
With local_cluster functional, I started writing the first test case, which I hoped would offer fast feedback while I solved the distributed state issue. I borrowed heavily from an existing controller-style test in the Phoenix application, so I figured it would work without much hassle.
defmodule ExampleWeb.ClusterTest do
  use ExampleWeb.ConnCase, async: false

  test "login will authenticate the user regardless of node", %{conn: conn} do
    name = "toran"
    password = "abcd1234"
    login = %{username: name, password: password}

    LocalCluster.start_nodes("example", 2)

    created = post(conn, Routes.registration_path(conn, :create, login))
    assert Map.get(created.assigns, :current_user) == nil

    denied = get(created, Routes.budget_path(conn, :index))
    assert redirected_to(denied, 302) =~ "/"
    assert Map.get(denied.assigns, :current_user) == nil

    authenticated = post(denied, Routes.login_path(conn, :create, login))
    assert html_response(authenticated, 302) =~ "redirected"
    assert Map.get(authenticated.assigns, :current_user) == nil

    authorized = get(authenticated, Routes.budget_path(conn, :index))
    assert String.match?(html_response(authorized, 200), ~r/.*main.js.*/)
  end
end
But just as I started writing this test, I realized that the built-in ConnCase helpers like `post` and `get` don't specify a particular host or port. I looked back at the source code from the training and learned that in those multi-node test cases, a specific environment variable was set for each node local_cluster generated. But knowing how to dynamically set the port for a Phoenix application was a mystery I'd need to solve for myself, because the example in that workshop wasn't Phoenix based.
So next I opened the test config to confirm the port was indeed hard coded to 4002. Then I did some searching around the internet in hopes that someone had blogged about a similar trial, but sadly the result set was limited. I did, however, find a Stack Overflow post that got me looking closer at the endpoint file, and eventually I landed on a clever extension point.
defmodule ExampleWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :example

  defp port do
    name = Node.self()

    env =
      name
      |> Atom.to_string()
      |> String.replace(~r/@.*$/, "")
      |> String.upcase()

    String.to_integer(System.get_env("#{env}_PORT") || "4000")
  end

  def init(_key, config) do
    {:ok, Keyword.put(config, :http, [:inet6, port: port()])}
  end
end
This init function gave me the hook I needed to get the name of the running node and, from it, pull any environment variable set for the port. To verify this manually, I spun up Phoenix from the command line with a specific name and port.
EXAMPLE1_PORT=4001 elixir --sname "example1" -S mix phx.server
I made my way back to the test so I could properly configure the environment variables for the ports and give local_cluster a prefix that would match. (start_nodes/2 names the spawned nodes with that prefix plus an index, so "example" yields example1 and example2, which map to EXAMPLE1_PORT and EXAMPLE2_PORT.)
defmodule ExampleWeb.ClusterTest do
  use ExampleWeb.ConnCase, async: false

  @node1 "http://localhost:4001"
  @node2 "http://localhost:4002"

  setup do
    System.put_env("EXAMPLE1_PORT", "4001")
    System.put_env("EXAMPLE2_PORT", "4002")

    {:ok, conn: Phoenix.ConnTest.build_conn()}
  end

  test "login will authenticate the user regardless of node", %{conn: conn} do
    LocalCluster.start_nodes("example", 2)
    ...
  end
end
Phoenix Server
I now had 2 nodes spun up and ports configured, but what about the `post` and `get` helpers? I spent a good deal of time trying to crack open the connection `conn` and alter the host to match as I constructed each request. But as I did this, I noticed my logs printed :"manager@127.0.0.1" instead of :"example1@127.0.0.1", which told me I wasn't yet emulating the http request as nginx would in my docker environment.
It was at this point I decided to switch from the Phoenix helpers to HTTPoison. The basic helpers I wrote aren't much to write home about, but I figured I would show them for anyone interested. One tradeoff worth mentioning: I failed to monkey patch the CSRF token verification inside of Phoenix, so I had to extract the token from the form html by hand.
def post(response, url, params) do
  %HTTPoison.Response{body: body, headers: headers} = response

  [csrf_token] =
    Floki.find(body, "input[name=_csrf_token]") |> Floki.attribute("value")

  payload = Map.put(params, :_csrf_token, csrf_token)
  cookie = set_cookie(headers)

  headers = [
    {"Content-Type", "application/x-www-form-urlencoded"}
  ]

  HTTPoison.post!(url, URI.encode_query(payload), headers, hackney: [cookie: [cookie]])
end

def get(response, url) do
  %HTTPoison.Response{headers: headers} = response
  cookie = set_cookie(headers)

  HTTPoison.get!(url, %{}, hackney: [cookie: [cookie]])
end
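The `set_cookie` function these helpers lean on isn't shown here; a minimal sketch might look like the following (note that Cowboy lowercases response header names).
# Hypothetical sketch — the real set_cookie isn't shown in this post.
# Find the session cookie in the response headers so it can be replayed
# on the next request; returns "" when no cookie has been set yet
# (e.g. for the seed %HTTPoison.Response{}).
defp set_cookie(headers) do
  Enum.find_value(headers || [], "", fn
    {"set-cookie", value} -> value
    _other -> nil
  end)
end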
After I altered my test case to use the above helpers, I got an error running `mix test`.
** (HTTPoison.Error) :econnrefused
It turns out Phoenix doesn't start a real server when you run `mix test`, but after some hunting around for options I discovered you can opt in when you've got a test case that requires it.
def launch_server do
  endpoint_config =
    Application.get_env(:example, ExampleWeb.Endpoint)
    |> Keyword.put(:server, true)

  :ok = Application.put_env(:example, ExampleWeb.Endpoint, endpoint_config)
  :ok = Application.stop(:example)
  :ok = Application.start(:example)
end
Introducing PG2
With a running server, dynamic port configuration, and a few http helper functions, we finally had everything required to test drive the distributed state problem.
defmodule ExampleWeb.ClusterTest do
  use Example.DataCase, async: false

  @node1 "http://localhost:4001"
  @node2 "http://localhost:4002"
  @password "abcd1234"

  setup do
    System.put_env("EXAMPLE1_PORT", "4001")
    System.put_env("EXAMPLE2_PORT", "4002")

    launch_server()

    # random_string/0 is a small helper (not shown) that generates a unique username
    {:ok, response: %HTTPoison.Response{}, login: %{username: random_string(), password: @password}}
  end

  test "login will authenticate the user regardless of node", %{response: response, login: login} do
    LocalCluster.start_nodes("example", 2)

    # create the user on node 1
    response = get(response, "#{@node1}/signup")
    assert response.status_code == 200
    response = post(response, "#{@node1}/signup", login)
    assert response.status_code == 302

    # authenticate w/ node 1
    response = get(response, "#{@node1}/")
    assert response.status_code == 200
    response = post(response, "#{@node1}/", login)
    assert response.status_code == 302
    response = get(response, "#{@node1}/budget")
    assert response.status_code == 200

    # authenticate w/ node 2
    eventually(fn ->
      response = get(%HTTPoison.Response{}, "#{@node2}/budget")
      assert response.status_code == 302
      response = get(response, "#{@node2}/")
      assert response.status_code == 200
      response = post(response, "#{@node2}/", login)
      assert response.status_code == 302
      response = get(response, "#{@node2}/budget")
      assert response.status_code == 200
    end)
  end
end
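The `eventually` helper used above retries the assertions while the cast propagates between nodes. It isn't shown elsewhere in this post, but a minimal hand-rolled version might look like this (the retry count and sleep interval are arbitrary):
# Hand-rolled retry helper: re-run the assertions until they pass
# or we run out of attempts, then surface the last failure.
def eventually(fun, retries \\ 50) do
  fun.()
rescue
  error ->
    if retries > 0 do
      Process.sleep(100)
      eventually(fun, retries - 1)
    else
      reraise error, __STACKTRACE__
    end
end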
Before we can connect the nodes we need to create the process group. I typically do this in application.ex because I only want it to happen once as the software boots.
defmodule Example.Application do
  use Application

  def start(_type, _args) do
    children = [
    ]

    :pg2.create(:example)

    opts = [strategy: :one_for_one, name: Example.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
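With the group created you can sanity check it from iex; the member list stays empty until a process joins, which happens next:
iex> :pg2.which_groups()
[:example]
iex> :pg2.get_members(:example)
[]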
Next we need to join that process group from the logon GenServer we use to insert user data.
defmodule Example.Logon do
  use GenServer

  @impl GenServer
  def init(:ok) do
    :pg2.join(:example, self())
    ...
  end
end
Publishing a message to the other nodes during user creation is a 2 step process. First we pull the insert out of the block of code we have today and replace it with a `GenServer.cast` to each member of the process group.
@impl GenServer
def handle_call({:put, username, password}, _from, state) do
  # id generation and password hashing happen in code elided from this post
  changeset = User.changeset(%User{}, %{id: id, username: username})

  case Users.insert(changeset) do
    {:ok, _result} ->
      members = :pg2.get_members(:example)

      Enum.each(members, fn pid ->
        GenServer.cast(pid, {:merge, id, username, hash})
      end)

      {:reply, {:ok, {id, username}}, state}

    {:error, changeset} ->
      ...
  end
end
Next we implement the handler for that cast to insert the user into the cache on each node. After this callback is functional, the test should pass because each node will become eventually consistent.
@impl GenServer
def handle_cast({:merge, id, username, hash}, state) do
  UserCache.insert(id, username, hash)
  {:noreply, state}
end
Netsplits
What about testing when a node loses its connection to the other nodes in the cluster? It turns out Chris Keathley wrote a library for exactly this type of distributed failure testing. Schism lets you both partition and heal the cluster to validate that the nodes behave themselves.
defmodule ExampleWeb.ClusterTest do
  use Example.DataCase, async: false

  @node1 "http://localhost:4001"
  @node2 "http://localhost:4002"

  # the setup block from the previous test module is omitted here for brevity

  test "authenticate works after network partition heals", %{response: response, login: login} do
    [n1, n2] = LocalCluster.start_nodes("example", 2)

    Schism.partition([n1])

    # create the user on node 1
    response = get(response, "#{@node1}/signup")
    assert response.status_code == 200
    response = post(response, "#{@node1}/signup", login)
    assert response.status_code == 302

    # authenticate w/ node 1
    response = get(response, "#{@node1}/")
    assert response.status_code == 200
    response = post(response, "#{@node1}/", login)
    assert response.status_code == 302
    response = get(response, "#{@node1}/budget")
    assert response.status_code == 200

    # node 2 is partitioned so authentication fails
    response = get(%HTTPoison.Response{}, "#{@node2}/budget")
    assert response.status_code == 302
    response = get(response, "#{@node2}/")
    assert response.status_code == 200
    response = post(response, "#{@node2}/", login)
    assert response.status_code == 200

    Schism.heal([n1, n2])

    # node 2 healed so authentication works
    eventually(fn ->
      response = get(%HTTPoison.Response{}, "#{@node2}/budget")
      response = get(response, "#{@node2}/")
      response = post(response, "#{@node2}/", login)
      assert response.status_code == 302
      response = get(response, "#{@node2}/budget")
      assert response.status_code == 200
    end)
  end
end
In this scenario we start with a network partition to test drive the healing required to get each node in sync after connectivity is restored. It fails after the heal because we don't yet handle the up and down messages sent when a node connects or disconnects.
To solve this we first explicitly monitor the nodes by altering the logon GenServer.
defmodule Example.Logon do
  use GenServer

  @impl GenServer
  def init(:ok) do
    :net_kernel.monitor_nodes(true)
    :pg2.join(:example, self())
    ...
  end
end
Now that we are monitoring, the logon GenServer will receive both `:nodeup` and `:nodedown` messages. For today's baseline implementation we add a `handle_info` that pattern matches on `:nodeup`, so we can begin the healing process as soon as we are notified that connectivity was restored. Note: this exact implementation publishes a message for each user and should be considered naive.
@impl GenServer
def handle_info({:nodeup, node}, state) do
  for {id, {username, hash}} <- UserCache.all() do
    GenServer.cast({__MODULE__, node}, {:merge, id, username, hash})
  end

  {:noreply, state}
end
When a node goes down, this GenServer receives a `:nodedown` message, so to avoid crashing we also implement a `handle_info` with a catch-all argument. Alternatively, you could match on `:nodedown` explicitly and simply do nothing for the moment, as sketched below.
@impl GenServer
def handle_info(_msg, state) do
  {:noreply, state}
end
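That explicit version is just another `handle_info` head:
# Alternative: match :nodedown explicitly and ignore it for now.
@impl GenServer
def handle_info({:nodedown, _node}, state) do
  {:noreply, state}
end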
Registry Incompatible
When I first hacked together the `GenServer.cast` above, the logon GenServer itself was registered with my local Registry and a via tuple. After some searching on GitHub, it seems you cannot address a process on another node like this, so I updated my `start_link` to register under the module name instead.
defmodule Example.Logon do
  use GenServer

  def start_link(_args) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end
end
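With the name registered this way, a process on another node can be addressed with a `{name, node}` tuple; the node atom below is illustrative.
# Reaches the Logon server running on the named remote node — something
# a via tuple registered in a local Registry cannot do.
GenServer.cast({Example.Logon, :"example2@127.0.0.1"}, {:merge, id, username, hash})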
Docker
So after all this hard work we've got 2 passing tests and some pubsub, but can we deploy this and have it be fully operational? Unfortunately these tests don't drive out the docker-compose changes necessary, but seeing this work end to end feels like the goal, so let's keep going, shall we?
First we need to give each node a name like our tests did. In my specific setup I'm using a simple bash script called entrypoint.sh, where I update the `phx.server` command to take a random string for a name. Note: this deployment setup isn't (yet) using releases, but it does work.
#!/bin/bash
str=`date -Ins | md5sum`
name=${str:0:10}
mix run transform.exs
mix phx.digest
mix ecto.create
mix ecto.migrate
mix run priv/repo/seeds.exs
elixir --sname $name --cookie monster -S mix phx.server
Next we need some code that allows each node to announce "hello, I'm online! Please connect with me". My first attempt used the host filesystem successfully, but the database makes for another fine alternative, so I'll show that approach instead.
I created a simple database table called `clusters` that each node can push its name into and query for the node to node chatter. Start by creating the schema file for Cluster.
defmodule Example.Cluster do
  use Ecto.Schema
  import Ecto.Changeset

  @primary_key {:id, :string, []}
  schema "clusters" do
    timestamps()
  end

  def changeset(cluster, attrs) do
    cluster
    |> cast(attrs, [:id])
  end
end
Next create the database migration to insert the new table.
defmodule Example.Repo.Migrations.CreateClusters do
  use Ecto.Migration

  def change do
    create table(:clusters, primary_key: false) do
      add :id, :string, primary_key: true

      timestamps()
    end
  end
end
We also need a simple `upsert` function to insert the node name, so create the file clusters.ex.
defmodule Example.Clusters do
  import Ecto.Query, warn: false

  alias Example.Repo
  alias Example.Cluster

  def all do
    Repo.all(Cluster)
  end

  def upsert_by(attrs) do
    case Repo.get_by(Cluster, attrs) do
      nil -> %Cluster{}
      cluster -> cluster
    end
    |> Cluster.changeset(attrs)
    |> Repo.insert_or_update()
  end
end
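A quick iex session shows the intended flow; the output here is illustrative:
iex> Example.Clusters.upsert_by(%{id: Atom.to_string(Node.self())})
{:ok, %Example.Cluster{id: "example1@127.0.0.1"}}
iex> Example.Clusters.all() |> Enum.map(& &1.id)
["example1@127.0.0.1"]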
And finally, the cluster sync GenServer that does the heavy lifting to establish connectivity by way of `Node.ping`.
defmodule Example.ClusterSync do
  use GenServer

  alias Example.Clusters

  @one_second :timer.seconds(1)
  @two_seconds :timer.seconds(2)
  @ten_seconds :timer.seconds(10)

  def start_link(_args) do
    GenServer.start_link(__MODULE__, :ok, name: via(:sync))
  end

  defp via(name), do: Example.Registry.via(name)

  @impl GenServer
  def init(:ok) do
    Process.send_after(self(), :write, @one_second)
    Process.send_after(self(), :query, @two_seconds)

    id = Atom.to_string(Node.self())
    {:ok, %{id: id}}
  end

  @impl GenServer
  def handle_info(:write, state) do
    Clusters.upsert_by(state)
    {:noreply, state}
  end

  @impl GenServer
  def handle_info(:query, state) do
    %{id: node} = state

    Clusters.all()
    |> Enum.map(&Map.from_struct/1)
    |> Enum.filter(fn %{id: id} -> id != node end)
    |> Enum.map(fn %{id: id} -> String.to_atom(id) end)
    |> Enum.map(&{&1, Node.ping(&1) == :pong})

    Process.send_after(self(), :query, @ten_seconds)
    {:noreply, state}
  end
end
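For the sync process to run it needs a spot in the supervision tree. Here's a sketch of application.ex with the pieces from this post wired together; the exact child list is an assumption and your app will differ.
# application.ex — illustrative child list only.
def start(_type, _args) do
  children = [
    Example.Repo,
    ExampleWeb.Endpoint,
    Example.Logon,
    Example.ClusterSync
  ]

  :pg2.create(:example)

  opts = [strategy: :one_for_one, name: Example.Supervisor]
  Supervisor.start_link(children, opts)
end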
Now when you run `docker-compose up --scale app=2` you should see 2 connected nodes that publish messages to each other and keep the cluster eventually consistent. Thanks for joining me on this epic adventure! Huge thanks to Chris Keathley and Ben Marx for the inspiration at Lonestar Elixir earlier this year!
You can find the source code from my adventure on GitHub
Update: I also put together a Redis implementation that solves the cluster registration problem.