Luigi the 10x Plumber: Containerizing & Scaling Luigi in Kubernetes

May 14, 2019 by Walter Menendez

Here at GIPHY, we love data. We love it because it lets us make data-driven decisions that improve the quality of results our users get from our search engine.

The more search traffic on GIPHY, the more insight we have into what’s popular and trending. We use these insights/signals to fine-tune search results we serve across desktop, mobile, or one of our integrations. In this way, we deliver the most relevant content and customize search experiences!

As you can imagine, with the volume of search traffic we get (over 500M daily active users), efficiently computing things like click-through rate or A/B test performance quickly becomes unwieldy. At GIPHY we rely heavily on Amazon Elastic MapReduce (EMR) jobs to help us crunch the numbers. We run a lot of EMR jobs to send our search data through different computations and data transformations, often taking the output of one calculation and making it the input of another. To help us with these complex chains of work: Luigi!

Luigi is Spotify’s open source framework for task pipeline management. We like Luigi for a number of reasons. Primarily, we love how it’s written in Python and how easy and simple it is to use. When our data ecosystem first started, we leveraged Luigi’s simple interface and established a series of crons to kick off entire data pipelines, as recommended in its documentation. These crons were responsible for running our tasks based on how often we’d like them to run: hourly, daily, and every 15 minutes. In the beginning, there were only a handful of tasks that ran in each time frame, so the naive cron-based solution worked really well!

However, as we added more and more task pipelines to handle more and more data, this cron-based approach started showing growing pains.

In particular:

– Our tasks were often grouped together in what is known as a sink task: a wrapper task that simply executes a list of tasks. A sink task considers itself done only when all of its listed dependencies are done, executing as many of them as it can. With these sink tasks, failures weren't retried until the next run, which delayed pipelines whenever shared dependencies were failing.

– Any runtime errors that occurred in a sink task prevented the rest of the sink task’s dependencies from running. Given that the only commonality a lot of these tasks shared was frequency of execution, this vulnerability meant unrelated tasks prevented each other from running.

– The first thing that Luigi does before executing any task is check which tasks – and their dependency chains – in the given list of required tasks are done. As that list of tasks has grown, Luigi spends upwards of 15 minutes just checking task history!

On top of these limitations, our Luigi infrastructure existed on non-containerized provisioned AWS EC2 instances. Legacy infrastructure creates additional issues, especially in terms of inconsistent environment configuration and individual server resource contention. Not to mention, since we were writing Python for Luigi, it was difficult maintaining clean installs of all our Python packages across all users with access to those boxes.

With these problems in mind, we resolved to do two things: first, separate the logic of deciding what tasks to run from executing the task itself; then containerize our Luigi infrastructure.

To handle the problem of managing current task state, we implemented an AWS SQS-based task queue. To handle the actual task execution, we implemented a message-triggered task worker.

The task queue continuously checks the current state of all our tasks, then enqueues the needed tasks and their corresponding parameters to SQS. In turn, the task worker listens for messages from SQS and, upon receiving one, deserializes it to determine which pipeline needs to be run with the specified parameters.
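The round trip between the two components can be sketched roughly as follows. This is an illustration under stated assumptions, not the production code: the message schema (a pipeline name plus a dictionary of parameters), the function names, and the runner registry are hypothetical, and an in-memory `queue.Queue` stands in for SQS so the example runs anywhere.

```python
import json
import queue
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, Optional


@dataclass
class TaskMessage:
    """Hypothetical message schema: which pipeline to run, and with what."""

    pipeline: str
    params: Dict[str, str] = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps({"pipeline": self.pipeline, "params": self.params})

    @classmethod
    def from_json(cls, body: str) -> "TaskMessage":
        payload = json.loads(body)
        return cls(pipeline=payload["pipeline"], params=payload.get("params", {}))


def enqueue_needed(task_queue: queue.Queue, needed: Iterable[TaskMessage]) -> None:
    """Task-queue side: serialize each needed task and put it on the queue."""
    for message in needed:
        task_queue.put(message.to_json())


def work_once(task_queue: queue.Queue,
              runners: Dict[str, Callable[..., str]]) -> Optional[str]:
    """Worker side: take one message, deserialize it, run the named pipeline.

    Returns the runner's result, or None if the queue was empty.
    """
    try:
        body = task_queue.get_nowait()
    except queue.Empty:
        return None
    message = TaskMessage.from_json(body)
    return runners[message.pipeline](**message.params)
```

In a real deployment the `put` and `get_nowait` calls would be replaced by the SQS send and receive operations, and each runner would start the corresponding Luigi pipeline.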

Sitting between both of these components is a separate container that is running the core Luigi scheduler daemon. The task queue pings the scheduler’s HTTP task API to get the current state of running and pending tasks, which gets updated as task workers register their current workload with the scheduler daemon.
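As a rough sketch of that polling step: the snippet below asks the scheduler which tasks are pending or running, then works out which wanted tasks still need to be enqueued. The endpoint path (`/api/task_list`), the shape of the `data` query argument, the `response` envelope, and the default port 8082 are assumptions based on how Luigi's own visualiser talks to the scheduler daemon, so check them against your Luigi version. The helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request
from typing import Dict, Iterable, List, Set

# Assumption: the scheduler daemon is reachable on Luigi's default port.
SCHEDULER_URL = "http://localhost:8082"


def task_list_url(base_url: str, status: str) -> str:
    """Build the task-list URL for one status (e.g. PENDING or RUNNING)."""
    data = json.dumps({"status": status, "upstream_status": "", "search": ""})
    return f"{base_url}/api/task_list?{urllib.parse.urlencode({'data': data})}"


def task_ids(payload: Dict) -> Set[str]:
    """Extract task IDs, assuming a {"response": {task_id: {...}}} payload."""
    return set(payload.get("response", {}))


def fetch_active(base_url: str = SCHEDULER_URL) -> Set[str]:
    """Ask the scheduler for every task that is currently pending or running."""
    active: Set[str] = set()
    for status in ("PENDING", "RUNNING"):
        with urllib.request.urlopen(task_list_url(base_url, status), timeout=10) as resp:
            active |= task_ids(json.load(resp))
    return active


def tasks_to_enqueue(wanted: Iterable[str], active: Set[str]) -> List[str]:
    """Only enqueue tasks the scheduler does not already know to be in flight."""
    return sorted(set(wanted) - active)
```

Keeping the URL building and payload parsing separate from the network call makes the decision logic easy to test without a running scheduler.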

As a result, we’ve seen a number of improvements:

– We’ve been able to run more tasks more frequently, going from a previous theoretical and somewhat arbitrary cap of 32 tasks to an order of magnitude more.

– We no longer have to worry about any lingering artifacts from previous runs because the Python runtime environment is created fresh for each run.

– Tasks are now more testable and inspectable. This improvement comes from moving to a Kubernetes container infrastructure, which may restart and interrupt pods that Luigi is running on at any time.

– Tasks can fail in isolation without interrupting the runs of others.

– We can very easily scale the number of workers to handle changes in workload, e.g., catching up after an outage.

There are still a number of things we can improve on:

– When we rolled out this initial implementation, the team had not yet developed enough K8s experience. Now that we know more about the platform, we’d like to look into using K8s Jobs, inspired by Luigi’s third-party source code for running tasks inside of K8s.

– In the case of a backlog, it is possible that one entire pipeline can take over the workers. We can solve this problem by dedicating workers to certain task types.
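One way the dedicated-worker idea could look is to route each message to a queue reserved for its task type, so a backlog in one pipeline cannot starve the others. This is only a sketch of a possible design, not something described as implemented above; the queue names, the pipeline names, and the message shape are all hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List

# Hypothetical mapping from pipeline name to the queue its workers listen on.
QUEUE_FOR_PIPELINE = {
    "search_ctr": "hourly-metrics",
    "ab_test_report": "experiments",
}
DEFAULT_QUEUE = "general"


def route(messages: Iterable[Dict[str, str]]) -> Dict[str, List[Dict[str, str]]]:
    """Group messages by the queue dedicated to their pipeline.

    Pipelines without a dedicated queue fall back to DEFAULT_QUEUE.
    """
    routed: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    for message in messages:
        queue_name = QUEUE_FOR_PIPELINE.get(message["pipeline"], DEFAULT_QUEUE)
        routed[queue_name].append(message)
    return dict(routed)
```

With this split, a worker pool subscribed only to the experiments queue keeps making progress even while the hourly-metrics queue is catching up.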

Overall, we love Luigi for being a powerful, simple-to-use Python-based task orchestration framework. We can’t wait to scale it even further to give you even more GIFs!

-Walter Menendez, Software Engineer, Data Engineering