29 Spark Mllib Pipeline Efficient Development of Machine Learning Applications

29 Spark MLlib Pipeline - Efficient Development of Machine Learning Applications #

Hello, I’m Wu Lei.

In the previous lessons, we have learned how to do feature engineering and model training under the Spark MLlib framework. Whether it is feature engineering or model training, for the same machine learning problem, we often need to try different feature processing methods or model algorithms.

Based on the previous numerous examples, you may have already noticed that there is a lot of duplicated code in the development process when different algorithm selections are made for the same problem.

Taking GBDT and random forest as examples, the process of data processing is similar. The raw data is transformed into training samples through three steps: StringIndexer, VectorAssembler, and VectorIndexer. The only difference is that GBDT uses GBTRegressor for regression, while random forest uses RandomForestClassifier for classification.

Image

Not only that, but we also did not have a complete loop when validating the model effect. We only checked the fitting effect on the training set and did not perform inference and validation on the test set. If we try to load new test data, all the feature processing steps need to be repeated on the test set. Undoubtedly, this also introduces a lot of redundant duplicate code.

So, is there a way to avoid the aforementioned redundant development and make the machine learning development under the Spark MLlib framework more efficient? The answer is yes. In today’s lecture, we will talk about Spark MLlib Pipeline and see how it can greatly improve the development efficiency of machine learning applications for developers.

Spark MLlib Pipeline #

What is Spark MLlib Pipeline? In simple terms, Pipeline is a high-level API based on DataFrame, which allows developers to build end-to-end machine learning pipelines in an efficient way. To understand this, let’s first take a look at the core components of the Pipeline and the functionalities they provide.

There are two types of core components in Pipeline: Transformers and Estimators. Transformers are used to transform data, while Estimators are used to generate models. The various feature processing functions we have encountered before, such as StringIndexer, MinMaxScaler, Bucketizer, and VectorAssembler, all belong to Transformers. The model algorithms mentioned in the previous three lessons are all Estimators.

Image

Transformer #

Let’s start with Transformers, which are data transformers. In terms of form, Transformers take DataFrame as input and output DataFrame as well. Based on specific data processing logic, Transformers create new data columns based on the original DataFrame columns, often containing features in different forms.

Taking StringIndexer as an example, its transformation logic is simple, which is to convert strings to numerical values. When creating an instance of StringIndexer, we need to use the setInputCol(s) and setOutputCol(s) methods to specify the original data columns and the expected output data columns. The content in the output data columns is the features we need, as shown in the following figure.

Image

From the diagram, we can see that Transformers consume data columns from the original DataFrame and then append the generated data columns to the DataFrame, resulting in a new DataFrame. In other words, Transformers do not modify the original DataFrame “in place”, but rather create a new DataFrame based on it.

In fact, each Transformer implements these two (interface) methods: setInputCol(s) and setOutputCol(s). In addition to these methods, Transformers also provide a transform interface, which encapsulates specific transformation logic. It is based on these core interfaces that Pipeline can concatenate various Transformers to build feature engineering pipelines.

Generally speaking, in a machine learning application, we often need multiple Transformers to perform various transformations on the data to generate the required training samples. Logically, multiple DataFrames that are different “versions” of the same original data generated based on each other will exist in the system.

However, benefiting from Spark’s lazy evaluation design, there will not be multiple redundant data occupying memory at runtime.

However, for development convenience, we still use var instead of val to name the original DataFrame. The reason is simple. If we use val, we need to repeatedly use new variable names to name the newly generated DataFrames. You can review the code from the previous lesson for this development detail.

Estimator #

Next, let’s talk about Estimators. Compared to Transformers, Estimators are much simpler. They are actually various model algorithms, such as GBDT, random forest, linear regression, and so on. The core interface of Estimator is fit, which can be translated as “fitting” in Chinese.

The role of Estimators is to define model algorithms and then produce models (Models) by fitting the training samples encapsulated in DataFrame. This is why I call Estimators “model generators”.

However, interestingly, although model algorithms are Estimators, the models produced by Estimators are undeniably Transformers.

To understand why models are Transformers, we need to first understand what models are. Machine learning models are essentially parameter matrices (also known as weights) plus a model structure. The model structure is related to the model algorithm, such as decision tree structure, GBDT structure, neural network structure, and so on.

The core purpose of the model is to perform inference or prediction. Given data samples, the model can infer house prices, infer housing types, and so on. In the Spark MLlib framework, data samples are often encapsulated by DataFrame, and the results of model inference are still stored in a (new) DataFrame, with the default column name “predictions”.

In fact, based on the trained inference logic, by adding a “predictions” column, a DataFrame can be transformed into a new DataFrame. Isn’t this what Transformers are doing? And that’s why we call the fit method on model algorithms, while we call the transform method on models for inference.

Building a Pipeline #

Now that we understand the Transformer and Estimator, we can use them to build a Pipeline and create an end-to-end machine learning workflow. In fact, once the Transformers and Estimators are ready, defining a Pipeline only requires one line of code, as shown below.

import org.apache.spark.ml.Pipeline

// Define various feature processing objects and model algorithms as before
val stringIndexer = _
val vectorAssembler = _
val vectorIndexer = _
val gbtRegressor = _

// Put all the Transformers and Estimators in an array
val stages = Array(stringIndexer, vectorAssembler, vectorIndexer, gbtRegressor)

// Define the Spark MLlib Pipeline
val newPipeline = new Pipeline().setStages(stages)

As you can see, to define a Pipeline, you just need to create a Pipeline instance and pass the previously defined Transformers and Estimators as parameters to the setStages method. It is important to note that a Pipeline can contain multiple Transformers and Estimators, but the last stage of the Pipeline must be an Estimator.

So far, we have covered the purpose, definition, and core components of a Pipeline. However, you may still wonder, “Although we have explained the concepts, I still don’t know how to use a Pipeline specifically, and what advantages it brings.” Don’t worry, actions speak louder than words. Next, we will discuss the specific usage of a Pipeline and how it can greatly improve development efficiency, using examples of GBDT and random forest.

First, let’s see how a Pipeline can help us improve efficiency in a machine learning application. In the previous lesson, we used GBDT to fit house prices and provided code examples.

Now, let’s slightly adjust the code to train the model using the Spark MLlib Pipeline. As a first step, we create a DataFrame from the file and separate the numeric and non-numeric fields, as shown below.

import org.apache.spark.sql.DataFrame

// rootPath is the root directory of the house price prediction dataset
val rootPath: String = _
val filePath: String = s"${rootPath}/train.csv"

// Read the file and create a DataFrame
var engineeringDF: DataFrame = spark.read.format("csv").option("header", true).load(filePath)

// All numeric fields
val numericFields: Array[String] = Array("LotFrontage", "LotArea", "MasVnrArea", "BsmtFinSF1", "BsmtFinSF2", "BsmtUnfSF", "TotalBsmtSF", "1stFlrSF", "2ndFlrSF", "LowQualFinSF", "GrLivArea", "BsmtFullBath", "BsmtHalfBath", "FullBath", "HalfBath", "BedroomAbvGr", "KitchenAbvGr", "TotRmsAbvGrd", "Fireplaces", "GarageCars", "GarageArea", "WoodDeckSF", "OpenPorchSF", "EnclosedPorch", "3SsnPorch", "ScreenPorch", "PoolArea")

// Label field
val labelFields: Array[String] = Array("SalePrice")

import org.apache.spark.sql.types.IntegerType

for (field <- (numericFields ++ labelFields)) {
    engineeringDF = engineeringDF
        .withColumn(s"${field}Int",col(field).cast(IntegerType))
        .drop(field)
}

Once the data is prepared, we can start building the components for the Pipeline: defining the Transformers and Estimators one by one. In the previous lesson, we used StringIndexer to convert non-numeric fields into numeric fields. We will do the same here.

import org.apache.spark.ml.feature.StringIndexer

// All non-numeric fields
val categoricalFields: Array[String] = Array("MSSubClass", "MSZoning", "Street", "Alley", "LotShape", "LandContour", "Utilities", "LotConfig", "LandSlope", "Neighborhood", "Condition1", "Condition2", "BldgType", "HouseStyle", "OverallQual", "OverallCond", "YearBuilt", "YearRemodAdd", "RoofStyle", "RoofMatl", "Exterior1st", "Exterior2nd", "MasVnrType", "ExterQual", "ExterCond", "Foundation", "BsmtQual", "BsmtCond", "BsmtExposure", "BsmtFinType1", "BsmtFinType2", "Heating", "HeatingQC", "CentralAir", "Electrical", "KitchenQual", "Functional", "FireplaceQu", "GarageType", "GarageYrBlt", "GarageFinish", "GarageQual", "GarageCond", "PavedDrive", "PoolQC", "Fence", "MiscFeature", "MiscVal", "MoSold", "YrSold", "SaleType", "SaleCondition")

// The output column names expected by StringIndexer
val indexFields: Array[String] = categoricalFields.map(_ + "Index").toArray

// Define StringIndexer instance val stringIndexer = new StringIndexer() // Batch specify input column names .setInputCols(categoricalFields) // Batch specify output column names, output column names must correspond to input column names one by one .setOutputCols(indexFields) .setHandleInvalid(“keep”)

In the previous lesson, after defining the StringIndexer instance, we immediately use it to transform engineeringDF. However, when building the Pipeline, we don’t need to do this. We just need to define this “component” well. Next, let’s create the next component: VectorAssembler.

import org.apache.spark.ml.feature.VectorAssembler

// Convert numeric fields to integers val numericFeatures: Array[String] = numericFields.map(_ + “Int”).toArray

val vectorAssembler = new VectorAssembler() /** The input column is: numeric fields + non-numeric fields Note that the column names of non-numeric fields should use indexFields instead of the original categoricalFields. Think about why? */ .setInputCols(numericFeatures ++ indexFields) .setOutputCol(“features”) .setHandleInvalid(“keep”)

Compared with the previous lesson, the definition of VectorAssembler is no different.

Next, let’s continue to create the third component: VectorIndexer, which is used to help model algorithms distinguish continuous features from discrete features.

import org.apache.spark.ml.feature.VectorIndexer

val vectorIndexer = new VectorIndexer() // Specify the input column .setInputCol(“features”) // Specify the output column .setOutputCol(“indexedFeatures”) // Specify the threshold for distinguishing continuous and discrete .setMaxCategories(30) .setHandleInvalid(“keep”)

So far, the Transformers are all defined. After the original data is transformed by StringIndexer, VectorAssembler, and VectorIndexer, a new DataFrame will be generated. In this latest DataFrame, there will be multiple data columns generated by different Transformers, and the data content contained in the “indexedFeatures” column is the feature vector.

Combining with the “SalePriceInt” column brought by the DataFrame, the feature vector and the target variable are finally combined together, which is what we commonly refer to as training samples. With the training samples, we can now start defining the Estimator.

import org.apache.spark.ml.regression.GBTRegressor

val gbtRegressor = new GBTRegressor() // Specify the target variable .setLabelCol(“SalePriceInt”) // Specify the feature vector .setFeaturesCol(“indexedFeatures”) // Specify the number of decision trees .setMaxIter(30) // Specify the maximum depth of the decision trees .setMaxDepth(5)

Okay, here we go. At this point, all the components required by the Pipeline have been created. With the components in place, we only need to assemble them into a Pipeline using the “assembly line technology” provided by Spark MLlib.

      .withColumn(field, testData(field).cast(IntegerType))
    }

接下来,我们只需调用之前构建好的Pipeline模型,对testData进行推断即可。

val predictedData = pipelineModel.transform(testData)

这样,我们就可以得到针对test.csv的预测结果了,进而提交到Kaggle线上平台,进行模型评估和排名。

通过这样的方法,我们在作业内复用了之前的Pipeline模型,避免了重复开发和测试样本不一致的问题。不仅仅如此,如果以后再有新的数据,我们只需要按照同样的步骤,加载数据并进行转换,即可用之前保存的Pipeline模型进行推断。

作业间的代码复用 #

在实际工作中,我们会面对很多类似的机器学习项目。比如说,接下来可能有人会提出一个课题,让我们预测一处房产的售价,这个课题和之前的房价预测有些类似。

对于熟悉机器学习的你来说,可以想象一下,我们可以把之前的Pipeline封装成一个专门的Transformer,并将其打包发布到生产环境,供其他开发人员在其他项目中使用。

Spark MLib提供了一个机制,叫做PipelineModel。它和Pipeline的关系有点像类和实例的关系。

对于同一个Pipeline,我们可以构建多个PipelineModel实例。比如我们可以用7:3的比例划分不同数据集,分别构建不同的PipelineModel实例,然后对比两个模型在不同数据集上的性能,通过这种方法来选择更优的模型。

同样地,如果我们有一个特别好用、可供项目间通用的Pipeline,我们可以构建多个PipelineModel实例,并在不同的项目中使用。

比如说,一个实际需求是要预测伊利诺伊州芝加哥市的各个行政区的人口密度,我们该怎么做呢?我们可以把之前的Pipeline封装成Transformer,并对它进行调整,让它适应新的数据集和新的预测需求,然后再重新训练。

val chicagoData: DataFrame = ???

val predictedDensity: DataFrame = pipelineModel.transform(chicagoData)

通过这种方式,我们可以在不同的项目中,使用同一个PipelineModel实例,方便地复用之前的模型和转换逻辑。

总结一下,Pipeline是一个非常好的工具,它能够帮助我们将多个数据转换和模型训练的步骤组合在一起,并实现代码的复用。我们可以在作业间或作业内复用Pipeline,大幅度提升开发效率。 .withColumn(s"${field}Int", col(field).cast(IntegerType)) .drop(field) }

Next, we only need to call the transform method of the Pipeline Model to make inferences on the test set. Remember? The model is a Transformer and transform is the universal interface for Transformer to transform data.

val predictions = pipelineModel.transform(testData)

With the Pipeline, we can eliminate the repetition of defining feature processing functions such as StringIndexer and VectorAssembler, improve development efficiency, and eliminate the hidden dangers of inconsistent samples. In addition to reusing the Pipeline within the same job, we can also reuse it between different jobs to further improve development efficiency.

Code reuse between jobs #

For the same machine learning problem, we often try different model algorithms in order to obtain better model performance. For example, for house price prediction, we can use GBDT or random forest. However, although the model algorithms are different, the training samples are often similar or even exactly the same. If we have to process the data from scratch every time we try a different model algorithm, it would be too inefficient and prone to errors.

With the Pipeline, it becomes extremely easy to choose the algorithm. Let’s take house price prediction as an example again. Previously, we tried to use GBTRegressor to train the model. This time, let’s try RandomForestRegressor, a random forest algorithm, to solve the regression problem. As usual, let’s explain it with code.

import org.apache.spark.ml.Pipeline

val savePath: String = _

// Load the Pipeline saved to disk before
val unfitPipeline = Pipeline.load(s"${savePath}/unfit-gbdt-pipeline")

// Get each stage (Transformer or Estimator) in the Pipeline
val formerStages = unfitPipeline.getStages

// Remove the last component, i.e., Estimator: GBTRegressor, from the Pipeline
val formerStagesWithoutModel = formerStages.dropRight(1)

import org.apache.spark.ml.regression.RandomForestRegressor

// Define the new Estimator: RandomForestRegressor
val rf = new RandomForestRegressor()
.setLabelCol("SalePriceInt")
.setFeaturesCol("indexedFeatures")
.setNumTrees(30)
.setMaxDepth(5)

// Concatenate the old Stages with the new Estimator
val stages = formerStagesWithoutModel ++ Array(rf)

// Define the new Pipeline
val newPipeline = new Pipeline()
.setStages(stages)

First, we reload the Pipeline saved before. Then, replace the original GBTRegressor with the new RandomForestRegressor. Finally, concatenate the original stages with the new Estimator to create the new Pipeline. Next, just call the fit method to trigger the operation of the new Pipeline and finally fit the new random forest model.

// Prepare data by creating a DataFrame from train.csv as before
var engineeringDF = _

val Array(trainingData, testData) = engineeringDF.randomSplit(Array(0.7, 0.3))

// Call the fit method to trigger the operation of the Pipeline and fit the new model
val pipelineModel = newPipeline.fit(trainingData)

As you can see, with just a few lines of code, we can easily complete the model selection. With that, the advantages of the Pipeline in terms of development efficiency and fault tolerance are undeniable.

Key Recap #

That’s all for today’s content. In today’s lecture, we learned about Spark MLlib Pipeline. You need to understand the advantages of Pipeline and master its core components and specific usage. The core components of Pipeline are Transformer and Estimator.

Among them, the Transformer completes the transformation from DataFrame to DataFrame, generating new data columns based on the inherent transformation logic. Estimator is mainly a model algorithm. It generates machine learning models based on the training samples encapsulated in the DataFrame. By concatenating multiple Transformers and Estimators together and calling the setStages method of Pipeline, you can create a Pipeline.

The core advantage of Pipeline is to improve the efficiency of machine learning application development and eliminate the fatal inconsistency between test samples and training samples. Pipeline can be used for code reuse within a job or between jobs.

Within the same job, Pipeline can easily perform data inference on the test set. Between jobs, developers can load the previously saved Pipeline and then use “new parts” to replace “old parts” to reuse most of the processing logic and create a new Pipeline, thus achieving an efficient model selection process.

In future machine learning development, we should fully utilize the advantages provided by Pipeline to reduce development costs and focus more on feature engineering and model tuning.

So far, we have covered all the content of the Spark MLlib module.

In this module, we mainly focused on feature engineering, model training, and machine learning pipelines, and summarized the various capabilities provided by the Spark MLlib sub-framework to developers. In other words, we now know what Spark MLlib can do and what it is good at. If we can master these capabilities, we can flexibly choose and use them in our daily machine learning development to meet the ever-changing business needs. Keep up the good work!

Daily Exercise #

Today we have been talking about the advantages of Pipeline. Can you mention some possible disadvantages of Pipeline?

Feel free to leave a comment and interact with me. I also recommend that you share this lecture with more colleagues and friends. Maybe it will help them further understand Pipeline.