Building a replicated cache in 65 lines using ConCache and Phoenix.PubSub

6 min read
A treasure chest with outgoing arrows to three other treasure chests
Christoph Grothaus

ConCache is a lightweight, performant caching library on top of ETS. But it is confined to a single Elixir node. Phoenix.PubSub is a Publisher/Subscriber service that lets you broadcast messages to all nodes of your Elixir cluster. Building on the two libraries, we can create a replicated cache in only 65 lines of code.

Use Case

But first let us take a step back and have a look at our use case. The caching requirements are quite simple. For our Phoenix app, authorization is handled by a backend system that is not under our control. We get a user's permissions from it, and that request takes 500 to 700 ms. Sad, but true. Unfortunately, we need the permissions to authorize almost every user request to our app. We cannot afford to query them every time. Luckily, they are a perfect fit for caching: they are small and they change very rarely. We can safely cache them for many minutes.

Now, you could argue: why not use an existing Elixir caching library that already supports replication or distribution, like Cachex or Nebulex? Both are really powerful tools, but that power comes with a drawback: they are complex, and understanding them takes more mental effort than understanding a simpler tool. Sometimes less is more. Therefore, we decided to "teach" <code>ConCache</code> how to replicate values across the cluster, using <code>Phoenix.PubSub</code>.

Requirements

  • Cache small pieces of data that are costly to produce.
  • Replicate writes across the cluster.
  • Replicate deletions across the cluster.

Explicitly not a requirement:

  • Replicate existing cache values to a new Elixir node connecting to the cluster.

Context:

  • More reads than writes, no high write throughput.

Solution

From ConCache's expansive toolkit, our app requires only two functions: get_or_store/3 and fetch_or_store/3. Both read a value from the cache or, if it does not exist, produce it on the fly and insert it into the cache. They are very similar and differ only in that <code>fetch_or_store</code> handles <code>{:error, reason}</code> tuples: the producer function returns either <code>{:ok, value}</code> or <code>{:error, reason}</code>, and only <code>:ok</code> results are cached. Furthermore, we need delete/2.
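To make the difference between the two functions concrete, here is a minimal standalone sketch. It assumes a recent ConCache release pulled in via <code>Mix.install</code>; the cache name <code>:demo_cache</code>, the keys, and the TTL values are made up for illustration and are not taken from our app.

```elixir
# Standalone script, run with `elixir con_cache_demo.exs`.
# Assumption: a ConCache 1.x release is available from Hex.
Mix.install([{:con_cache, "~> 1.0"}])

# Start one local cache under a supervisor. The name and TTLs are illustrative.
{:ok, _sup} =
  Supervisor.start_link(
    [
      {ConCache,
       [
         name: :demo_cache,
         ttl_check_interval: :timer.seconds(1),
         global_ttl: :timer.minutes(5)
       ]}
    ],
    strategy: :one_for_one
  )

# get_or_store/3 stores whatever the function returns.
value = ConCache.get_or_store(:demo_cache, :a, fn -> [:read, :write] end)

# On a cache hit the function is not invoked, so this does not raise.
cached = ConCache.get_or_store(:demo_cache, :a, fn -> raise "not called" end)

# fetch_or_store/3 expects {:ok, value} or {:error, reason} from the function.
# The error is handed back to the caller and nothing is cached.
error_result = ConCache.fetch_or_store(:demo_cache, :b, fn -> {:error, :backend_down} end)
after_error = ConCache.get(:demo_cache, :b)

# A later successful call stores the value.
ok_result = ConCache.fetch_or_store(:demo_cache, :b, fn -> {:ok, [:read]} end)

# delete/2 removes the entry again.
:ok = ConCache.delete(:demo_cache, :a)
after_delete = ConCache.get(:demo_cache, :a)
```

Because errors are never stored, a failed backend call is simply retried on the next request.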

We build a <code>ReplicatedConCache</code> module that is a wrapper around <code>ConCache</code>. Our app code only interacts with the wrapper, but we don't entirely hide <code>ConCache</code>. Instead, we assume that a <code>ConCache</code> instance is configured and started in the app's <code>Application</code> module, and that this is done consistently (with the same configuration) on every node.

The replication is built using <code>Phoenix.PubSub</code>. Every Phoenix app generated by the default generator already includes a PubSub system. The sender side calls broadcast/4 to publish a message to a topic. The receiver side is a <code>GenServer</code> that calls subscribe/3 for that topic and then processes the incoming messages in <code>handle_info/2</code>.
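The publish/subscribe round trip can be tried in isolation. The following is a minimal single-node sketch, assuming <code>phoenix_pubsub</code> 2.x installed via <code>Mix.install</code>; <code>Demo.PubSub</code> and <code>"demo_topic"</code> are placeholder names, and the script process stands in for the <code>GenServer</code> receiver.

```elixir
# Standalone script, run with `elixir pubsub_demo.exs`.
# Assumption: a phoenix_pubsub 2.x release is available from Hex.
Mix.install([{:phoenix_pubsub, "~> 2.1"}])

# Start a PubSub system. Demo.PubSub is a placeholder name for this sketch.
{:ok, _sup} =
  Supervisor.start_link([{Phoenix.PubSub, name: Demo.PubSub}], strategy: :one_for_one)

# Receiver side: subscribe the current process to the topic.
:ok = Phoenix.PubSub.subscribe(Demo.PubSub, "demo_topic")

# Sender side: broadcast a message to every subscriber of the topic,
# including subscribers on other connected nodes.
:ok =
  Phoenix.PubSub.broadcast(Demo.PubSub, "demo_topic", {:put, node(), :demo_cache, :a, 1})

# The message lands in the subscriber's mailbox. A GenServer would get it
# in handle_info/2; a plain process uses receive.
received =
  receive do
    {:put, _sender, _cache_id, _key, _value} = message -> message
  after
    1_000 -> :timeout
  end

IO.inspect(received, label: "received")
```

Note that <code>broadcast</code> also delivers to subscribers on the sending node, which is why the replication messages carry the sender's node name.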

This is the complete code for <code>ReplicatedConCache</code>:

defmodule MyApp.ReplicatedConCache do
  @moduledoc """
  Utility to replicate local ConCache operations via Phoenix.PubSub.
  Only those ConCache functions that we need are replicated.

  Prerequisites:
  - for all relevant cache ids, there is a ConCache instance running on
    each node connected to the PubSub system
  - these ConCache instances are all configured the same (TTL etc.)

  Usage:
  - start up the ReplicatedConCache process, e.g. by adding it to the list
    of children of your OTP application
  - use ReplicatedConCache functions as a drop-in replacement of ConCache
    functions

  Notes:
  - when a new Elixir node connects to a cluster, the existing ConCache
    entries are NOT replicated
  - the server implementation (which receives the replication messages
    via PubSub) is a single GenServer process, it is NOT optimized for
    high throughput
  """

  use GenServer
  alias Phoenix.PubSub
  require Logger

  @pubsub_system MyApp.PubSub
  @topic "replicated_con_cache"

  ##### Client implementation #####

  # `fetch` is local, `store` is replicated
  def fetch_or_store(cache_id, key, producer_fun) do
    ConCache.fetch_or_store(cache_id, key, fn ->
      with {:ok, value} <- producer_fun.() do
        broadcast_put(cache_id, key, value)
        {:ok, value}
      end
    end)
  end

  # `get` is local, `store` is replicated
  def get_or_store(cache_id, key, producer_fun) do
    ConCache.get_or_store(cache_id, key, fn ->
      value = producer_fun.()
      broadcast_put(cache_id, key, value)

      value
    end)
  end

  def delete(cache_id, key) do
    ConCache.delete(cache_id, key)
    broadcast_delete(cache_id, key)
  end

  defp broadcast_put(cache_id, key, value),
    do: broadcast_message({:put, node(), cache_id, key, value})

  defp broadcast_delete(cache_id, key),
    do: broadcast_message({:delete, node(), cache_id, key})

  defp broadcast_message(message) do
    log_debug("broadcast message #{message |> inspect()}")
    PubSub.broadcast(@pubsub_system, @topic, message)
  end

  ##### Server implementation #####

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil)
  end

  # Subscribe to the replication topic on startup; stop if that fails.
  @impl true
  def init(_) do
    log_debug("init - subscribing to PubSub '#{@pubsub_system}' topic '#{@topic}'")

    case PubSub.subscribe(@pubsub_system, @topic) do
      :ok ->
        log_debug("init - subscribed to PubSub")
        {:ok, nil}

      {:error, reason} ->
        {:stop, reason}
    end
  end

  # Apply a replicated write, unless this node sent it
  # (the sender stores the value locally itself).
  @impl true
  def handle_info({:put, sender_node_name, cache_id, key, value} = message, _) do
    unless sender_node_name == node() do
      log_receive(message)
      ConCache.put(cache_id, key, value)
    end

    {:noreply, nil}
  end

  # Apply a replicated deletion, unless this node sent it.
  def handle_info({:delete, sender_node_name, cache_id, key} = message, _) do
    unless sender_node_name == node() do
      log_receive(message)
      ConCache.delete(cache_id, key)
    end

    {:noreply, nil}
  end

  ##### Shared implementation #####
  defp log_receive(message),
    do: log_debug("received message #{message |> inspect()}")

  defp log_debug(message), do: Logger.debug("#{__MODULE__}: #{message}")
end

Have a look at <code>fetch_or_store</code> and <code>get_or_store</code>: the reading part is local. It relies on the wrapped <code>ConCache</code> to look up the cache key. When that key does not exist, the <code>producer_fun</code> lambda comes into play. Normally, <code>ConCache</code> calls it to produce the value, caches the value under the given key, and returns it as the result. We wrap it in another lambda that produces the value and broadcasts it to the other nodes in the cluster before handing it back to <code>ConCache</code> to store locally. Thus, the writing part is replicated. Side note: <code>fetch_or_store</code> adheres to the semantics of its <code>ConCache</code> counterpart and only broadcasts <code>{:ok, value}</code> results.

<code>delete</code> is always replicated.

That's all for the client implementation. Now to the server implementation: during <code>init</code>, it subscribes to the <code>PubSub</code> topic. After that, it only needs to handle two kinds of messages, <code>:put</code> and <code>:delete</code>.

Let's clarify one more detail: the messages that we construct contain the sender <code>node()</code> name. On the receiver side, we only act on a message if we are not on the same node as the sender. Why is that? Because otherwise we would write to the cache or delete from it twice: once in the process that calls a client function like <code>fetch_or_store</code> (typically a Phoenix request handler process), and once in the <code>ReplicatedConCache</code> <code>GenServer</code> process.

To bring it to life, you have to start a <code>ReplicatedConCache</code> instance alongside a <code>ConCache</code> instance in your application's supervision tree.
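As a rough sketch, the supervision tree could look like the following. The cache name <code>:permissions_cache</code> and the TTL values are assumptions for illustration; <code>MyApp.PubSub</code> matches the <code>@pubsub_system</code> attribute used in the module above.

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # PubSub has to be running before ReplicatedConCache subscribes in init/1.
      {Phoenix.PubSub, name: MyApp.PubSub},
      # Local cache. Name and TTLs are illustrative; use the same options on every node.
      {ConCache,
       [
         name: :permissions_cache,
         ttl_check_interval: :timer.seconds(30),
         global_ttl: :timer.minutes(10)
       ]},
      # Receives replication messages and applies them to the local cache.
      MyApp.ReplicatedConCache
      # ... followed by the endpoint and the other children of your app
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

App code then calls the wrapper instead of <code>ConCache</code>. Here <code>MyApp.Backend.fetch_permissions/1</code> is a hypothetical function standing in for the slow backend request, assumed to return <code>{:ok, permissions}</code> or <code>{:error, reason}</code>:

```elixir
MyApp.ReplicatedConCache.fetch_or_store(:permissions_cache, user_id, fn ->
  # Hypothetical backend call; only {:ok, permissions} is cached and replicated.
  MyApp.Backend.fetch_permissions(user_id)
end)
```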

Conclusion

Overall, our solution is straightforward, and it does its job very well. For our use case, where write throughput is low, and values are small, it reliably replicates the values to the caches on all cluster nodes.

We also have extensive tests for it. You can find them, together with the implementation shown above, in this Gist.

Feel free to share your thoughts or any modifications you've made with us. Your feedback is very welcome!
