5 Fantastic Data Pipeline Orchestration Tools For R

Photo by Daria Nepriakhina 🇺🇦 on Unsplash

A data pipeline orchestration tool is critical for producing healthy and reliable data-driven decisions. R is one of the most popular languages among data scientists, and with its exceptional packages it is great for data manipulation, statistical analysis, and visualization.

A common pattern for bringing a data scientist’s local R script to production is to rewrite it in Python or Scala (Spark), then schedule the data pipeline and model building via a modern data pipeline orchestration tool like Apache Airflow.

However, many modern data orchestration projects like Apache Airflow, Prefect, and Luigi are Python-based. Can they work seamlessly with R? Can you define a DAG in R? In this article, let’s explore popular data pipeline orchestration tools for R scripts and review which one fits your use case.

The Key Components of Successful Data Pipeline Orchestration

From my experience, data pipeline orchestration can be broken down into three main components: the DAG (dependencies), the scheduler, and plugins.

DAG (Directed Acyclic Graph)

The DAG defines the blueprint of the data pipeline. It gives the execution path a direction, and at the same time you can trace dependencies backward by looking at the DAG.

Why is the DAG crucial to the success of any data pipeline? Because working with data requires an ordering to extract insights from it, and from a business-rule perspective that order cannot be changed. Otherwise, the output is useless or the pipeline errors out.

If you look at each node in the directed acyclic graph as an individual function, the DAG ensures that the current node follows the rules defined by its upstream nodes. For example, the current node is only triggered if all upstream nodes succeed, or it becomes executable when one of the upstream nodes fails.
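As a toy illustration (not any specific tool’s API), here is a minimal R sketch of the “run only when all upstream nodes succeed” rule, with made-up node names:

## A toy DAG: each entry lists the node's upstream dependencies
dag <- list(
  extract   = character(0),      # no upstream dependencies
  transform = c("extract"),
  load      = c("transform")
)

status <- setNames(rep(NA, length(dag)), names(dag))
for (node in names(dag)) {       # names are already in topological order here
  if (all(status[dag[[node]]] %in% TRUE)) {
    message("running ", node)
    status[node] <- TRUE         # a real orchestrator would also capture failures
  } else {
    status[node] <- FALSE        # skipped because an upstream node did not succeed
  }
}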

The DAG also conveniently provides a view of data lineage. It enhances visibility and significantly streamlines tracing errors in the data pipeline. When the pipeline hits an unpleasant error during the morning on-call, the DAG run instance can quickly point out where the error occurred.

The power to visualize the DAG as a blueprint, and its run-time instance to check job status, is crucial nowadays for any data orchestration tool.

Scheduler

The scheduler is the driver that executes the data pipeline. A scheduler can be as simple as a cron job; a more complex one is built from scratch, like Airflow’s, which manages all task states and aggressively snapshots them.

What does the scheduler do? The scheduler is a daemon, which can be treated as a background process. It is supposed to run 24/7, monitoring for a scheduled time or event; once that point is reached, it executes the task and watches for the next one.
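Conceptually, the loop looks something like the sketch below (the daily schedule and the run_pipeline() function are made up for illustration):

## A minimal polling-loop sketch of a scheduler daemon
run_pipeline <- function() message("running the data pipeline at ", Sys.time())

next_run <- Sys.time()
repeat {                                  # runs forever, like a daemon
  if (Sys.time() >= next_run) {
    run_pipeline()
    next_run <- next_run + 24 * 60 * 60   # schedule the next daily run
  }
  Sys.sleep(60)                           # wake up every minute and check again
}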

Scheduling Cycle | Image By Author

Plugins

Plugins provide extensibility and represent the growth potential of a data pipeline. It’s common to leverage existing packages instead of reinventing the wheel. A rich plugin ecosystem saves you time to concentrate on business logic instead of spending days googling and coding “How do I write a script to submit a Spark job to EMR?”

As a data orchestration tool grows, it attracts more vendors and community developers to contribute plugins, which in turn draws more users. Migrating to another data pipeline orchestration tool later is also expensive.

Options Not Covered

  • taskscheduleR: schedules R scripts with the Windows Task Scheduler. If you are on Windows, it is definitely an option to explore.
  • ruigi (https://github.com/kirillseva/ruigi): an admirable attempt, but the project seems idle, with no activity since May 26, 2019.

Explore Data Pipeline Orchestration Tools For R

We will discuss the following five tools, each fitting a distinct use case.

  1. cronR
  2. targets
  3. Kestra
  4. Apache Airflow
  5. Mage

1. cronR

One of the key components of successful data pipeline orchestration is scheduling. A scheduler gives you peace of mind that the data pipeline runs without human intervention. To schedule an R script, cronR is the first solution you’d explore.

The package provides a set of wrappers around crontab that make adoption straightforward using only R. You don’t need to worry about setting up the crontab yourself; cronR offers an interface that hides that complexity.

library(cronR)
f   <- system.file(package = "cronR", "extdata", "helloworld.R")
cmd <- cron_rscript(f)
## schedule R script daily at 7 am
cron_add(
  command = cmd,
  frequency = 'daily',
  at = '7AM',
  id = 'my_first_cronR',
  description = 'schedule R script daily at 7 am'
)
## schedule same R script every 15 mins
cron_add(cmd,
         frequency = '*/15 * * * *',
         id = 'my_second_cronR',
         description = 'schedule same R script every 15 mins')

Usage Suggestion

This option is a quick and lightweight solution if you only want to schedule an R script. It is suitable for use cases such as ad-hoc scheduling, simple dependencies, or pipelines with little state management involved.
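cronR also ships helpers to inspect and clean up the jobs you registered, which keeps ad-hoc scheduling manageable; a short usage sketch (using the job id chosen above):

library(cronR)
cron_ls()                        # list the cron jobs created through cronR
cron_rm(id = 'my_second_cronR')  # remove a job once it is no longer needed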

Limitation

Since cronR only gives you a scheduler, you’d need to build workflow dependencies yourself. That’s doable as long as a single R script stays manageable. However, once the script grows enormous and intermediate stages are required, you’d want to break it down. That is where cronR falls short: it only handles the scheduling part, without DAG definition or plugins.

2. targets

The targets package is a Make-like pipeline toolkit for Statistics and data science in R.

targets is built natively in the R programming language for data pipelines. You can easily define a DAG to create the dependency graph. The main goal of targets is to provide reproducible workflows. It doesn’t come with a scheduler, but connecting it with the other tools mentioned here to schedule the DAG generated from targets shouldn’t be challenging.

# functions
get_data <- function() {
  print("getting data")
}
transform_data1 <- function(data) {
  print("transforming data 1")
}
transform_data2 <- function(data) {
  print("transforming data 2")
}
loading_data <- function(data1, data2) {
  print("loading data")
}
# _targets.R file
library(targets)
tar_option_set(packages = c("readr", "dplyr", "ggplot2"))
list(
  tar_target(extraction, get_data()),
  tar_target(transformation1, transform_data1(extraction)),
  tar_target(transformation2, transform_data2(extraction)),
  tar_target(loading, loading_data(transformation1, transformation2))
)

The pipeline and the function definitions are split across two R files. You build the dependency tree within the _targets.R file and visualize it with tar_visnetwork(), which gives you a DAG automatically.
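Once _targets.R is in place, inspecting and running the pipeline takes only a couple of calls; a minimal usage sketch:

library(targets)
tar_visnetwork()   # render the dependency graph as an interactive DAG
tar_make()         # run the pipeline; only outdated targets are rebuilt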

targets visualize DAG | Image by Author

Usage Suggestion

You’d choose targets if you need native R support for the data pipeline. The package helps R users improve their efficiency in day-to-day data analysis work, and it doesn’t require an additional daemon running in the background to visualize the DAG or run it on demand. If you are looking for a solution you can run manually and want DAG management, targets is an excellent option for R users.

Limitation

If you decide to go with targets, you have a powerful DAG management tool. Nevertheless, you’d need to add a scheduler and additional plugins to make it a production data pipeline orchestrator. Combining it with a scheduler like cronR can give you a purely R solution.
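For example, a minimal sketch of that combination, assuming a small helper script (here called run_targets.R) whose only job is to rebuild the targets project:

## run_targets.R
## setwd("/path/to/your/targets/project")  # the project folder containing _targets.R
targets::tar_make()

## schedule it with cronR
library(cronR)
cmd <- cron_rscript("/path/to/your/targets/project/run_targets.R")
cron_add(cmd,
         frequency = 'daily',
         at = '6AM',
         id = 'targets_pipeline',
         description = 'rebuild the targets pipeline every morning')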

3. Kestra

Kestra is a generic data pipeline orchestration tool. It currently only supports three types of scripts: Bash, Node, and Python.

Although the R language isn’t among the supported script types, you can still orchestrate an R script by wrapping the Rscript command in a Bash task.

id: "r_script"
type: "io.kestra.core.tasks.scripts.Bash"
commands:
- 'Rscript my_awesome_R.R'

A more complex DAG can be set up using a Flowable Task in the YAML file. Scheduling is also defined in the YAML file via a Schedule trigger.

Usage Suggestion

For R users, Kestra allows you to orchestrate R code via a Bash command. Additionally, Kestra has the flexibility to run the data pipeline in various languages. It feels like a modern version of Oozie, and if you are familiar with Oozie, Kestra should be much simpler to onboard.

Limitation

Running R isn’t natively supported, and retrieving metadata at run time isn’t trivial when you only use the Rscript command. Everything is based on the task type, so finding the proper core task or plugin to develop effectively might take extra time to learn.

4. Apache Airflow

Airflow is by far the most popular data pipeline orchestration tool. However, to write a DAG, you need to write Python. You can use an Airflow operator to execute your R script, but R is not an option for defining the DAG itself.

The Airflow community had a proposal to create an ROperator. The proposal leverages rpy2, which provides an interface for running R embedded in a Python process. The core idea is to pass the r_command, copy the R code to a temporary file, and source it.

Many R users were interested in this pull request.

However, it never landed. Scanning through the pull request, we can see that R language support wasn’t something Airflow’s CI covered at that point, and other options could already execute R scripts, so the priority for an ROperator was low. There are a few options if you want to stick with the Airflow ecosystem.

  • Use BashOperator to run R code. If you can run your R script from the terminal, the BashOperator meets that requirement. Using the BashOperator also makes it easy to build the DAG relationships, pass arguments to bash_command, and add more complex logic like retries and email alerts.
run_this = BashOperator(
    task_id="my_first_r_task",
    bash_command="Rscript my_awesome_R.R",
)
  • Run R in a Docker container (see rocker) and use the DockerOperator. This option is similar to the BashOperator. A nice thing about using a Docker container is that you spend less time configuring the R script to execute within the same environment as Airflow; the container gives R a fresh environment each run. It’s a nice and clean solution.
  • [If you still want a dedicated ROperator] you can copy r_operator.py from the pull request above and ask your Airflow infra team to add it.

The situation is similar for Prefect: it has an open pull request without resolution, and the PR has been marked as low priority.

Many modern data pipeline orchestrators are built in Python. When it comes to supporting another popular data language like R, the chance of it being prioritized is low because the project wasn’t designed with other languages in mind, and later it becomes too big to make such fundamental changes without tremendous effort from the ground up.

Usage Suggestion

This option is good if you already have Airflow infrastructure as your data pipeline orchestrator. Airflow provides the three key elements of a successful data pipeline orchestration platform. With the foundation and resources set up, running R is possible with the options mentioned above.

Limitation

R isn’t a first-class citizen in Airflow: running R isn’t natively supported. You have multiple workarounds, but R is still treated as a foreign language. Whether you go with the BashOperator, the DockerOperator, or even fork that PR, you’d still need additional support from the data infra team to make your R script runnable in Airflow.

Another limitation is that it is not straightforward to pull Airflow macros (Airflow’s run-time metadata) into R. You can resort to the nontrivial solution of querying Airflow’s backend, but it isn’t user-friendly for R users without in-depth knowledge of Airflow.
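A lighter-weight workaround, if you only need a macro or two such as the execution date, is to template them into the Rscript call and read them as command-line arguments on the R side. A minimal sketch, assuming a bash_command like Rscript my_awesome_R.R {{ ds }}:

## my_awesome_R.R
## Assumes the BashOperator templates the macro into the call, e.g.
## bash_command = "Rscript my_awesome_R.R {{ ds }}"
args <- commandArgs(trailingOnly = TRUE)
execution_date <- args[1]
message("running the pipeline for execution date: ", execution_date)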

5. Mage

Mage is a new player in data pipeline orchestration. The biggest win for our discussion is that Mage supports R by default and lets users define a DAG regardless of their choice of language (Python/SQL/R at this point).

This is a milestone for R users. Users who love R don’t have to switch to Python syntax to define a DAG, or wrap their R scripts in a tool with limited support.

Mage allows users to write the main ETL (Extraction, Transformation, and Loading) blocks in R. It constructs the DAG by maintaining the dependency relationships in a YAML file, which makes it a flexible option that bypasses the choice of programming language. You can also visualize the DAG alongside the R code blocks while developing your pipeline.

Mage Pipeline Edit Mode Using R | Image by author

Below are the three main blocks I used to demonstrate how to code the pipeline and build the DAG in Mage with R. Furthermore, you can easily access scheduler metadata like execution_date in R.

## Data Loader (Extraction)
## You can download the dataset here: https://www.kaggle.com/datasets/yanmaksi/big-startup-secsees-fail-dataset-from-crunchbase
load_data <- function() {
    df <- read.csv(file='~/Downloads/big_startup_secsees_dataset.csv')
    ## Access scheduler metadata or user-defined variables.
    ## This is powerful: you can access data orchestration metadata at runtime
    df['date'] <- global_vars['execution_date']
    df
}
## --------------------------------------------------------------##
## Transformer (Transformation)
library("pacman") ## install pacman beforehand
p_load(dplyr)     ## dplyr makes it easier to reference dataframe columns
transform <- function(df_1, ...) {
    ## filter on USA startups
    df_1 <- filter(df_1, country_code == 'USA')
    df_1
}
## --------------------------------------------------------------##
## Data Exporter (Loading)
export_data <- function(df_1, ...) {
    # You can write the result to a local file
    write.csv(df_1, "~/Downloads/usa_startup_dataset.csv")
}

Once the pipeline has been developed in Mage, you can attach a trigger by scheduling the pipeline with a crontab. 

Mage Pipeline Scheduling | Image by author

Internally, Mage still uses Python at its core: it writes the R block into a temporary file, then runs the Rscript command on that file.

subprocess.run(
    [
        'Rscript',
        '--vanilla',
        file_path
    ],
    check=True,
)

If you want to learn more about Mage as an alternative to Apache Airflow, I wrote an article on it.

Usage Suggestion

R is a first-class citizen in Mage. You can write R blocks and access scheduler metadata smoothly without worrying about how to inject variables or query a backend. Development in Mage is also interactive: engineers can rapidly iterate and test while developing, and visualize results instead of debugging a giant DAG.

Limitation

Mage is a new project, founded in 2021, and it is still in its early stages. A lot of the documentation needs improvement, and the number of plugins isn’t yet comparable to Airflow’s.

Final Thoughts

Many data pipeline orchestration options haven’t been covered here. For R users, better integration between R and the orchestration layer reduces the pressure of moving an initial data analysis into a production data pipeline. I hope the options here give you better insight into the various data pipeline orchestration tools available to R users.

About Me

I hope my stories are helpful to you. 

For data engineering posts, you can subscribe to my new articles or become a referred Medium member, which also gives you full access to stories on Medium.

If you have questions or comments, do not hesitate to write in the comments of this story or reach me directly through LinkedIn or Twitter.
