Real-time page refresh using Phoenix LiveView and PubSub

I enjoy IoT development with the Elixir programming language, the Nerves IoT platform, and the Phoenix web framework. It is so much fun. I built a real-time temperature and humidity monitoring system for my living room: my API server accepts a sensor measurement and then broadcasts the update to every user of my real-time dashboard.

Today I am going to write about real-time page refresh using Phoenix LiveView and Phoenix PubSub so that I can quickly implement the same thing in the future.

This is what I worked on during autoracex #21, held from Sat 4/3 00:00 through Mon 4/5 23:59.

Japanese version

hello-nerves-2

Implement PubSub utilities in a context module

First of all, I prepare utility functions for subscribing to and broadcasting messages in a given context module. Here I add subscribe/0 and broadcast/2 to the Example.Environment context module. I use inspect(__MODULE__) as the topic. This guarantees that the topic name is unique per module, and it saves me from having to invent a topic name or look it up later.

 defmodule Example.Environment do

   ...
+
+  @topic inspect(__MODULE__)
+
+  @doc """
+  Subscribe to this context module's messages.
+  """
+  def subscribe do
+    Phoenix.PubSub.subscribe(Example.PubSub, @topic)
+  end
+
+  @doc """
+  Broadcast a message to the subscribers when something happens.
+  """
+  def broadcast({:ok, record}, event) do
+    Phoenix.PubSub.broadcast(Example.PubSub, @topic, {event, record})
+    {:ok, record}
+  end
+
+  def broadcast({:error, _} = error, _event), do: error
+

Broadcast a message as needed

Now that I have PubSub utility functions, I can broadcast a message when something happens. In the following example, I notify all the subscribers of a newly-inserted measurement record.

 defmodule Example.Environment do

   ...

   @doc """
   Creates a measurement.

   ## Examples

       iex> create_measurement(%{field: value})
       {:ok, %Measurement{}}

       iex> create_measurement(%{field: bad_value})
       {:error, %Ecto.Changeset{}}

   """
   def create_measurement(attrs \\ %{}) do
     %Measurement{}
     |> Measurement.changeset(attrs)
     |> Repo.insert()
+    |> broadcast(:measurement_inserted)
   end
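The reason broadcast/2 fits at the end of the pipeline is that it returns its first argument unchanged in both the success and the failure case. The sketch below shows that behaviour without Phoenix or Ecto. BroadcastSketch is a hypothetical module, and send(self(), ...) is only a stand-in for Phoenix.PubSub.broadcast/3.

```elixir
defmodule BroadcastSketch do
  # Stand-in for the real broadcast: deliver the message to the current
  # process instead of going through Phoenix.PubSub (assumption for the demo).
  def broadcast({:ok, record}, event) do
    send(self(), {event, record})
    {:ok, record}
  end

  # Errors pass through untouched and nothing is sent.
  def broadcast({:error, _reason} = error, _event), do: error
end

result = {:ok, %{temperature_c: 21.5}} |> BroadcastSketch.broadcast(:measurement_inserted)
IO.inspect(result)
# prints {:ok, %{temperature_c: 21.5}}

failed = {:error, :invalid} |> BroadcastSketch.broadcast(:measurement_inserted)
IO.inspect(failed)
# prints {:error, :invalid}
```

Callers of create_measurement/1 therefore see exactly the same return values as before the broadcast step was added.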

Subscribe to PubSub topic when LiveView is connected

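In a LiveView, I subscribe to a PubSub topic using the PubSub utilities prepared above. It is important to subscribe only after the LiveView connection has been established. mount/3 runs twice: once for the initial static render and once more when the client connects, and only the connected process stays alive to receive messages.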

 defmodule ExampleWeb.EnvironmentLive do
   use ExampleWeb, :live_view

   ...

   @impl true
   def mount(_params, _session, socket) do
+    if connected?(socket) do
+      Environment.subscribe()
+    end

     ...

     {:ok, socket, temporary_assigns: [measurements: []]}
   end

Handle PubSub message as needed

Once a LiveView has subscribed to a PubSub topic, it receives a message whenever one is broadcast on that topic. I can handle the event by pattern-matching on it in handle_info/2. In this example, an event is a tuple like {event_name, new_record} because that is the format my broadcast function uses.

 defmodule ExampleWeb.EnvironmentLive do
   use ExampleWeb, :live_view

   ...

+  def handle_info({:measurement_inserted, new_measurement}, socket) do
+    # TODO: do something
+  end
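A LiveView is a process, so the subscribe-then-handle_info flow can be tried out with nothing but the standard library. The following is a rough sketch under loud assumptions: Registry stands in for Phoenix.PubSub, a plain GenServer stands in for the LiveView, and the Sketch.* names are hypothetical.

```elixir
defmodule Sketch.Subscriber do
  use GenServer

  def start_link(topic), do: GenServer.start_link(__MODULE__, topic)
  def last(pid), do: GenServer.call(pid, :last)

  @impl true
  def init(topic) do
    # Stand-in for Environment.subscribe/0: register this process under the topic.
    {:ok, _owner} = Registry.register(Sketch.Registry, topic, [])
    {:ok, nil}
  end

  # Same message shape as in the LiveView: {event_name, new_record}.
  @impl true
  def handle_info({:measurement_inserted, new_measurement}, _state) do
    {:noreply, new_measurement}
  end

  @impl true
  def handle_call(:last, _from, state), do: {:reply, state, state}
end

topic = "Example.Environment"
{:ok, _registry} = Registry.start_link(keys: :duplicate, name: Sketch.Registry)
{:ok, pid} = Sketch.Subscriber.start_link(topic)

# Stand-in for broadcast/2: send the tuple to every process registered on the topic.
Registry.dispatch(Sketch.Registry, topic, fn entries ->
  for {subscriber, _value} <- entries do
    send(subscriber, {:measurement_inserted, %{temperature_c: 21.5}})
  end
end)

IO.inspect(Sketch.Subscriber.last(pid))
# prints %{temperature_c: 21.5}
```

In the real LiveView the handle_info/2 clause would return {:noreply, socket} with updated assigns instead of replacing a GenServer state.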

Throttle incoming PubSub messages

I want to refresh the real-time dashboard based on incoming PubSub messages, but at the same time I want to control how often I refresh the dashboard no matter how fast PubSub messages are coming in. In other words, I do not want to overwhelm my LiveView rendering in case messages are coming in ridiculously fast.

So I calculate when the next refresh is due by adding refresh_interval to the timestamp of the last displayed measurement, and I ignore any message that arrives before that time. The calculation uses DateTime.add and DateTime.compare from the standard library.

   def handle_info({:measurement_inserted, new_measurement}, socket) do
-    # TODO: do something
+    if refresh_interval_elapsed?(socket) do
+      {:noreply, assign(socket, last_measurement: new_measurement)}
+    else
+      {:noreply, socket}
+    end
   end
+
+  # Check if the refresh interval has elapsed. (now >= next_refresh)
+  defp refresh_interval_elapsed?(socket) do
+    next_refresh = DateTime.add(socket.assigns.last_measurement.measured_at, socket.assigns.refresh_interval)
+
+    case DateTime.compare(DateTime.utc_now(), next_refresh) do
+      :gt -> true
+      :eq -> true
+      _ -> false
+    end
+  end
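The time comparison is easy to check in isolation when the current time is passed in as an argument. Here is a minimal sketch; ThrottleSketch and refresh_due?/3 are hypothetical names, and I assume refresh_interval is a number of seconds, which is the default unit of DateTime.add.

```elixir
defmodule ThrottleSketch do
  # True when `now` is at or past last_measured_at + interval_seconds.
  def refresh_due?(last_measured_at, interval_seconds, now) do
    next_refresh = DateTime.add(last_measured_at, interval_seconds, :second)
    DateTime.compare(now, next_refresh) != :lt
  end
end

last = ~U[2021-04-03 00:00:00Z]

IO.inspect(ThrottleSketch.refresh_due?(last, 5, ~U[2021-04-03 00:00:03Z]))
# prints false (only 3 of the 5 seconds have passed)

IO.inspect(ThrottleSketch.refresh_due?(last, 5, ~U[2021-04-03 00:00:05Z]))
# prints true (exactly at the boundary)

IO.inspect(ThrottleSketch.refresh_due?(last, 5, ~U[2021-04-03 00:00:09Z]))
# prints true
```

Messages that arrive while the result is false are simply dropped, so the dashboard shows the first measurement received after each interval rather than every measurement.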

That's it!