Real-time page refresh using Phoenix LiveView and PubSub
I enjoy IoT development with the Elixir programming language, the Nerves IoT platform and the Phoenix web framework. It is so much fun. I was able to build a real-time temperature and humidity monitoring system for my living room. My API server accepts a sensor measurement, then broadcasts the update to all the users of my real-time dashboard.
Today I am going to write about real-time page refresh using Phoenix LiveView and Phoenix PubSub so that I can quickly implement the same thing in the future.
This is my result from autoracex #21, held from Sat 4/3 00:00 to Mon 4/5 23:59.
Implement PubSub utilities in a context module
First of all, I prepare utility functions for subscribing and broadcasting messages in a given context module. Here I add subscribe/0 and broadcast/2 to the Example.Environment context module. I use inspect(__MODULE__) as the topic, which guarantees a unique topic name and saves me from having to invent a name or look it up later.
  defmodule Example.Environment do
    ...
+
+   @topic inspect(__MODULE__)
+
+   @doc """
+   Subscribe to this context module's messages.
+   """
+   def subscribe do
+     Phoenix.PubSub.subscribe(Example.PubSub, @topic)
+   end
+
+   @doc """
+   Broadcast a message to the subscribers when something happens.
+   """
+   def broadcast({:ok, record}, event) do
+     Phoenix.PubSub.broadcast(Example.PubSub, @topic, {event, record})
+     {:ok, record}
+   end
+
+   def broadcast({:error, _} = error, _event), do: error
+
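To see the pass-through behaviour of broadcast/2 without a running Phoenix application, here is a minimal sketch that swaps Phoenix.PubSub for the standard library's Registry. The module name BroadcastSketch and the sample measurement map are made up for illustration; only the shape of subscribe/0 and broadcast/2 mirrors the functions above.

```elixir
defmodule BroadcastSketch do
  # Stand-in for Example.PubSub: a Registry with duplicate keys,
  # which is NOT Phoenix.PubSub but supports the same subscribe/dispatch idea.
  @registry BroadcastSketch.Registry
  @topic inspect(__MODULE__)

  def start_link, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Register the calling process as a subscriber of the topic.
  def subscribe, do: Registry.register(@registry, @topic, [])

  # On success, send {event, record} to every subscriber and pass the
  # original {:ok, record} tuple through so it can sit at the end of a pipe.
  def broadcast({:ok, record}, event) do
    Registry.dispatch(@registry, @topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {event, record})
    end)

    {:ok, record}
  end

  # On error, broadcast nothing and return the error unchanged.
  def broadcast({:error, _} = error, _event), do: error
end

{:ok, _pid} = BroadcastSketch.start_link()
{:ok, _owner} = BroadcastSketch.subscribe()

# The subscriber (this process) receives {:measurement_inserted, record}.
BroadcastSketch.broadcast({:ok, %{temperature_c: 21.5}}, :measurement_inserted)

# Nothing is sent for an error tuple.
BroadcastSketch.broadcast({:error, :invalid}, :measurement_inserted)
```

Because both clauses return their first argument unchanged, callers still get the usual {:ok, record} or {:error, reason} result.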
Broadcast a message as needed
Now that I have PubSub utility functions, I can broadcast a message when something happens. In the following example, I notify all the subscribers of a newly-inserted measurement record.
  defmodule Example.Environment do
    ...
    @doc """
    Creates a measurement.

    ## Examples

        iex> create_measurement(%{field: value})
        {:ok, %Measurement{}}

        iex> create_measurement(%{field: bad_value})
        {:error, %Ecto.Changeset{}}

    """
    def create_measurement(attrs \\ %{}) do
      %Measurement{}
      |> Measurement.changeset(attrs)
      |> Repo.insert()
+     |> broadcast(:measurement_inserted)
    end
Subscribe to PubSub topic when LiveView is connected
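In a LiveView, I subscribe to a PubSub topic using the PubSub utilities prepared above. It is important to subscribe only after the LiveView connection has been established: mount/3 runs once for the initial static render and again when the WebSocket connects, and connected?/1 returns true only in the second case.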
  defmodule ExampleWeb.EnvironmentLive do
    use ExampleWeb, :live_view
    ...
    @impl true
    def mount(_params, _session, socket) do
+     if connected?(socket) do
+       Environment.subscribe()
+     end
      ...
      {:ok, socket, temporary_assigns: [measurements: []]}
    end
Handle PubSub message as needed
Once a LiveView
has subscribed to a PubSub
topic, it will receive a message whenever a mesage is broadcast on the subscribed topic. I can handle the event, pattern-matching the event in handle_info/2
. In this example, an event is a tuple like {event_name, new_record}
because that is the format that my broadcast
function uses.
  defmodule ExampleWeb.EnvironmentLive do
    use ExampleWeb, :live_view
    ...
+   def handle_info({:measurement_inserted, new_measurement}, socket) do
+     # TODO: do something
+     {:noreply, socket}
+   end
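A LiveView is a process, and its handle_info/2 receives plain Erlang messages in the same way a GenServer does. The sketch below uses a bare GenServer as a stand-in to show the pattern match on the {event_name, new_record} tuple in isolation. DashboardSketch, last_measurement/1 and the sample map are hypothetical names for illustration, not part of the application above.

```elixir
defmodule DashboardSketch do
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, nil)

  # Hypothetical helper for reading back the state in this sketch.
  def last_measurement(pid), do: GenServer.call(pid, :last_measurement)

  @impl true
  def init(_arg), do: {:ok, %{last_measurement: nil}}

  @impl true
  def handle_call(:last_measurement, _from, state) do
    {:reply, state.last_measurement, state}
  end

  # Matches the {event, record} tuple that broadcast/2 sends.
  @impl true
  def handle_info({:measurement_inserted, new_measurement}, state) do
    {:noreply, %{state | last_measurement: new_measurement}}
  end

  # Ignore any other message instead of crashing.
  def handle_info(_other, state), do: {:noreply, state}
end

{:ok, pid} = DashboardSketch.start_link()
send(pid, {:measurement_inserted, %{temperature_c: 21.5}})
DashboardSketch.last_measurement(pid)
```

In the real LiveView, the state map is the socket and the update goes through assign/2, but the clause head is the same.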
Throttle incoming PubSub messages
I want to refresh the real-time dashboard based on incoming PubSub messages, but at the same time I want to control how often I refresh the dashboard no matter how fast PubSub messages are coming in. In other words, I do not want to overwhelm my LiveView rendering in case messages are coming in ridiculously fast.
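So I calculate the time of the next refresh by adding the refresh_interval value to the measured_at timestamp of the last displayed measurement, and I ignore any messages that arrive before that time. The code shown here does the time calculation with DateTime.add and DateTime.compare; a library such as Timex can make the same calculation more human-readable.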
  def handle_info({:measurement_inserted, new_measurement}, socket) do
-   # TODO: do something
+   if refresh_interval_elapsed?(socket) do
+     {:noreply, assign(socket, last_measurement: new_measurement)}
+   else
+     {:noreply, socket}
+   end
  end
+
+ # Check if the refresh interval has elapsed (now >= next_refresh).
+ # DateTime.add/2 treats refresh_interval as seconds by default.
+ defp refresh_interval_elapsed?(socket) do
+   next_refresh =
+     DateTime.add(socket.assigns.last_measurement.measured_at, socket.assigns.refresh_interval)
+
+   case DateTime.compare(DateTime.utc_now(), next_refresh) do
+     :gt -> true
+     :eq -> true
+     _ -> false
+   end
+ end
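The throttle check can be tried outside a LiveView with fixed timestamps. This is a minimal sketch: ThrottleSketch, refresh_due?/3 and the injectable now argument are hypothetical names for illustration, and the interval is assumed to be in seconds.

```elixir
defmodule ThrottleSketch do
  # Returns true when `now` is at or past last_at + interval_seconds.
  # `now` defaults to the current UTC time but can be passed in for testing.
  def refresh_due?(last_at, interval_seconds, now \\ DateTime.utc_now()) do
    next_refresh = DateTime.add(last_at, interval_seconds, :second)
    DateTime.compare(now, next_refresh) != :lt
  end
end

last_at = ~U[2021-04-03 00:00:00Z]

# One second too early: the message would be ignored.
ThrottleSketch.refresh_due?(last_at, 5, ~U[2021-04-03 00:00:04Z])
# => false

# Exactly on the boundary: the dashboard would refresh.
ThrottleSketch.refresh_due?(last_at, 5, ~U[2021-04-03 00:00:05Z])
# => true
```

Passing the current time as an argument keeps the function pure, which makes the boundary cases easy to check.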
That's it!