R – Using doParallel to Significantly Speed Up Database Retrieval

Recently I ran into a unique problem concerning record retrieval from a Microsoft SQL Server database. The size of the data I had to retrieve, coupled with the non-optimal structure of the data repository, was causing query times to exceed 12 hours! Unfortunately, the database structure (indexes, keys, relationships) came from a legacy environment and was not going to improve for my needs. Processes that run for long periods of time, mundane manual processes and slow Internet connections all bother me deeply. Performance, reusability, testability and automation: these are areas I focus on in all projects. A 12-hour data retrieval time is not acceptable; I had to find another approach.

At first I found that breaking my database query into multiple grouped requests slightly improved performance, but it was still taking hours. Not fast enough. I needed these requests running concurrently in order to get all the required data in a timely manner. What about having 10-12 people kick off my R code from their local machines, write the output to flat files and then merge everything together on my machine? Uh, what happens when I need to rerun tomorrow, or tonight… not sustainable. What about R parallel processing? My bottleneck is the database, so I can simulate multiple concurrent calls from R code and merge the results automatically.

Here is my design:

[Figure: design diagram]

R doParallel will enable my code to make multiple database requests concurrently from my local machine. A few code changes are necessary, but they are worth it. Install the ‘doParallel’ package in your environment and then load it in your code.

library(doParallel)  # also attaches foreach, iterators and parallel

Before I use any features of doParallel, I first break my larger list of records into smaller groups of unique IDs to optimize performance. I will use these IDs to retrieve the millions of detail records for my analysis. After some testing, I found that 25 groups of ~2000 unique IDs achieved my best results. Let's say the unique IDs are customer IDs and our system stores them in a fixed-width char column wider than 10 characters, because, much to my dismay, it really does happen. The customer ID is 10 characters long, but the database field is longer than that; nothing ever seems to fit perfectly in the real world. In R, one would take a substring of the field down to the desired length of 10, then use unique to ensure there are no duplicates.
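
A quick way to check the substring-and-unique step before pointing it at the real table is to run it on a few padded IDs. This is a minimal sketch: the sample values are made up, and customerDT only mirrors the name used in the post.

```r
# Hypothetical sample data: 10-character customer IDs returned with trailing
# padding, as a fixed-width char column wider than 10 characters would do.
customerDT <- data.frame(
  CustomerID = c("C000000001     ", "C000000002     ", "C000000001     "),
  stringsAsFactors = FALSE
)

# Keep the first 10 characters, then drop duplicates (first occurrence wins).
customerIDs <- unique(substring(customerDT$CustomerID, 1, 10))
print(customerIDs)
# [1] "C000000001" "C000000002"

# If the padding is only whitespace, trimws() gives the same result here
# without hard-coding the length.
identical(customerIDs, unique(trimws(customerDT$CustomerID)))
# [1] TRUE
```

substring() is the safer choice when the extra characters might not be spaces; trimws() is handy when they always are.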

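To build a sequence for grouping the unique customer IDs, use the seq function. Pass in the desired starting point (the from parameter) and the maximum record count (the to parameter). I specified length.out because I only want 25 groups. The from value could, and probably should, be dynamic, but… I knew 2000 records would fit nicely into my desired 25 groups, shame on me. Be aware that each group built by the lapply call spans exactly 2000 positions, so the groups only line up end to end when there are 50,000 unique IDs (25 × 2000); with more IDs some are skipped, and with fewer the groups overlap. I have an example of this in a solution involving sampling from very large files.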

Finally, use lapply to actually construct the customer ID groups, passing each index of the sequence to an anonymous function that slices the customerIDs vector.

#Data Source is Char > 10, but ID is 10 exactly, substring to 10 characters
# and find unique customer IDs
customerIDs <- unique(substring(customerDT$CustomerID, 1, 10))

#Get the count of unique customers
rowCnt <- length(customerIDs)

#Get sequence from 2000 to rowCnt, 25 is the desired length of the sequence
#For my purposes, 2000 and 25 produced the best performance.

s <- seq(from=2000, to=rowCnt, length.out = 25)

#Group my customer IDs using the sequence
#Each group is the window of 2000 positions ending at s[i]

groupBreakOut <- lapply(seq_along(s), function(i) customerIDs[(s-1999)[i]:s[i]])
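
Because the seq() approach only tiles the IDs cleanly for one specific count, a variant that derives the group size from the data may be more robust. This is a sketch, not the code I ran: make_groups is a hypothetical helper that returns at most nGroups non-empty, non-overlapping groups which together contain every ID exactly once.

```r
# Split a vector of IDs into at most nGroups contiguous, non-overlapping groups.
make_groups <- function(ids, nGroups = 25) {
  stopifnot(length(ids) > 0, nGroups >= 1)
  groupSize  <- ceiling(length(ids) / nGroups)      # e.g. 50123 / 25 -> 2005
  groupIndex <- ceiling(seq_along(ids) / groupSize) # 1,1,...,2,2,...
  unname(split(ids, groupIndex))                    # unnamed list of vectors
}

# Usage with 50,123 synthetic IDs (not real data):
ids <- sprintf("C%09d", seq_len(50123))
groups <- make_groups(ids, 25)
length(groups)          # 25
max(lengths(groups))    # 2005
sum(lengths(groups))    # 50123: every ID appears exactly once
```

The result has the same shape as groupBreakOut (a list of character vectors), so it could be handed to the foreach loop unchanged.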

 

Now for the really cool stuff: a parallel loop. First use makeCluster to specify the number of worker processes (usually one per core) and the path for the debug output. There are diminishing returns on the number of cluster workers, so in this case more parallel workers is not better for performance; I believe 10 was my magic number. The R console will not display results while looping. Calls to print will instead be written to the output file if one is specified; otherwise, there will be a lot of nothingness. I found this file very helpful as a means to monitor the run's progress, in addition to debug messages. clusterExport and clusterEvalQ are used to make variables and packages available on each worker.
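
The cluster plumbing can be tried on its own before any database is involved. This minimal sketch uses only the base parallel package; lookupTable is a hypothetical stand-in for con1, sqlcmdOrig and groupBreakOut, two workers keep the demo small, and a temporary file plays the role of debug.txt.

```r
library(parallel)

# Hypothetical value the workers need; in the post, con1, sqlcmdOrig and
# groupBreakOut play this role.
lookupTable <- c(a = 1, b = 2, c = 3)

logFile <- tempfile(fileext = ".txt")        # stands in for "debug.txt"
cl <- makeCluster(2L, outfile = logFile)     # two worker processes

# Copy lookupTable from this session into every worker's global environment.
clusterExport(cl, "lookupTable", envir = environment())
# Evaluate an expression on every worker, typically to load packages.
invisible(clusterEvalQ(cl, library(stats)))

res <- parLapply(cl, names(lookupTable), function(key) {
  print(paste("processing", key))   # written to logFile, not the console
  lookupTable[[key]] * 10
})
stopCluster(cl)

unlist(res)
# [1] 10 20 30
```

When this is run from the top level, dropping the clusterExport() call makes the workers fail with "object 'lookupTable' not found", which is the usual symptom of a missing export. The "processing ..." lines land in logFile rather than the console.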

#Make a cluster with 12 worker processes, send debug output to debug.txt
cl <- makeCluster(12, outfile="debug.txt")

#Copy the following variables to each worker
#(dtDetails is not exported: it does not exist yet, it is the loop's result)
clusterExport(cl, c("con1","sqlcmdOrig","groupBreakOut"))

#The parallel loop will require access to the following libraries
clusterEvalQ(cl, library(RODBC))
clusterEvalQ(cl, library(data.table))

#Register the cluster as the foreach backend
registerDoParallel(cl)

#Start the Parallel Loop
dtDetails <- foreach(j= 1:length(groupBreakOut),.combine = "rbind", .inorder = FALSE) %dopar% {

#print to debug file
print(paste(j," of ", length(groupBreakOut)))

#get this iteration's group of IDs (length.out was 25, so there are 25 groups)

ids <- groupBreakOut[[j]]

#Ensure the database connection is established (reconnects using con1's saved settings)
con1 <- odbcReConnect(con1)

#Replace 'rCUSTOMERIDr' with a list of customer IDs suitable for MS SQL: '1','2','3'.....

#sqlcmdOrig contains the SQL code, could also be a stored procedure name
sqlcmd <- gsub("rCUSTOMERIDr", paste0("'", paste(unlist(ids), collapse = "','"), "'"), sqlcmdOrig, fixed = TRUE)

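#get the database results
dfTemp <- sqlQuery(con1, sqlcmd, as.is=TRUE)

#(Place for possible data conversion, calculations on results)
#as.is=TRUE can leave columns such as Amt (e.g. DECIMAL) as character; coerce before summing
#Insert into data.table
dtTemp <- data.table(dfTemp)
dtTemp[, Amt := as.numeric(Amt)]

#Clean up the unused data.frame
rm(dfTemp)

#these results will be placed into dtDetails
dtResult <- dtTemp[, .(Transactions = .N, Sales = sum(Amt)), by = .(CustomerID)]

#Close this worker's connection; odbcReConnect at the top of the loop opens a fresh one
odbcClose(con1)
dtResult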

}

stopCluster(cl)
stopImplicitCluster()
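
The gsub() call in the loop wraps each ID in single quotes. If an ID could ever contain a quote itself, doubling embedded quotes (the T-SQL escape inside string literals) keeps the generated IN list valid. A minimal sketch: to_sql_in_list is a hypothetical helper, and the query template, table and column names are made up; only the rCUSTOMERIDr placeholder comes from the post.

```r
# Hypothetical helper: turn a vector of IDs into a quoted, comma-separated
# list for a T-SQL IN (...) clause, doubling any embedded single quotes.
to_sql_in_list <- function(ids) {
  escaped <- gsub("'", "''", ids, fixed = TRUE)
  paste0("'", escaped, "'", collapse = ",")
}

# Hypothetical query template using the post's placeholder.
sqlcmdOrig <- "SELECT CustomerID, Amt FROM dbo.Sales WHERE CustomerID IN (rCUSTOMERIDr)"

# fixed = TRUE treats the placeholder as plain text rather than a regex.
sqlcmd <- gsub("rCUSTOMERIDr", to_sql_in_list(c("C000000001", "O'Brien001")),
               sqlcmdOrig, fixed = TRUE)
cat(sqlcmd, sep = "\n")
# SELECT CustomerID, Amt FROM dbo.Sales WHERE CustomerID IN ('C000000001','O''Brien001')
```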

 

I am using the R foreach loop to iterate through all customer ID groups (25 groups). The results of the concurrent processes are combined into the data.table dtDetails: .combine = "rbind" tells foreach to rbind each iteration's result onto the others, and .inorder = FALSE lets it combine results in whatever order the workers finish. Adding %dopar% after the foreach call makes the loop body run in parallel on the registered cluster; every iteration is executed by one of the cluster's workers rather than in the main R session.
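
To see .combine = "rbind" and .inorder = FALSE at work without a database, the same loop shape can be run against an in-memory table. This is a minimal sketch: details is synthetic data standing in for the query results, base R's table() and tapply() replace the data.table aggregation so that only doParallel (and the foreach package it loads) is required, and two workers keep it light.

```r
library(doParallel)   # attaches foreach and parallel as well

# Synthetic detail rows (hypothetical data): 6 customers, 4 rows each.
set.seed(1)
details <- data.frame(
  CustomerID = sprintf("C%09d", rep(1:6, each = 4)),
  Amt = round(runif(24, min = 1, max = 100), 2),
  stringsAsFactors = FALSE
)
# Three groups of two customer IDs, like a small groupBreakOut.
groups <- unname(split(unique(details$CustomerID), rep(1:3, each = 2)))

cl <- makeCluster(2)
registerDoParallel(cl)

# details and groups are referenced in the body, so foreach exports them.
summaryDF <- foreach(j = seq_along(groups), .combine = "rbind",
                     .inorder = FALSE) %dopar% {
  rows <- details[details$CustomerID %in% groups[[j]], ]   # "query" one group
  data.frame(
    CustomerID   = sort(unique(rows$CustomerID)),
    Transactions = as.vector(table(rows$CustomerID)),
    Sales        = as.vector(tapply(rows$Amt, rows$CustomerID, sum))
  )
}

stopCluster(cl)
registerDoSEQ()       # fall back to sequential execution afterwards

nrow(summaryDF)                 # 6, one row per customer
sum(summaryDF$Transactions)     # 24
```

With .inorder = FALSE the row order of summaryDF can differ between runs, which is harmless here because each customer appears in exactly one group.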

The original data retrieval query exceeded 12 hours. Temp tables, stored procedures and indexing were not viable options. Breaking the database call into 25 separate calls still caused the retrieval process to run for longer than 1 hour. By using R and doParallel, I was able to get the process to run in 1-3 minutes on average; it has yet to run over 4 minutes.

It is not my intention for this post to be the “doParallel 101” guide that answers all questions on parallelism, but instead to demonstrate how it was a ‘lifesaver’ on a real project.
