Ensuring Data Integrity in Real-time Synchronization: A Phoenix LiveView Tale

In the dynamic landscape of web applications, particularly those dealing with real-time data synchronization between systems, the challenge of handling duplicate requests is not merely theoretical.

This blog post delves into a complex issue I encountered while working on a Phoenix application designed for bidirectional data synchronization between two online enterprise platforms. My journey through identifying and solving the problem of duplicate webhook calls illustrates the importance of idempotent operations in maintaining data integrity and system reliability.

The Challenge of Synchronization

The Phoenix-based OTP application was designed to facilitate seamless real-time data synchronization between two systems. While leveraging Elixir's GenServers for asynchronous data processing, I ran into an unexpected hurdle: duplicate webhook calls, which threatened the central database's integrity by risking duplicate records.

Encountering Duplication

The discovery of webhook events firing multiple times for the same data underscored a significant threat to data consistency. My initial architecture, though efficient, lacked a robust mechanism to prevent the processing of duplicate requests.

Crafting the Solution

The solution required a blend of creativity and technical expertise. My goal was to implement a deduplication mechanism that could reliably identify duplicate requests, ensuring that each unique piece of data was processed exactly once. To that end, I created a module that I will refer to as Deduplicator for the rest of this post.

The Genesis of the Deduplicator module

Deduplicator emerged from the necessity to intercept and evaluate incoming webhook calls before proceeding with any data manipulation. The module's design was centered around generating unique identifiers for each request based on the request's payload. By serializing the entity part of the request and hashing it, I could create a distinctive fingerprint for each operation.

Tutorial: Implementing the Deduplicator module

Here's how I brought Deduplicator to life, step by step:

  1. Unique Identifier Generation:

For each incoming request, serialize the entity payload into a JSON string and generate a SHA-256 hash. This hash serves as a unique identifier, encapsulating the essence of the request.
Below is a code snippet for the unique identifier generator function:

def generate_unique_id(entity) do
  # Serialize the entity payload to a JSON string...
  encoded_entity = Jason.encode!(entity)

  # ...then fingerprint it with SHA-256 and hex-encode the digest.
  :crypto.hash(:sha256, encoded_entity)
  |> Base.encode16()
end
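
As a quick sanity check, here is what using the function might look like (the payload below is a made-up example):

id = Deduplicator.generate_unique_id(%{"id" => 42, "status" => "updated"})
# => a 64-character uppercase hex string (the digest itself is omitted here)

id == Deduplicator.generate_unique_id(%{"id" => 42, "status" => "updated"})
# => true (identical payloads always produce the same fingerprint)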

  2. Implementation of the Deduplicator Module:

This module, leveraging Elixir's GenServer and ETS (Erlang Term Storage), is designed to ensure idempotent operations, preventing duplicate data processing. Here's a deeper dive into its implementation and integration within our Phoenix application.

The GenServer Foundation

The Deduplicator module begins its life as a GenServer, a cornerstone of Elixir applications for maintaining state and executing background work asynchronously. Using GenServer allows Deduplicator to run continuously in the background, monitoring for duplicate requests.

defmodule Deduplicator do
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end
end

The init/1 function

This callback performs the foundational setup for the Deduplicator module. Upon the GenServer's initialization, it is invoked to carry out the setup tasks essential to the module's operation.

def init(_) do
  :ets.new(:dedup_table, [:set, :public, :named_table])
  {:ok, %{}}
end

The line :ets.new(:dedup_table, [:set, :public, :named_table]) establishes an Erlang Term Storage (ETS) table named :dedup_table. The table is configured with a few options:

  • :set: This option ensures that the table behaves as a set, meaning each entry is unique based on its key. This is crucial for our deduplication logic, as it allows us to store each request's unique identifier without duplicates (see the short sketch after this list).
  • :public: This option makes the table accessible to all processes, enabling different parts of the application to query or update the deduplication status of requests.
  • :named_table: This allows the table to be referenced by its name, :dedup_table, facilitating easier access throughout the application.
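
To make the :set semantics concrete, here is a throwaway iex sketch (using a hypothetical :demo_table rather than the real :dedup_table):

:ets.new(:demo_table, [:set, :public, :named_table])
:ets.insert(:demo_table, {"some-hash", 1})
:ets.insert(:demo_table, {"some-hash", 2})
:ets.lookup(:demo_table, "some-hash")
# => [{"some-hash", 2}] (one entry per key; re-inserting overwrites rather than duplicates)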

Marking Requests as Processed

When a request is processed, its unique identifier is stored in the ETS table along with the current system time. This marks the request as processed, preventing future duplications.

def mark_as_processed(unique_id) do
  # Store the timestamp in milliseconds so it is directly comparable with
  # @ttl, which :timer.hours/1 also expresses in milliseconds.
  :ets.insert(:dedup_table, {unique_id, :erlang.system_time(:millisecond)})
end

The mark_as_processed/1 function is a pivotal part of the Deduplicator module, encapsulating the mechanism that records the processing of requests to prevent duplicate handling. This function demonstrates an effective use of Erlang Term Storage (ETS) to maintain the idempotency of operations within our application.

At the heart of this function lies the :ets.insert/2 call, which adds a new record into the :dedup_table ETS table. Each record is a tuple consisting of two elements: the unique_id of the request and the current system time captured by :erlang.system_time(:millisecond). The explicit :millisecond unit matters: with no argument, :erlang.system_time/0 returns native time units (typically nanoseconds), which would not be comparable with the millisecond values that :timer.hours/1 produces for @ttl.

  • Unique Identifier: The unique_id serves as the key for the record. It is a hash derived from the request's payload, ensuring that each request can be uniquely identified based on its content. This uniqueness is crucial for detecting and preventing duplicate processing of the same request.
  • Timestamp: The inclusion of the current system time as the second element of the tuple serves a dual purpose. First, it timestamps when the request was processed, providing traceability. Second, it facilitates the cleanup process, allowing the system to determine which records are old and should be removed based on their age.

Checking for Duplicates

Before processing any request, Deduplicator checks the ETS table to see if the request's unique identifier already exists, indicating it has been processed.

def already_processed?(unique_id) do
  case :ets.lookup(:dedup_table, unique_id) do
    [{^unique_id, _timestamp}] -> true
    _ -> false
  end
end

The already_processed?/1 function is a critical component of the Deduplicator module, serving as the gatekeeper in the deduplication strategy. This function scrutinizes requests to determine if they have been processed before, thus preventing redundant operations on the same data.

Here's a closer look at its implementation and significance (a short usage sketch follows the list):

  • ETS Lookup: The function begins with an :ets.lookup/2 call, querying the :dedup_table ETS table for a record matching the provided unique_id. This unique_id is a hash derived from the request's payload, ensuring each request can be uniquely identified.
  • Match Found: If the lookup returns a tuple matching the unique_id, the function interprets this as the request having been processed before. The presence of this record in the table indicates that the specific data payload associated with this unique_id has already been handled, signaling the function to return true.
  • No Match Found: Conversely, if no matching record is found in the ETS table, the function concludes that the request has not been processed previously and returns false. This outcome indicates that it is safe to proceed with processing the request, as there is no risk of duplicating effort or data.
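
Putting the pieces together (assuming the Deduplicator GenServer is running so the ETS table exists, and entity is whatever payload the webhook delivered), a duplicate-check round-trip looks like this:

unique_id = Deduplicator.generate_unique_id(entity)

Deduplicator.already_processed?(unique_id)
# => false (first time this payload has been seen)

Deduplicator.mark_as_processed(unique_id)
# => true (:ets.insert/2 returns true)

Deduplicator.already_processed?(unique_id)
# => true (any identical payload is now flagged as a duplicate)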

Cleaning up ETS to save memory

In any application that relies on in-memory storage for rapid data access and manipulation, managing memory usage efficiently is paramount. This is particularly true for our Deduplicator module, which utilizes Erlang Term Storage (ETS) to keep track of processed requests and prevent duplicates. However, without proper management, the memory consumed by the ETS table could grow indefinitely, potentially degrading system performance over time. To address this concern, we've implemented a cleanup mechanism designed to periodically remove old entries from the ETS table, thereby conserving memory and maintaining optimal performance.

Implementing Periodic Cleanup

The cleanup process is orchestrated through two primary functions: schedule_cleanup/0 and handle_info/2. Here's how they work together to ensure the ETS table remains efficient and does not grow unbounded:

defp schedule_cleanup do
  Process.send_after(self(), :cleanup, @cleanup_interval)
end

  • Scheduling Cleanup: The schedule_cleanup/0 function leverages Process.send_after/3 to schedule a message (:cleanup) to be sent to the GenServer itself after a predefined interval (@cleanup_interval). This periodic messaging acts as a trigger for the cleanup operation, ensuring that the process is automatically repeated at regular intervals.

Handling the Cleanup Process

When the GenServer receives the :cleanup message, it triggers the handle_info/2 function, which is responsible for the actual cleanup logic:

def handle_info(:cleanup, state) do
  # Use the same millisecond unit as the stored timestamps and @ttl.
  current_time = :erlang.system_time(:millisecond)

  :ets.tab2list(:dedup_table)
  |> Enum.each(fn
    # Entries that have outlived the TTL are deleted to reclaim memory.
    {id, timestamp} when current_time - timestamp > @ttl ->
      :ets.delete(:dedup_table, id)

    _ ->
      :noop
  end)

  # Re-arm the timer so cleanup keeps running at regular intervals.
  schedule_cleanup()
  {:noreply, state}
end

  • Executing Cleanup: Upon receiving the :cleanup message, this function retrieves all entries from the :dedup_table ETS table and iterates over them. Each entry is assessed to determine whether its timestamp (indicating when it was added to the table) is older than the allowed time-to-live (@ttl). If an entry is older, it is removed from the table, freeing the memory it consumed (a more compact alternative is sketched after this list).
  • Recurrence of Cleanup: After performing the cleanup, the function calls schedule_cleanup/0 again to ensure that the cleanup operation continues to run at regular intervals, thus maintaining the ongoing efficiency of the ETS table.
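
A design note on that scan: :ets.tab2list/1 copies the entire table into the GenServer's heap before filtering, which is fine for modest tables but wasteful for large ones. A sketch of an alternative (not what my original module did) pushes the filtering into ETS itself with :ets.select_delete/2, deleting expired entries in a single pass:

def handle_info(:cleanup, state) do
  # Any timestamp older than this cutoff has outlived its TTL.
  cutoff = :erlang.system_time(:millisecond) - @ttl

  # Match {id, timestamp} tuples, guard on the timestamp, and return
  # true so ETS deletes each matching entry in place.
  :ets.select_delete(:dedup_table, [{{:"$1", :"$2"}, [{:<, :"$2", cutoff}], [true]}])

  schedule_cleanup()
  {:noreply, state}
end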

Integrating Cleanup with Initialization

To kickstart the cleanup process when the Deduplicator GenServer is initialized, we include the schedule_cleanup/0 call within the init/1 function:

def init(_) do
  :ets.new(:dedup_table, [:set, :public, :named_table])
  schedule_cleanup()
  {:ok, %{}}
end

  • Ensuring Immediate Effectiveness: By invoking schedule_cleanup/0 during initialization, we ensure that the cleanup mechanism is active right from the start, preventing the ETS table from ever becoming a memory concern.

Now, the final version of the Deduplicator module:

defmodule Deduplicator do
  use GenServer

  @cleanup_interval :timer.minutes(1)
  @ttl :timer.hours(1)

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def init(_) do
    :ets.new(:dedup_table, [:set, :public, :named_table])
    schedule_cleanup()
    {:ok, %{}}
  end

  def mark_as_processed(unique_id) do
    :ets.insert(:dedup_table, {unique_id, :erlang.system_time(:millisecond)})
  end

  def already_processed?(unique_id) do
    case :ets.lookup(:dedup_table, unique_id) do
      [{^unique_id, _timestamp}] -> true
      _ -> false
    end
  end

  def generate_unique_id(entity) do
    encoded_entity = Jason.encode!(entity)

    :crypto.hash(:sha256, encoded_entity)
    |> Base.encode16()
  end

  defp schedule_cleanup do
    Process.send_after(self(), :cleanup, @cleanup_interval)
  end

  def handle_info(:cleanup, state) do
    current_time = :erlang.system_time(:millisecond)

    :ets.tab2list(:dedup_table)
    |> Enum.each(fn
      {id, timestamp} when current_time - timestamp > @ttl ->
        :ets.delete(:dedup_table, id)

      _ ->
        :noop
    end)

    schedule_cleanup()
    {:noreply, state}
  end
end

Utilizing the Solution in a Phoenix LiveView application

Integrating Deduplicator with a Supervisor

To ensure Deduplicator's resilience and reliability, it's integrated into my application's supervision tree. This guarantees that Deduplicator is automatically restarted in case of failures, maintaining the application's robustness.

defmodule Deduplicator.Supervisor do
  use Supervisor

  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @impl true
  def init(_arg) do
    children = [
      {Deduplicator, []}
    ]

    # Restart strategy: if the Deduplicator crashes, only it is restarted.
    Supervisor.init(children, strategy: :one_for_one)
  end
end

This Supervisor oversees Deduplicator, utilizing the :one_for_one strategy, which specifies that if the GenServer crashes, it will be the only process to be restarted.

Adding Deduplicator to the Phoenix Application's Supervision Tree

Integrating Deduplicator into the application's main supervision tree ensures it's started at launch, ready to deduplicate requests from the get-go. This is achieved by modifying the application's root supervisor to include Deduplicator's Supervisor as a child.
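
In practice that amounts to adding one line to the children list of the application module. A minimal sketch (MyApp and MyAppWeb.Endpoint are placeholders for your own application's names):

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # ...the usual Phoenix children, e.g. the endpoint...
      MyAppWeb.Endpoint,
      # Start the Deduplicator under its dedicated supervisor
      Deduplicator.Supervisor
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end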

Utilizing Deduplicator in the Controller

With Deduplicator operational, we modify our Phoenix controller to leverage it for handling potential duplicate requests. Before processing any data, we check if it has already been processed, ensuring idempotency.

def handle_duplicates(%{assigns: _assigns} = conn, %{} = params) do
  entity = params["entity"]

  unique_id = Deduplicator.generate_unique_id(entity)

  if Deduplicator.already_processed?(unique_id) do
    respond(conn)
  else
    Deduplicator.mark_as_processed(unique_id)

    process_entity(entity)
    respond(conn)
  end
end
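
One honest caveat about this flow: the already_processed?/1 check and the mark_as_processed/1 write are two separate steps, so two identical webhooks arriving at almost the same instant could both pass the check before either marks the entry. If that window matters for your workload, a tighter variant (a sketch of my own, not part of the module above) collapses check and mark into one atomic call using :ets.insert_new/2, which inserts only when the key is absent:

# Returns true if this caller claimed the request, false if it was a duplicate.
def claim(unique_id) do
  :ets.insert_new(:dedup_table, {unique_id, :erlang.system_time(:millisecond)})
end

The controller then reduces to a single if Deduplicator.claim(unique_id) branch, with no gap between checking and marking.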

Lessons Learned and Concluding Thoughts

This journey illuminated the critical role of idempotency in ensuring data integrity across distributed systems. The development of Deduplicator not only solved my immediate challenge but also enriched my architectural approach, emphasizing resilience and reliability.

As I move forward, the insights gained from this experience will inform my future architectures, emphasizing the power of Elixir and Phoenix in building robust, fault-tolerant applications. For fellow engineers navigating similar challenges, I hope this account serves as both a guide and an inspiration.

Ciao 👋