data pipeline illustration man standing at end of ringed tube tunnel red green yellow

Have you been looking to do something in Elixir outside of the usual Phoenix website? Or do you need to do some data processing but don’t want to learn some other language to do it? This article is meant for you, lets have a look at a data pipeline in Elixir!

What is data processing? It’s a nebulous term meaning changing information on some large scale to fit your needs. Most commonly, it’s used to take data that a user inputs on a site and sanitize it to put into a database. But it can apply to a huge number of different algorithms or processes.

So why do data processing in Elixir? I’ll fully admit it isn’t always the best language for it. Golang is the language I’ve seen with the best raw processing power. Python has an amazing suite of libraries for data science. But you don’t need the best unless you really need to optimize. Elixir is easily good enough to handle the first ten iterations of whatever you’re trying to make!

This article explains concepts and building blocks of making a data pipeline in Elixir. For more in-depth analyses of top level tools or examples, see the bottom of the article for links to other articles of mine. Now let’s get started!

Understanding ETL

The first concept needed to create a data pipeline is ETL, which stands for Extract, Transform, and Load. In theory, ETL is pretty simple, but in practice it can get quite complicated. Let’s tackle the simple case.

Extraction

Extraction is pulling data from a source. It could be your app’s primary database; it could be some third party API. Unfortunately, most commonly it’s just some poorly formatted Excel file someone threw at you. If it’s something that can hold data, it can be a source.

Loading

Loading is inserting that data into a data store. The data store could be the same database the original data came from, an ElasticSearch instance, or part of a machine learning dataset. Like with Extraction, if it can hold data, it can be a destination.

Transforming

Transformation is the process of getting the data to a cleaner state. Once you’ve chosen the destination, there will be some requirements of the data in order to load it in. This could be things like formatting or data normalization. In almost all circumstances the loaded data will be “cleaner” than the extracted data.

Pipeline complexity

Let’s look at a real example of something I worked on:

  • A previous ETL process has taken the contents of a public social media post and inserted it into a database. That database will be our source.
  • Our destination is the same database table, but specifically a single field, the sentiment score. A sentiment score measures how positive the connotations of words are. It is formatted as a float ranging from -1 (very negative) to 1 (very positive).
  • So, therefore, the Transformation had to be a process that took text and boiled it down to a singular float. This can be achieved by checking each word against a database to get a single score for the word, then averaging all scores from the post.
  • (However, in the real-life example, we used a third-party software.)

This example illustrates some of the complexities that can creep into pipelines. Notice that ETL pipelines can be chained together or just the transformations can be chained together. They can also branch; imagine that you do some transformation but then load the results into two different data stores. As previously stated, “in practice it can get quite complicated.”

ETL and GenServers

Whereas ETL exists in all languages, the next principle is Elixir (and Erlang) specific: GenServer. A GenServer is a process that receives messages from other processes, can send messages, and through tail-call optimized recursion holds state. That is a term-dense definition, and there’s not really time to unpack it. Docs can be found here: https://hexdocs.pm/elixir/GenServer.html. Instead, we’ll talk about usage in ETL.

Here are two ways to do a simple ETL in a GenServer:

  1. Have the ETL all contained in one process. The pipeline would start by passing the process a message about where to find the resource and which IDs to grab (if not processing all of the records) and would end with some kind of report.
  2. Use three processes where the output is passed as a message to the next as the inputs. The Extract process could still receive the input that #1 is getting. And the Loading process could also output a report. #2 definitely has more overhead than #1.

GenStage

Like in most things with programming, there’s a layer of abstraction above GenServers, and it’ll be preferable to use the abstraction. In this case, it’s called GenStage.

GenStage has three flavors of GenServers: Producers, ProducerConsumers, and Consumers. If I have communicated clearly, it should be evident how these relate to ETL. A producer can be programmed to pull data from a source. A ProducerConsumer is what you’d use to make a transformer. The Consumer would then take the transformed data and place it into its destination. The out-of-the-box features you get from GenStage are a set way to have the stages talk to each other and back-pressure.

Broadway and Flow

But wait, there’s more! There are two packages built on top of GenStage, Broadway, and Flow. We’ll cover them briefly here and in more detail in their own post as a comparison of their capabilities.

Flow is much simpler but also much easier to use! Under the hood, it is generating GenStage stages on the fly. So all you have to do is use the API, which is similar to Enum or Stream. Here’s a very brief example.

Stream.repeatedly(&:rand.uniform/0)

|> Stream.take(10_000)

|> Flow.from_enumerable()

|> Flow.map(&(&1 * 2))

|> Enum.to_list()

Amazingly simple right? Now, this example is definitely using a sledgehammer to crack a nut. But it doesn’t take much complexity for it to start benchmarking much better than the single-threaded equivalent.

Broadway is built on top of GenStage and offers several more features, such as acknowledgments, batching, fault-tolerance, graceful shutdowns, etc. One of its best features is pre-built producers for some of the most popular sources such as RabbitMQ or Kafka.

Unfortunately, using it is too complex of a process to write here with any amount of succinctness. I’m planning on writing a post for examples of all of these libraries in the future, but in the meantime, you can find the documentation at www.hexdocs.pm/broadway.

Oban

I have an awful truth to tell you now. It’s good to build things yourself to understand how things work, but in a business context, it’s better to just implement out-of-the-box solutions. Just save yourself the trouble of debugging your own code and just use Oban.

Oban stores jobs in a database and has independent workers for each job type. Yes, it means data storage overhead and a slight processing delay as your worker needs to fetch the job, but the persistence even during catastrophic failure and the decoupling of worker logic from job management logic more than makes up for it. There are many more reasons why it’s better, but I’ll just stick to the most important two. It’s easy, and it’s scalable.

Conclusion

Here is how I would break down “when to use what”:

Flow – When you just need to optimize one or two functions that are using Enum and Stream already.

Broadway – When you’re optimizing an ETL pipeline, and you really have stretched Oban to its max. Reach for it when you need to overhaul and optimize something for sheer speed.

Oban – All other times.

I hope this article has been enjoyable and insightful. Go now and build some cool things!

Further Reading

GenStage

Flow

Broadway

Oban

Find more Revelry Elixir Articles here!

 

We're building an AI-powered Product Operations Cloud, leveraging AI in almost every aspect of the software delivery lifecycle. Want to test drive it with us? Join the ProdOps party at ProdOps.ai.