ETL (Extract, Transform, Load) Using Oban

In a previous article, we talked about ETL and I gave a short synopsis of various packages used in implementing an ETL pipeline. In this and subsequent articles, I will provide an example application using one of the tools – Oban in this case – and review some of the main aspects of it.

For my various examples, I will try to always accomplish the same task, so that no time is wasted on understanding what is being done, just how it is being done. The goal is to use Faker to generate some superheroes; generate some stats for them (like how many people each one saved on a given day); and create a digest of which superhero is the best. For consistency between runs, I will seed 100 fake superheroes into the database.

While both parts (stats generation and the daily digest) are built as ETL processes here, in the real world the stats would be collected from incoming API calls or a news-site crawler rather than generated. I will mostly talk about the daily digest process, unless something in the stats generator is of particular interest.

The repo is located here for reference.

So, let’s get to it! To begin, I followed the installation directions found here; roughly, that means adding the dependency, running the Oban migration, and starting Oban under the application’s supervision tree. There is further configuration needed, but that will come later on. Next, I created the context, schema, and migrations for the heroes, stats, and rankings. Again, I won’t go into detail here since the purpose of the app isn’t important, but if you want to know specifics, please reference the repo. As a quick sketch of the supervision-tree step (module names assumed to match this example app):
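# In application.ex, start Oban after the Repo, pulling its config
# from config.exs (the full config appears later in this post):
children = [
  ObanExample.Repo,
  {Oban, Application.fetch_env!(:oban_example, Oban)}
]

Now the groundwork is set, so we can get into examples of the workers.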

Defining a Worker

Oban.Worker is a behavior module that provides a __using__ macro for defining workers. I created two workers: StatisticsGenerator and DailyRankingProcessor. Both use the defaults, which, as mentioned here, you get by calling use Oban.Worker with no options, but it can be useful to set them explicitly for the sake of documentation.

use Oban.Worker, 
  queue: :default,
  max_attempts: 20,
  priority: 0,
  unique: [],
  tags: []

First, let’s go over where these options are used, and then what they mean. Oban.Job defines a function called new/2, and its documentation lists the same keys as available opts. Oban.Worker has a callback, also called new/2, with a default implementation: a helper that creates a new Job with the worker’s options merged in. It’s all overridable, though; calling DailyRankingProcessor.new(%{}, max_attempts: 10) is perfectly valid.
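To make that relationship concrete, here’s a rough sketch of the equivalence (the date_id value is just a placeholder):

# The generated DailyRankingProcessor.new/2...
DailyRankingProcessor.new(%{date_id: 1}, max_attempts: 10)

# ...builds roughly the same Oban.Job changeset as:
Oban.Job.new(%{date_id: 1},
  worker: ObanExample.DailyRankingProcessor,
  queue: :default,
  max_attempts: 10
)

Now for what the options mean: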

  1. :max_attempts sets the maximum number of attempts before a failing job is discarded. A default backoff keeps retries from hammering the server.
  2. Oban queues are priority queues, meaning that if two jobs are enqueued at the same time but the second has a higher priority, the second will be processed first. :priority establishes the order with an integer from 0-3; the lower the number, the higher the priority.
  3. :unique specifies ways to prevent duplicate jobs from being inserted (see the sketch after this list). This is a more complex option, but definitely check it out in the documentation.
  4. :tags specifies strings to be added to the job’s row in the database. As far as I can tell, the tags don’t have any effect on functionality, but they are useful if you have access to Oban.Web. They can also be used to write queries or help with logging.
  5. I save :queue for last, because it corresponds to additional configuration we have to do. You can insert a Job with any queue you’d like, but if that queue isn’t configured, the job will just sit in the database.
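For example, here’s a minimal sketch of :unique passed at insert time (by default, uniqueness considers the worker, queue, and args):

# Inserting a second job with the same worker, queue, and args within
# 300 seconds is treated as a duplicate rather than creating a new row.
%{date_id: date.id}
|> DailyRankingProcessor.new(unique: [period: 300])
|> Oban.insert()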

Configuring Queues

When configuring queues, there is a long form and a short form. The long form, events: [limit: 10, paused: true], is needed when you want to set options beyond the limit, such as starting a queue paused. Otherwise, the short form events: 10 will do. The limit is how many jobs the queue will run concurrently.
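For reference, both forms might sit side by side in config like this (the events queue is hypothetical):

config :oban_example, Oban,
  repo: ObanExample.Repo,
  queues: [
    # short form: just a concurrency limit
    default: 10,
    # long form: the same limit, but the queue starts paused
    events: [limit: 10, paused: true]
  ]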

Defining the Mandatory perform/1

Now back to the worker. With use Oban.Worker called, the only thing left to do is define the perform/1 callback. Let’s look at the DailyRankingProcessor:

def perform(%Oban.Job{args: %{"date_id" => date_id}}) do
  # Skip the work when a ranking already exists for this date.
  case Feats.get_daily_ranking_by_date(date_id) do
    nil -> digest(date_id)
    _digest -> {:cancel, "digest already found for this day"}
  end
end

defp digest(date_id) do
  stats = Feats.all_superhero_daily_statistics_for_date(date_id)

  # Rank superhero ids by people saved, highest first.
  sorted_ids =
    stats
    |> Enum.sort(fn hero1, hero2 -> hero1.people_saved >= hero2.people_saved end)
    |> Enum.map(fn hero -> hero.superhero_id end)

  Feats.create_daily_ranking(%{date_id: date_id, ranked_superhero_ids: sorted_ids})
end

perform/1 takes an Oban.Job as its argument. In this case, I pattern match on a date if one is given; otherwise the worker just runs for the current date. Providing a date is useful if I want to run it for a day when the server wasn’t online for whatever reason. perform/1 needs to return an atom or a tuple, and the different return values result in different behavior. The basic case is :ok for a successful run. You can return an ok tuple if you’d like, but it doesn’t affect the lifecycle of the job. If the job errored, a reason needs to be provided in an error tuple. On an error tuple, the job will retry up to max_attempts; once the retries are exhausted, the job moves to a discarded state.

:discard is also one of the return types, but it is soft-deprecated at this point, since only the system should discard a job. Instead, perform should return a cancel tuple if there’s a reason the job shouldn’t be retried. In both cases, the return is a tuple whose second element is a reason. Lastly, you can snooze a job for a given number of seconds. Snoozing counts as an attempt, but the system increments max_attempts along with the attempt count, so it doesn’t eat into the retry budget. In the example above, there are two different return types: it returns :ok (in a tuple, because I’m a creature of habit) if everything worked as expected, but cancels the job if entries are already found for the given day.
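Here’s a hypothetical sketch pulling the return types together (do_work/1 and its return values are made up for illustration):

def perform(%Oban.Job{args: args}) do
  case do_work(args) do
    # success: the job is marked completed
    {:ok, _result} -> :ok
    # retried with backoff until max_attempts, then discarded
    {:error, reason} -> {:error, reason}
    # cancelled immediately; no retries
    :already_done -> {:cancel, "nothing left to do"}
    # re-run in 60 seconds; attempt and max_attempts both increment
    :not_ready -> {:snooze, 60}
  end
end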

What if the error isn’t a Postgrex.Error? In the Executor module responsible for running the job, there is a rescue/catch block that transforms raised exceptions into a proper error tuple. Note: catching a general Postgrex.Error and assuming it means the statistics already exist isn’t advisable, but I didn’t want to spend forever on a simple example. As my mother says, “Do as I say, not as I do.”

Enqueuing Jobs

Finally, notice that the StatisticsGenerator enqueues another job for the DailyRankingProcessor worker:

%{date_id: date.id}
|> DailyRankingProcessor.new()
|> Oban.insert()

This is a pretty common practice. Because queues enforce concurrency limits, chaining jobs this way makes it much easier to avoid overwhelming the database connection pool than it would be if we spawned a Task.async/1. If you have access to Oban.Pro, there is also a Workflow worker that lets you declare dependencies when adding jobs to a workflow; dependent jobs must finish before the jobs that depend on them start. A rough sketch of what that might look like follows (the Workflow API differs between Oban.Pro versions, so treat this as illustrative rather than definitive):
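alias Oban.Pro.Workers.Workflow

# Illustrative only: the ranking job depends on the stats job, so it won't
# start until the stats job completes.
Workflow.new()
|> Workflow.add(:stats, StatisticsGenerator.new(%{}))
|> Workflow.add(:ranking, DailyRankingProcessor.new(%{date_id: date.id}), deps: [:stats])
|> Oban.insert_all()

The other Oban.Pro worker types are again beyond the scope of this article.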

Enqueuing Jobs with the Cron Plugin

To start the whole process off, we need to enqueue the StatisticsGenerator job. To do this on a daily schedule, we will look at the final form of the configuration, specifically the plugins section.

config :oban_example, Oban,
  repo: ObanExample.Repo,
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       {"@reboot", ObanExample.StatisticsGenerator},
       {"@daily", ObanExample.StatisticsGenerator}
     ]}
  ],
  queues: [default: 10]

We’ll use the Cron plugin to define two ways to enqueue the job. The first element of each tuple is a cron expression, here written using what Oban calls a nickname. @reboot enqueues a job at startup of the app, and @daily enqueues a job at midnight every day. Together, these would enqueue duplicate jobs on deploys, but there are strategies for handling that. You can either rely on the second job canceling itself when it finds entries already made, or set the :unique key in the job’s options. The second is probably more advisable, but the first is good to have even if :unique is set.
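For example, here’s a hedged sketch of the :unique approach on the worker itself (assuming a 24-hour window suits this app):

use Oban.Worker,
  queue: :default,
  # Any StatisticsGenerator job inserted within the last 24 hours with the
  # same args counts as a duplicate, so a deploy's @reboot job won't double
  # up with that day's @daily job.
  unique: [period: 60 * 60 * 24]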

ETL + Oban

Let’s quickly review what all this has to do with ETL. While both workers technically define ETL processes, the DailyRankingProcessor is the core ETL pipeline: it extracts the statistics for the day, transforms them by sorting them into an ordered ranking, and loads that list into a new database row.

Summary

And that’s all you need to run ETL processes in Oban! This is just scratching the surface of Oban’s functionality, but it’s also all you really need to do most simple tasks. Take a look around the docs (especially the Job and Worker modules) and see what you can do with it!
