Day 9: Using the Elixir Registry to perform locking of resources
In yesterday’s blog post, I shared how I got a worker pool working with a Queue and GenStage. However, there was still one issue with that implementation. When there was more than one worker (which is the case in my setup), the same user’s sync could end up running in multiple workers at the same time. Let us look at an example of how this happened.
- A sync event for user #1 arrives in the queue and is dispatched to worker #1.
- Another sync event for user #1 arrives, and since the queue is now empty, it is enqueued straight away.
- As soon as the queue has some data, it is dispatched to the currently available workers. With 2 workers in our setup, this event goes to worker #2.
At this point we have 2 workers trying to sync the same user, which is a recipe for disaster. One way to work around this is to use database locks provided by PostgreSQL. However, I didn’t want to go down this route if I could avoid it.
ETS to store current workers
At this point I contemplated using a simple ETS table to store the {user_id, pid} when a worker starts and removing it when a worker ends.
This would work. However, if a worker crashed after storing a user_id, it would permanently stop the user_id’s syncs from being processed.
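To make the stale-lock problem concrete, here is a minimal sketch of what an ETS-based lock might look like. The table name :sync_locks and the use of :ets.insert_new/2 are my assumptions for illustration, not code from the actual project.

```elixir
# Hypothetical ETS-based lock; the table name :sync_locks is an assumption.
table = :ets.new(:sync_locks, [:set, :public])

# A worker takes the lock for user 1 and then crashes without releasing it.
{pid, ref} =
  spawn_monitor(fn ->
    true = :ets.insert_new(table, {1, self()})
    exit(:boom)
  end)

# Wait until the worker is really gone.
receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

# The entry outlives the worker that wrote it, so the next attempt
# to take the lock for user 1 is refused.
stale_entry = :ets.lookup(table, 1)
lock_acquired? = :ets.insert_new(table, {1, self()})
```

Nothing removes the {1, pid} entry when the worker dies, so user 1 stays locked until some other process notices and deletes it.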
I would then have to monitor the workers and clean up stuff if something crashed. However, at this point I was leaning more towards using the Registry, as it cleans up the data associated with a process when that process crashes.
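The same crash scenario can be sketched against a Registry to see the difference. This assumes Elixir 1.5 or later for the keyword form of Registry.start_link, and the registry name :demo_locks is made up for the example.

```elixir
# The registry name :demo_locks is an assumption for this sketch.
{:ok, _} = Registry.start_link(keys: :unique, name: :demo_locks)

# A worker registers itself under user 1 and then crashes.
{pid, ref} =
  spawn_monitor(fn ->
    {:ok, _} = Registry.register(:demo_locks, 1, :ok)
    exit(:boom)
  end)

# Wait until the worker is really gone.
receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

# The dead worker no longer holds the key, so another process can take it.
result = Registry.register(:demo_locks, 1, :ok)
```

Here the registration for user 1 succeeds after the crash, without any monitoring code on my side.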
Using the Registry
Using the Registry turned out to be much simpler than I thought.
I had to set up a supervisor for it in my Application:
supervisor(Registry, [:unique, Danny.SyncQueue.Worker.registry_name]), # create registry to keep track of current worker jobs
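The line above uses the Supervisor.Spec helpers. On Elixir 1.5 and later, the same registry can probably be started with a child spec tuple instead. This is a sketch under that assumption. I use the atom :worker_wip directly (the value behind registry_name in the worker) so the snippet runs on its own.

```elixir
children = [
  # :worker_wip is the value of @registry_name in the worker module
  {Registry, keys: :unique, name: :worker_wip}
]

# Start a supervisor that owns the registry
{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
```

In a real application this list would sit next to the other children in the Application's start callback.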
Worker code before using Registry for locking
def handle_events([uid], _from, state) do
  debug "syncing #{uid} on #{inspect self()}"
  UserSync.sync(uid)
  {:noreply, [], state}
end
Worker code after using Registry for locking
@registry_name :worker_wip
def registry_name, do: @registry_name

def handle_events([uid], _from, state) do
  get_lock_or_re_nq(uid, fn ->
    debug "syncing #{uid} on #{inspect self()}"
    UserSync.sync(uid)
  end)
  {:noreply, [], state}
end

defp get_lock_or_re_nq(uid, fun) do
  # register ourselves under the uid key
  case Registry.register(@registry_name, uid, :ok) do
    {:ok, _} ->
      # this means we are good to go ahead and do our processing
      fun.()
      # unregister once our function is done
      Registry.unregister(@registry_name, uid)
    {:error, {:already_registered, pid}} ->
      # someone else is already working with this uid
      # let us re-enqueue it so that it can be processed later
      debug("re nqing on account of another worker: #{inspect pid} processing this job. self: #{inspect self()}")
      spawn(fn ->
        # sleep for a second before nqing, to avoid being quickly picked up by another worker
        :timer.sleep(:timer.seconds(1))
        Queue.nq([uid])
      end)
  end
end
The process is pretty simple. I let the workers pick up whatever user_ids are available in the queue.
However, when they start processing, I try to register this user_id in the Registry with the current process’s pid.
If this user_id has already been registered with a different process, the Registry returns {:error, {:already_registered, pid}}.
At that point, I wait for a second and re-enqueue it. I do the waiting in a different process to avoid blocking a worker.
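The behaviour described above can be tried out in isolation with a small script. This is only a sketch: LockDemo, with_lock and the :lock_demo registry name are hypothetical, the real queue is left out (the busy case just returns {:busy, pid} instead of re-enqueueing), and the try/after around the function is my own addition rather than something the worker code does.

```elixir
defmodule LockDemo do
  # Registry name is an assumption for this sketch.
  @registry :lock_demo

  def registry, do: @registry

  # Runs fun only if no other process holds uid; otherwise reports the holder.
  def with_lock(uid, fun) do
    case Registry.register(@registry, uid, :ok) do
      {:ok, _owner} ->
        try do
          {:ok, fun.()}
        after
          # release the key even if fun raises
          Registry.unregister(@registry, uid)
        end

      {:error, {:already_registered, pid}} ->
        {:busy, pid}
    end
  end
end

{:ok, _} = Registry.start_link(keys: :unique, name: LockDemo.registry())
parent = self()

# First worker takes the lock for user 1 and holds it until told to finish.
worker =
  spawn(fn ->
    LockDemo.with_lock(1, fn ->
      send(parent, :locked)

      receive do
        :finish -> :ok
      end
    end)

    send(parent, :released)
  end)

receive do
  :locked -> :ok
end

# A second attempt for the same user is refused while the first is running.
while_busy = LockDemo.with_lock(1, fn -> :synced end)

send(worker, :finish)

receive do
  :released -> :ok
end

# Once the first worker has unregistered, the lock is free again.
after_release = LockDemo.with_lock(1, fn -> :synced end)
```

While the first worker holds user 1, the second call gets back the holder's pid, which is the point where the real worker would schedule the delayed re-enqueue.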
I am close to the half month mark and am hopefully done with the difficult product stuff. I’ll be spending more time trying to get feedback after releasing a private beta. Wish me luck!