Gregory Kanevsky

Building Data Science Pipelines with R and Aster. Part 3: Data Manipulation

Blog Post created by Gregory Kanevsky on Jul 27, 2017

Previously in this series

In my last post I started exploring the concrete implementation of data science pipelines with Aster and R. We covered the programming environment, loading data into Aster, and the importance and ease of testing every step in the pipeline. In this post we'll focus on perhaps the most prevalent part of every workflow: data manipulation. To make the examples more translatable we will take a dual approach and illustrate each technique with two equivalent examples: one using TeradataAsterR and another using the dplyr package. Finally, in the spirit of "test everything", each pair of examples ends with a test that compares their results for equivalence.

 

The Grammar of Data Manipulation

The design of dplyr functions follows a clear separation of concerns based on the split-apply-combine process and the grammar of data manipulation:

  • filter: keep rows with matching conditions;
  • select: select, drop or rename attributes by name;
  • arrange: order rows by attributes;
  • mutate: add new attributes;
  • summarise: reduce multiple attribute values to a single value;
  • group_by: group data by one or more variables to apply a function.

Graph demonstrating Split-Apply-Combine.
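To make the three stages concrete, here is a minimal base R sketch of split-apply-combine. The data frame and its values are made up for illustration (they are not taken from the Lahman data), and base R stands in for both dplyr and Aster R only to keep the example self-contained.

```r
# Toy data: hypothetical values, used only to illustrate the three stages
batting = data.frame(
  playerID = c("a", "a", "b", "b", "b"),
  HR       = c(10, 20, 5, 15, 25),
  stringsAsFactors = FALSE
)

# Split: one vector of HR values per player
pieces = split(batting$HR, batting$playerID)

# Apply: reduce each piece to a single value
totals = vapply(pieces, sum, numeric(1))

# Combine: assemble the per-group results into one data frame
result = data.frame(playerID = names(totals), HR = unname(totals),
                    stringsAsFactors = FALSE)
print(result)  # player "a" totals 30, player "b" totals 45
```

The grammar functions listed above package these stages so that the split is declared once (group_by) and the apply and combine steps happen inside a single call.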

 

TeradataAsterR functions are a mix of two approaches: classic R and the split-apply-combine process embraced by dplyr. Having examples for both should make it clearer how things get done in each case.

 

But before jumping into the grammar of data manipulation functions we begin with a complementary operation that combines data from two data sets into one.

 

Joining Tables

Aster is a relational data store (sorry if I never made this important point before), so its support for joining tables comes naturally with a fully SQL-compliant SELECT statement that includes:

  • Inner join
  • Left, right, and full outer joins
  • Semi- and anti-join (same as filtering joins in dplyr)
  • Cross join (Cartesian product)
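As a quick reference, the join types above can be sketched in base R on two toy data frames. The tables and values here are hypothetical, and merge() with %in% is only a local stand-in for what SQL does in the database.

```r
# Hypothetical toy tables: "x" has no match in master, "c" has none in batting
batting = data.frame(playerID = c("a", "a", "b", "x"),
                     HR = c(10, 20, 5, 1), stringsAsFactors = FALSE)
master  = data.frame(playerID = c("a", "b", "c"),
                     birthYear = c(1970, 1980, 1990), stringsAsFactors = FALSE)

inner = merge(batting, master, by = "playerID")                # inner join
left  = merge(batting, master, by = "playerID", all.x = TRUE)  # left outer join
right = merge(batting, master, by = "playerID", all.y = TRUE)  # right outer join
full  = merge(batting, master, by = "playerID", all = TRUE)    # full outer join
cross = merge(batting, master, by = NULL)                      # Cartesian product

# Filtering joins keep or drop rows of batting without adding columns
semi = batting[batting$playerID %in% master$playerID, ]        # semi-join
anti = batting[!batting$playerID %in% master$playerID, ]       # anti-join

sapply(list(inner = inner, left = left, right = right, full = full,
            cross = cross, semi = semi, anti = anti), nrow)
```

Comparing the row counts is a cheap way to confirm which join was actually performed.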

In Aster R, join support translates into two functions. Both act on two virtual data frames and both return a virtual data frame of query type:

  • ta.merge with R style merge parameters that will be translated to SQL
  • ta.join that offers SQL-like parameters including choice of join type

I suggest using the latter as it offers full functionality with better transparency and compatibility with actual operations taking place in Aster.

 

For our examples we need to combine batting stats in Batting with player attributes from Master. This requires an inner join, and for additional clarity we demonstrate how to accomplish the same operation using three methods: the base R function merge(), the dplyr function inner_join(), and the Aster R function ta.join():

# R
big.df = merge(Batting, Master, by="playerID")

# dplyr
big.tbl = Batting %>%
  inner_join(Master, by="playerID")

# Aster R
big.ta = ta.join(batting.ta, master.ta, by="playerID")
ta.colnames(big.ta)[c('x.row_names','x.playerID')] = c('row_names','playerID')

Aster R required some post-processing to remove prefixes from attribute names (ta.join() adds prefixes only to attributes found in both tables). Other than that, all three look boringly similar (if you ignore the syntactic sugar of magrittr pipes; yes, I had the option of not using them, but the goal was to make each example as clear as possible).

As always, we test that all three methods produced the same results (notice that we ran only two comparisons, taking advantage of the transitive property of equivalence):

compareDataInRandAster(big.df, big.tbl, 
  key = c("playerID", "yearID", "stint"))

compareDataInRandAster(big.tbl, big.ta,
  key = c("playerID", "yearID", "stint"))

Note that playerID is both the primary key in Master and a foreign key in Batting, which was sufficient for the join. But the resulting data set inherited its key from Batting, which consists of three attributes: playerID, yearID, and stint.
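The point about keys can be checked mechanically. Below is a small base R sketch on made-up data; the helper is_key() is hypothetical (it is not part of dplyr or Aster R) and simply tests whether a set of columns identifies every row uniquely.

```r
# Hypothetical toy tables mirroring the Master/Batting relationship
master  = data.frame(playerID = c("a", "b"), birthYear = c(1970, 1980),
                     stringsAsFactors = FALSE)
batting = data.frame(playerID = c("a", "a", "a", "b"),
                     yearID   = c(2000, 2000, 2001, 2000),
                     stint    = c(1, 2, 1, 1),
                     stringsAsFactors = FALSE)
big = merge(batting, master, by = "playerID")

# Hypothetical helper: TRUE when the given columns uniquely identify each row
is_key = function(df, cols) anyDuplicated(df[cols]) == 0

is_key(master, "playerID")                     # TRUE: primary key in master
is_key(big, "playerID")                        # FALSE: repeats after the join
is_key(big, c("playerID", "yearID", "stint"))  # TRUE: composite key from batting
nrow(big) == nrow(batting)                     # TRUE: the join added no rows
```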

 

We'll use the big.xxx data sets to illustrate the grammar of data manipulation as part of the pipeline workflow (from now on we limit examples to dplyr and TeradataAsterR functions).

 

Transform

Feature engineering is one of the top reasons new attributes get introduced in data science pipelines, and it is often a key ingredient of successful models. For this purpose the grammar of data manipulation contains a mutate function that adds new attributes or changes existing ones: dplyr calls it mutate() while Aster R has ta.transform().

 

The following example creates two attributes:

  • key containing the concatenated elements of the composite table key, which uniquely references each row,
  • age with the player's age in a given season, calculated from the year of birth and the season year:

# dplyr
big.prepped.1.tbl = big.tbl %>%
  mutate(key = paste(playerID, yearID, stint, sep="-"),
         age = yearID - birthYear)

# Aster R
big.prepped.1.ta = ta.transform(big.ta,
    key = paste(playerID, yearID, stint, sep="-"),
    age = yearID - birthYear)

Again, the examples contain strikingly similar code for both. Still, underneath, there are some differences to be aware of. For example, mutate() can immediately use newly introduced variables inside the same call, while ta.transform() can't.
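Base R's transform() happens to share this limitation, so it makes a convenient local analogy. The sketch below uses made-up data and base R only; it does not call ta.transform(), and chaining two calls as a workaround in Aster R is an assumption on my part rather than documented behavior.

```r
# Hypothetical toy data
players = data.frame(yearID = c(2000, 2005), birthYear = c(1975, 1980))

# transform() evaluates every expression against the original columns,
# so referring to the new column `age` within the same call fails
one_call = try(
  transform(players, age = yearID - birthYear, age_next = age + 1),
  silent = TRUE
)
inherits(one_call, "try-error")  # TRUE

# Workaround: chain two calls so `age` exists before it is used
two_calls = transform(
  transform(players, age = yearID - birthYear),
  age_next = age + 1
)
print(two_calls)
```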

 

As before (and after as well) we test results of both functions for equivalency:

compareDataInRandAster(big.prepped.1.tbl, big.prepped.1.ta,
    key = c("key"))

Note how the new attribute key, being a unique reference, was immediately put to work in the test.

 

Subset

Arguably the most common data manipulation operation is reducing a data set to a smaller one, which may take two forms:

  • selecting attributes (vertical subset);
  • selecting rows (horizontal subset).

While dplyr uses the functions select() and filter() respectively, Aster R combines conventional R indexing to select attributes with the function ta.subset() to select rows.

We continue our example by selecting batting metrics identified by their names, which are stored in the variable batting_metrics, while keeping only rows from 1995 onward (yearID >= 1995) and dropping rows with a missing home run value (!is.na(HR), which Aster translates to the IS NOT NULL operator):

batting_metrics = c("G", "AB", "R", "H", "HR", "BB", "SO")

# dplyr
big.prepped.2.tbl = big.prepped.1.tbl %>%
  select_(quote(c(key, playerID, yearID, stint, age)),
          .dots=batting_metrics) %>%
  filter(!is.na(HR) & yearID >= 1995)

# Aster R
big.prepped.2.ta = ta.subset(big.prepped.1.ta[,c("key","playerID","yearID","stint","age",
    batting_metrics, "row_names")],
    !is.na(HR) & yearID >= 1995)

One distinct feature of dplyr is its use of non-standard evaluation (NSE, see vignette("nse") for details), which is great for translating R to SQL and for interactive use but is hard to program with. For that reason dplyr offers SE versions of its functions, designated with the suffix _. With Aster R we use NSE inside ta.subset() but not for attribute selection, which uses data frame syntax.
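The NSE versus SE distinction is easy to see in base R as well. In this sketch (made-up data, base R only) subset() plays the role of an NSE function, while plain data frame indexing with names held in variables plays the role of the SE style.

```r
# Hypothetical toy data
batting = data.frame(playerID = c("a", "b", "c"),
                     yearID   = c(1990, 1996, 2001),
                     HR       = c(12, NA, 30),
                     stringsAsFactors = FALSE)

# NSE: column names are typed bare, which is convenient interactively
nse = subset(batting, !is.na(HR) & yearID >= 1995, select = c(playerID, HR))

# SE: column names are ordinary strings, which is easy to parameterize
keep_cols = c("playerID", "HR")
year_col  = "yearID"
se = batting[!is.na(batting[["HR"]]) & batting[[year_col]] >= 1995, keep_cols]

all.equal(nse, se)  # TRUE: both keep only player "c"
```

The SE form is wordier, but keep_cols and year_col can come from function arguments or configuration, which is exactly what a pipeline needs.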

 

And our standard test comparing results in dplyr and Aster R completes this example:

compareDataInRandAster(big.prepped.2.tbl, big.prepped.2.ta, key = "key")

Summarization

A data manipulation function that reduces multiple attribute values to a single value is usually referred to as summarization. With Aster R we have at least three approaches to data summarization:

  • aggregates using SQL GROUP BY, e.g. MAX, MIN, AVG, etc. with ta.summarise() or ta.summarise.each(),
  • SQL window functions available with ta.transform(),
  • and in-database R using ta.tapply() or ta.by(), which offers the most flexible option of pushing arbitrary R code to run in parallel over partitioned data inside the Aster database.

Note that summarizing doesn't necessarily imply collapsing the data, as reduced values could be attached to the original rows (the case for window functions). What summarizing always includes is dividing the operation into two steps:

  1. a grouping of data based on the values of certain attribute(s) (GROUP BY clause in SQL or INDEX argument in tapply() and ta.tapply())
  2. a computation step that computes or reduces data with aggregation, window or arbitrary R functions within each group.

In fact, dplyr embraces this concept fully by always invoking group_by() first, followed by an appropriate value reduction function.

 

Using SQL GROUP BY

The SQL GROUP BY clause should always come to mind first. Aster R implements it with ta.summarise(), which encompasses both grouping and computation, while dplyr, as mentioned before, invokes these steps explicitly by composing group_by() and summarise():

# dplyr
summary.1.tbl = big.prepped.2.tbl %>%
     group_by(playerID, yearID, age) %>%
     summarise_each_(funs(sum), batting_metrics)

# Aster R
summary.1.ta = ta.summarise(big.prepped.2.ta,
     group.by = c("playerID", "yearID", "age"),
     G=sum(G), AB=sum(AB), R=sum(R), H=sum(H), HR=sum(HR),
     BB=sum(BB), SO=sum(SO)) 

compareDataInRandAster(summary.1.tbl, summary.1.ta,
     key = c("playerID","yearID"))

In this example we consolidated each player's records within a single year into one record for simplicity. While it's not common for players to change teams within the same season, it does happen, and Lahman keeps a separate record for each player's team stint, which for our purposes is unnecessary detail. So aggregates were used to consolidate stats to one record per player per year by summing the player's stats across all teams he played for that season. Again, with dplyr we used the SE version summarise_each_() to take advantage of the existing list of all metrics in batting_metrics. With Aster R we included all aggregates explicitly in ta.summarise().
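For readers who want to try the stint consolidation locally, here is a base R sketch of the same idea with aggregate(), driven by a character vector of metric names. The data and the shortened metric list are made up for illustration.

```r
# Shortened, hypothetical metric list and toy data with two stints for player "a"
batting_metrics = c("G", "HR")
batting = data.frame(playerID = c("a", "a", "b"),
                     yearID   = c(2000, 2000, 2000),
                     stint    = c(1, 2, 1),
                     G        = c(80, 60, 150),
                     HR       = c(10, 5, 30),
                     stringsAsFactors = FALSE)

# Group by player and year, then sum every metric named in batting_metrics
per_season = aggregate(batting[batting_metrics],
                       by  = batting[c("playerID", "yearID")],
                       FUN = sum)
print(per_season)  # player "a": G = 140, HR = 15; player "b": G = 150, HR = 30
```

Because the metrics are passed by name, adding a column to batting_metrics is enough to include it in the summary.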

 

Using Window Functions

We could devote a whole blog post (or a series, or a book, easily) to window functions. But let me instead quote three sources: one from Aster, one from PostgreSQL, and one from the dplyr vignette:

Window functions allow the calculation of aggregates and other values on a per-row basis as opposed to a per-set or per-group basis... For example, a window function may be used to compute a running sum, a moving average, a delta between values in neighboring rows, or to apply ranking and row numbering to a table.

- Teradata Aster Database User Guide

A window function performs a calculation across a set of table rows that are somehow related to the current row. This is comparable to the type of calculation that can be done with an aggregate function. But unlike regular aggregate functions, use of a window function does not cause rows to become grouped into a single output row — the rows retain their separate identities. Behind the scenes, the window function is able to access more than just the current row of the query result.

- PostgreSQL 9.1 Manual

A window function is a variation on an aggregation function. Where an aggregation function, like sum() and mean(), takes n inputs and return a single value, a window function returns n values. The output of a window function depends on all its input values, so window functions don’t include functions that work element-wise, like + or round(). Window functions include variations on aggregate functions, like cumsum() and cummean(), functions for ranking and ordering, like rank(), and functions for taking offsets, like lead() and lag().

- Window functions vignette

Besides their per-row application, another important feature of window functions is support for order within groups, which allows for enumerating rows and computing lags, cumulative values, and other order-dependent functions. Thus, the syntax supports both grouping and ordering options, depending on the specific window function.

For our example we will compute the following six values:

  • total number of seasons for each player with n();
  • current season number for each player with row_number();
  • career-high HR value so far (up to the current year) for each player with cummax();
  • career-high HR for each player with max() (summarise() could compute the same value, but only by collapsing each group into a single row, whereas a window function preserves every row);
  • player's career start year with min();
  • player's career end year with max().

# dplyr
summary.2.tbl = summary.1.tbl %>% group_by(playerID) %>%
  mutate(seasonsTotal=n(),
    currentSeason=with_order(order_by = yearID, fun = row_number, x = yearID),
    currentTopHR=with_order(order_by = yearID, fun = cummax, x = HR),
    topHR=max(HR), startCareerYear = min(yearID), endCareerYear = max(yearID))

# Aster R
summary.2.ta = ta.transform(summary.1.ta,
  seasonsTotal=n(partition = playerID),
  currentSeason=row_number(order = yearID, partition = playerID),
  currentTopHR=cummax(HR, order = yearID, partition = playerID),
  topHR=max(HR, partition = playerID),
  startCareerYear = min(yearID, partition=playerID),
  endCareerYear = max(yearID, partition=playerID))

compareDataInRandAster(summary.2.tbl, summary.2.ta,
  key = c("playerID", "yearID"))

Again, let's go over the differences and similarities between the two implementations. As noted before, dplyr always divides the operation into a composition of group_by() and mutate(), where the latter invokes window and aggregate functions that inherit the grouping and retain each row. Aster R uses ta.transform() and applies window and aggregate functions inside the call, each with its own partition and order arguments.
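The row-preserving behavior of window functions can also be reproduced in base R with ave(), which applies a function within groups and returns a value for every input row. This is only a local sketch on made-up data; it sorts the rows first because ave() has no ordering argument of its own.

```r
# Hypothetical toy data
seasons = data.frame(playerID = c("a", "a", "a", "b", "b"),
                     yearID   = c(2000, 2001, 2002, 2000, 2001),
                     HR       = c(10, 30, 20, 5, 7),
                     stringsAsFactors = FALSE)

# Order within groups matters for row numbers and cumulative functions
seasons = seasons[order(seasons$playerID, seasons$yearID), ]

# Each call returns one value per row, so no rows are collapsed
seasons$currentSeason = ave(seasons$yearID, seasons$playerID, FUN = seq_along)
seasons$currentTopHR  = ave(seasons$HR, seasons$playerID, FUN = cummax)
seasons$topHR         = ave(seasons$HR, seasons$playerID, FUN = max)
print(seasons)
```

For player "a" the running maximum is 10, 30, 30 while topHR is 30 on every row, which shows the difference between an ordered cumulative function and a plain per-group aggregate attached to each row.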

 

Using In-Database R

At last we have arrived at the most powerful option available with Aster R: in-database R execution. Looking at the previous examples, one may think that R programming played a limited role, as there was not much R code. Indeed, our Aster R constructs closely resembled SQL or the grammar of data manipulation, with both SQL GROUP BY and window functions handling the grouping and aggregation (data reduction) steps. The same two steps govern in-database R, but the logic shifts to an R-centric paradigm based on the R tapply() function and the functional features of R.

 

With dplyr not much changes, as its functionality naturally supports functional R and doesn't leave the client R environment (at least without involving extensions). Aster R, on the other hand, provides the function ta.tapply(), which does two important things:

  1. it extends the reach of tapply() to Aster tables by enabling parallel and distributed execution within the Aster database;
  2. and it transports R code for execution from the Aster R client to Aster R sandbox environments, which are configured and available with a proper installation on the Aster database.

The availability and version of R on the Aster database can easily be checked with the commands:

> ta.is.R.installed()
[1] TRUE
> ta.R.Version()

Please consult the Teradata Aster R documentation for more details on how to configure and test the in-database R environment.

 

For our example we want to answer the following question: for each player, find his age when he had his career HR season (i.e. the season in which the player hit the most HRs in his career). For simplicity we'll reuse the topHR attribute from the previous example when calculating the player's age in that season:

# dplyr
summary.3.tbl = summary.2.tbl %>%
  group_by(playerID) %>%
  summarize(
    seasons_total = seasonsTotal[[1]],
    max_hr = topHR[[1]],
    top_hr_age = age[HR == max_hr][[1]])

# Aster R
summary.3.ta = ta.tapply(summary.2.ta, INDEX=summary.2.ta$playerID,
  FUN=function(x){c(x$seasonsTotal[[1]], # seasons_total
                    x$topHR[[1]],        # max_hr
                    x$age[x$HR == x$topHR[[1]]][[1]])}, # top_hr_age
  out.tadf = list(columns=c("playerID", "seasons_total",
                            "max_hr", "top_hr_age")))

compareDataInRandAster(summary.3.tbl, summary.3.ta, key = "playerID")

Because the Aster R version does things almost letter for letter like dplyr, it is advisable to review the latter first. Our goal is to calculate each player's stats, so summarization will reduce the number of rows for each player to one; hence we use the group_by() with summarize() construct. Next, seasons_total and max_hr have the same values within each player's group, so we always pick the very first value for each. Things become R-stylish in the line that assigns a value to top_hr_age: given the vector (array) age, we pick the element whose corresponding HR value is equal to the career maximum (i.e. equal to max_hr). With Aster R we repeat the same logic but within a tapply-style function call: each player's rows are passed in the variable x to the anonymous function assigned to the argument FUN. Lastly, the argument out.tadf controls how Aster saves results in the database: in this case we explicitly rename the columns using its columns element.
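The per-group logic can be tried out locally before pushing it to the database. Below is a base R sketch on made-up data that applies a function of the same shape to each player's rows; the name career_peak() is hypothetical, and split() with lapply() only imitates the partitioning that ta.tapply() performs in Aster.

```r
# Hypothetical toy data
seasons = data.frame(playerID = c("a", "a", "a", "b", "b"),
                     age      = c(25, 26, 27, 30, 31),
                     HR       = c(10, 30, 20, 7, 5),
                     stringsAsFactors = FALSE)

# Hypothetical per-group function: x holds all rows of one player
career_peak = function(x) {
  max_hr = max(x$HR)
  c(seasons_total = nrow(x),
    max_hr        = max_hr,
    top_hr_age    = x$age[x$HR == max_hr][[1]])  # first season at the maximum
}

# Split by player, apply the function, combine into one matrix (one row per player)
per_player = lapply(split(seasons, seasons$playerID), career_peak)
result = do.call(rbind, per_player)
print(result)
```

Testing the function this way on a small local sample is a cheap check that it returns a vector of fixed length and order, which is what the column list in out.tadf relies on.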

 

What really happened when we called ta.tapply() is radically different from all previous examples: the anonymous function assigned to FUN was serialized and sent over the network to Aster worker nodes running R sandboxes, where it was deserialized and loaded for execution. After that (you may think of it as just prep) Aster read the specification of how to run it from two arguments: the virtual data frame (first argument) referred to the underlying table, and INDEX identified the column whose values partition the data. At this point Aster has all the ingredients to execute the anonymous R function in-database, on distributed data and in parallel. It saves the results into a new table and, once execution completes, returns a virtual data frame with the results to the Aster R client that was waiting on ta.tapply(). Lastly, compareDataInRandAster() tests that dplyr and Aster R produced the same results.
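To get a feel for what "serialized and sent over the network" means, here is a tiny base R illustration using serialize() and unserialize(). It shows only that an R function can be turned into bytes and restored elsewhere; the actual transport and format used by Aster R are not described in this post, so treat this as an analogy. The function peak_age() and the data are hypothetical.

```r
# Hypothetical function of the kind one might pass as FUN
peak_age = function(x) x$age[which.max(x$HR)]

# "Client" side: turn the function into a raw byte vector
payload = serialize(peak_age, connection = NULL)
is.raw(payload)  # TRUE

# "Worker" side: restore the function from bytes and run it on one partition
restored = unserialize(payload)
partition = data.frame(age = c(25, 26, 27), HR = c(10, 30, 20))
restored(partition)  # 26, the age in the season with the most HRs
```

One practical consequence is that the function should carry everything it needs: any package it calls must also be available where the code is restored and run.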

 

Note that we could pass any R function with FUN (including references to third-party packages, provided they are installed on Aster as well). This method elevates Aster R to both a framework and a platform for executing R code in parallel across distributed data in a manner consistent with the R tapply() function. With proper configuration and algorithm design, running parallel R should become a standard part of the data scientist's daily routine with Aster.

 

In the next post we shift gears to show elements of the data science workflow such as exploratory analysis, principal component analysis (PCA), and predictive modeling.

 

Image sources:

Red Hat Developer Program

The Programming Historian
