20 Hive - Spark’s Strong Alliance, the Unparalleled Choice for Distributed Data Warehouses #

Hello, I am Wu Lei.

In the lectures on data sources and data formats, as well as data transformation (lectures 15 and 16), we introduced the general steps for developing data analysis applications using Spark SQL.

Let’s briefly review: first, we create a DataFrame from a distributed file system using the SparkSession read API. Then, we perform various data transformations, filtering, aggregation, and other operations by creating temporary tables and using SQL statements or directly using the DataFrame API. Finally, we write the computation results back to the distributed file system using the SparkSession write API.

In fact, direct interaction with the file system is just one common scenario for Spark SQL data applications. Another very typical scenario is integrating with Hive to build a distributed data warehouse. As we know, a data warehouse is a subject-oriented, aggregated collection of datasets, usually organized as a series of carefully designed tables. Data warehouses are used very widely in data analysis.

Hive and Spark complement each other: Hive, something of a “jack-of-all-trades,” excels at metadata management, while Spark excels at efficient distributed computing. Combining the two makes for a “strong alliance.” In today’s lecture, we will discuss the two ways of integrating Spark with Hive: one from Spark’s perspective, which we call Spark with Hive, and the other from Hive’s perspective, commonly known as Hive on Spark.

Hive Architecture and Basic Principles #

Before explaining these two integration methods, let’s take some time to understand the architecture and working principles of Hive, so that students who are not familiar with Hive can avoid confusion.

Hive is a core component used by the Apache Hadoop community to build data warehouses. It is responsible for providing a variety of user interfaces that accept SQL queries submitted by users. After parsing and optimizing these queries, Hive often converts them into distributed tasks and executes them using Hadoop MapReduce.

Hive is a true “jack of all trades”. Its core components are the User Interface (1) and the Driver (3). Whether it is the metadata database (4), storage system (5), or computation engine (6), Hive “outsources” them to third-party independent components in a “plug-and-play” manner, as shown in the following figure.

Image

Hive’s User Interface provides SQL access services for developers. The specific access methods include Hive Server 2 (2), the CLI, and the Web Interface. Among them, the CLI and the Web Interface receive SQL queries directly on the local machine, while Hive Server 2 lets developers submit SQL queries remotely over JDBC/ODBC client connections. Because its access methods are more flexible, Hive Server 2 is the most widely used of the three.

Let’s take responding to an SQL query as an example to see how Hive works. After receiving the SQL query, Hive’s Driver first uses its Parser component to convert the query into an Abstract Syntax Tree (AST).

Then, the Planner component generates an execution plan based on the AST, and the Optimizer further optimizes the execution plan. To complete this series of actions, Hive must have access to the metadata of the corresponding data tables, such as table names, column names, field types, data file storage paths, and file formats. All of this metadata is stored in a database called the “Hive Metastore” (4).

Essentially, the Hive Metastore is just a regular relational database (RDBMS). It can be a free database such as MySQL or Derby, or a commercial one such as Oracle or IBM DB2. In fact, in addition to supporting SQL parsing and the generation and optimization of execution plans, the Metastore has another important role: helping the underlying computation engine efficiently locate and access data sources in the distributed file system.

The distributed file system here can be Hadoop’s HDFS or the cloud-native Amazon S3. In terms of execution, Hive currently supports three types of computation engines: Hadoop MapReduce, Tez, and Spark.

When Hive uses Spark as the underlying computation engine, we call this integration method “Hive on Spark”. Conversely, when Spark only considers Hive as a metadata management tool, we call this integration method “Spark with Hive”.

You may feel confused: “These two terms sound similar. What is the essential difference between the two integration methods?” Following an “easy first” order, we will explain Spark with Hive first and then introduce Hive on Spark.

Spark with Hive #

Before we start learning Spark with Hive in earnest, let’s talk about the core idea of this integration method. As mentioned earlier, the Hive Metastore uses an RDBMS to store the metadata of data tables, such as the table name, table type, schema, storage path of the table (or partition) data, and storage format. In other words, the Metastore is like a “household register” that records the details of each dataset in the distributed file system.

By accessing this “household register” called the Hive Metastore, Spark SQL can expand its data sources. This is the core idea of Spark with Hive integration. Simply put, in this integration mode, Spark is the main body, and the Hive Metastore is just an auxiliary tool used by Spark to expand its data sources. Clarifying the relationship between Spark and Hive helps us differentiate between Hive on Spark and Spark with Hive.

As developers, we can achieve Spark with Hive integration through three approaches, which are:

  1. Creating SparkSession and accessing a local or remote Hive Metastore.
  2. Accessing a local Hive Metastore using the built-in spark-sql CLI of Spark.
  3. Accessing the Spark Thrift Server using the Beeline client.

SparkSession + Hive Metastore #

To better understand the relationship between Hive and Spark, let’s start with the first approach, which is accessing the Hive Metastore through SparkSession. First, we use the following command to start the Hive Metastore.

hive --service metastore

After starting the Hive Metastore, we need to let Spark know the access address of the Metastore, which means telling Spark where the “household register” of the data sources is located.

To pass this message, we have two ways. One way is to explicitly specify the hive.metastore.uris parameter through the config function when creating the SparkSession. Another way is to let Spark read the Hive configuration file hive-site.xml, which records various configuration options related to Hive, including hive.metastore.uris. By copying hive-site.xml to the conf subdirectory of the Spark installation directory, Spark can read the configuration content on its own.

Next, let’s demonstrate the first approach through a small example. Suppose there is a salary table named “salaries” in Hive, and each record contains two fields: id and salary. The table data is stored in HDFS. Now, by entering the following code in the spark-shell, we can easily access the data table in Hive.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame

// Placeholder: replace with the host name of the node running the Hive Metastore
val hiveHost: String = "localhost"
// Create a SparkSession instance
val spark = SparkSession.builder()
                   .config("hive.metastore.uris", s"thrift://$hiveHost:9083")
                   .enableHiveSupport()
                   .getOrCreate()

// Read the Hive table and create DataFrame
val df: DataFrame = spark.sql("select * from salaries")

df.show

/** Result
+---+------+
| id|salary|
+---+------+
|  1| 26000|
|  2| 30000|
|  4| 25000|
|  3| 20000|
+---+------+
*/

In Lecture 16, we talked about creating temporary tables from data files using the createTempView function. Once the temporary table is created, we can use the SQL API of SparkSession to submit SQL queries. After connecting to the Hive Metastore, we can bypass the first step and directly use the SQL API to access existing tables in Hive. Isn’t it convenient?

More importantly, a temporary table created by the createTempView function lives only as long as the Spark application (more precisely, the SparkSession) that created it. Once the application finishes, the temporary table no longer exists and cannot be reused by other applications. Hive tables, on the other hand, have their metadata persistently stored in the Hive Metastore, so different jobs, applications, and even computing engines (such as Spark, Presto, and Impala) can access them through the Hive Metastore.
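The contrast between the two kinds of tables can be sketched in a few lines. This is a minimal example rather than a production recipe: the sample rows and the names salaries_tmp and salaries_copy are hypothetical, and local mode is assumed so that the snippet can be tried without a cluster (when no hive-site.xml is provided, Spark falls back to an embedded local metastore).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: local mode for experimenting; in spark-shell the existing session is reused
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("TempViewVsHiveTable")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// Hypothetical sample rows mirroring the salaries table used above
val salaries: DataFrame = Seq((1, 26000), (2, 30000), (3, 20000), (4, 25000))
  .toDF("id", "salary")

// Session-scoped: the view is visible only to this SparkSession
salaries.createOrReplaceTempView("salaries_tmp")

// Persistent: the table's metadata is recorded in the Hive Metastore
salaries.write.mode("overwrite").saveAsTable("salaries_copy")

// List both entries; isTemporary is true only for the view
spark.catalog.listTables().select("name", "tableType", "isTemporary").show()
```

After the application exits, salaries_tmp is gone, while salaries_copy can still be queried by any new application that connects to the same Metastore.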

In summary, in the SparkSession + Hive Metastore integration method, Spark’s access to Hive only involves the Metastore, and it does not touch other components of the Hive architecture. In other words, in this integration method, Spark simply “freeloads” the Hive Metastore. After obtaining the metadata of the dataset, Spark SQL loads and processes the data on its own, as shown in the following diagram.

Image

In the first integration method, you can submit complex SQL statements directly through the SQL API. You can also create a DataFrame and then implement business logic with the various operators covered in Lecture 16.
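As a sketch of the DataFrame route, the snippet below filters and aggregates the salaries table with operators instead of SQL. The helper name salaryAbove and the threshold of 25000 are made up for illustration, and the table is read only if it actually exists in the Metastore.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, col}

// Assumption: local mode for experimenting; in spark-shell the existing session is reused
val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()

// Hypothetical helper: keep rows whose salary exceeds the threshold, highest first
def salaryAbove(df: DataFrame, threshold: Int): DataFrame =
  df.filter(col("salary") > threshold).orderBy(col("salary").desc)

// Read the Hive table through the Metastore only if it exists
if (spark.catalog.tableExists("salaries")) {
  val salaries: DataFrame = spark.table("salaries")
  salaryAbove(salaries, 25000).show()
  // Average salary over the whole table
  salaries.agg(avg(col("salary")).as("avg_salary")).show()
}
```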

spark-sql CLI + Hive Metastore #

However, you might ask, “Since we are building a data warehouse, can we directly input SQL queries like using a regular database, bypassing the SparkSession’s SQL API?”

The answer is yes. Next, we’ll talk about the second integration method of Spark with Hive: spark-sql CLI + Hive Metastore. Similar to spark-shell and spark-submit, spark-sql is also a built-in system command in Spark. By placing the hive-site.xml file with the configured hive.metastore.uris parameter in the conf directory of the Spark installation directory, we can directly use SQL statements in spark-sql to query or process Hive tables.

Obviously, in this integration mode, the relationship between Spark and Hive is the same as in the SparkSession + Hive Metastore integration described above. In both cases, Spark essentially expands its data sources through the Hive Metastore.

However, compared to the former, the integration method of spark-sql CLI has an additional restriction in deployment: spark-sql CLI and Hive Metastore must be installed on the same compute node. In other words, spark-sql CLI can only access Hive Metastore locally and cannot do so remotely.

In most industrial production systems, different big data components are often deployed separately, and Hive and Spark are no exception. Since Hive Metastore can be used to serve different computing engines, such as Presto and Impala mentioned earlier, in order to reduce the workload on the nodes, Hive Metastore is often deployed on a relatively independent compute node.

In this context, it must be said that the restriction of local access in spark-sql CLI greatly weakens its applicability, which is also the fundamental reason why the spark-sql CLI + Hive Metastore integration method is rarely used. However, this does not prevent us from learning and understanding it, as it helps deepen our understanding of the relationship between Spark and Hive.

Beeline + Spark Thrift Server #

At this point, you may ask, “Since spark-sql CLI has such restrictions, are there any other integration methods that can be deployed in a production system and allow developers to write SQL queries?” The answer is “yes.” The third integration method of Spark with Hive is to use the Beeline client to connect to the Spark Thrift Server and access and process Hive tables.

Beeline was originally a Hive client that connects to Hive Server 2 via JDBC. Hive Server 2 can serve multiple clients simultaneously, providing multi-tenant Hive query services. Since Hive Server 2 implements the Thrift RPC protocol framework, we often refer to Hive Server 2 as “Hive Thrift Server 2”.

Query requests received through Hive Server 2 are parsed, planned, and optimized by the Hive Driver and then executed by the compute engine that Hive is configured to use. The query results are then returned to the Beeline client via Hive Server 2, as shown in the dashed box on the right side of the following figure.

Image

Spark Thrift Server was derived from Hive Server 2. In terms of query reception, multi-tenant services, and permission management, the implementation logic of the two servers is almost the same. The biggest difference lies in how SQL queries are parsed, planned, optimized, and executed after they are received.

As we just mentioned, the “backend” of Hive Server 2 is Hive’s own underlying infrastructure. In contrast, after a SQL query is received by Spark Thrift Server, it is first optimized by the Spark SQL optimizer.

As mentioned in Lecture 14, with the help of Catalyst and Tungsten, Spark SQL performs syntax parsing, syntax tree construction, logical optimization, physical optimization, data structure optimization, and execution code optimization on SQL queries. Then, Spark SQL delivers the optimized execution plan to the Spark Core execution engine for execution.

Image

It is not difficult to see that, once a SQL query has been received by Spark Thrift Server, its execution path is exactly the same as that of a DataFrame in Spark.
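One way to observe this shared path is to compare the plans that Spark SQL produces for a SQL string and for the equivalent DataFrame code. The sketch below is only an illustration: it submits the SQL through SparkSession rather than through Spark Thrift Server, and the view name salaries_demo and its rows are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: local mode for experimenting; in spark-shell the existing session is reused
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample data registered as a temporary view
Seq((1, 26000), (2, 30000), (3, 20000), (4, 25000))
  .toDF("id", "salary")
  .createOrReplaceTempView("salaries_demo")

// The same query expressed as SQL text and with DataFrame operators
val viaSql = spark.sql("select id from salaries_demo where salary > 25000")
val viaApi = spark.table("salaries_demo").filter(col("salary") > 25000).select("id")

// Print the parsed, analyzed, optimized, and physical plans of each query
viaSql.explain(true)
viaApi.explain(true)
```

Both queries are planned by the same Spark SQL pipeline before being handed to Spark Core, so the two printouts can be compared side by side.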

After understanding the differences and connections between Spark Thrift Server and Hive Server 2, let’s talk about how to start Spark Thrift Server and how to use Beeline. To start the Spark Thrift Server, we just need to call the start-thriftserver.sh script provided by Spark.

# The SPARK_HOME environment variable points to the Spark installation directory
cd $SPARK_HOME/sbin

# Start Spark Thrift Server
./start-thriftserver.sh

After the script executes successfully, the Spark Thrift Server listens for JDBC/ODBC connection requests on port 10000 by default. Interestingly, to set the listening port, Spark reuses Hive Server 2’s hive.server2.thrift.port parameter. Like other Hive parameters, hive.server2.thrift.port needs to be set in the hive-site.xml configuration file.

Once the Spark Thrift Server has started, we can access the service through the Beeline client on any node. After the client establishes a connection to the server, we can use SQL statements in Beeline to process Hive tables. It is worth noting that in this integration mode, Spark is the optimization and compute engine behind the SQL statements.

# Connect to Spark Thrift Server using the Beeline client,
# where hostname is the node where the Spark Thrift Server service is located
beeline -u "jdbc:hive2://hostname:10000"
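Because Spark Thrift Server accepts JDBC connections, application code can connect in the same way Beeline does. The sketch below assumes that the Hive JDBC driver (hive-jdbc) is on the classpath, that the server accepts unauthenticated connections, and that the salaries table from earlier exists with integer columns. The helper names thriftServerUrl and fetchSalaries are illustrative.

```scala
import java.sql.{Connection, DriverManager, ResultSet}

// Build the same JDBC URL that Beeline uses; 10000 is the default port
def thriftServerUrl(host: String, port: Int = 10000): String =
  s"jdbc:hive2://$host:$port"

// Run a query through Spark Thrift Server and collect (id, salary) pairs.
// Assumption: the Hive JDBC driver is available on the classpath.
def fetchSalaries(host: String): List[(Int, Int)] = {
  val conn: Connection = DriverManager.getConnection(thriftServerUrl(host))
  try {
    val rs: ResultSet =
      conn.createStatement().executeQuery("select id, salary from salaries")
    // Advance the cursor row by row until the result set is exhausted
    Iterator.continually(rs)
      .takeWhile(_.next())
      .map(r => (r.getInt("id"), r.getInt("salary")))
      .toList
  } finally conn.close()
}
```

Calling fetchSalaries("hostname") would then return the rows of the table, with Spark doing the optimization and computation behind the server.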

Alright, with that, we have finished covering the Spark with Hive integration methods.

To consolidate what we have just learned, let’s summarize it. Regardless of whether it is SparkSession + Hive Metastore, spark-sql CLI + Hive Metastore, or Beeline + Spark Thrift Server, Spark plays the role of an execution engine, while Hive mainly provides the metadata of the underlying datasets through Metastore. It is not difficult to see that in these types of integration methods, Spark takes the “leading role,” while Hive takes the “supporting role”.

Hive on Spark #

At this point, you may be curious: “Do the Hive and Spark communities stand on equal footing? Is there a scenario where Hive plays the leading role and Spark the supporting role?” Yes, there is, and it is called Hive on Spark.

Basic Principles #

At the beginning of this lecture, we briefly introduced the basic architecture of Hive. Hive’s loosely coupled design allows its metastore, underlying file system, and execution engine to be pluggable and replaceable.

In terms of the execution engine, Hive is by default based on Hadoop MapReduce, but it also supports Tez and Spark. “Hive on Spark” actually means that Hive uses Spark as its backend distributed execution engine, as shown in the following figure.

Image

From the user’s perspective, there is no difference between using Hive on MapReduce or Hive on Tez and using Hive on Spark. The switch between execution engines is completely transparent to the user. Regardless of which execution engine Hive chooses, the engine is only responsible for distributed computing tasks, and the parsing, planning, and optimization of SQL statements are all done by Hive’s driver.

To support different execution engines, Hive also needs to do some simple adaptation to translate the optimized execution plan into the semantics of the underlying compute engine.

For example, in the integration mode of Hive on Spark, after Hive transforms the SQL statements into an execution plan, it needs to translate the execution plan into a DAG in RDD semantics and then hand the DAG over to Spark Core for execution. From Lecture 14 to now, we have been emphasizing that besides playing the role of a data analysis sub-framework, Spark SQL also serves as Spark’s next-generation optimization engine.

In the Hive on Spark integration mode, the part that connects Hive and Spark is Spark Core, not Spark SQL, and this is something we need to pay special attention to. This is why, compared to Hive on Spark, the integration of Spark with Hive has better performance. After all, the combination of Spark SQL + Spark Core, as the original combination, has higher compatibility compared to the adapted combination of Hive Driver + Spark Core.

Integration Implementation #

After analyzing the principles, let’s talk about how to implement Hive on Spark integration.

First of all, since we want Hive to run on Spark, we need to prepare a complete Spark deployment in advance. As for the deployment mode of Spark, Hive does not impose any restrictions: Spark on Standalone, Spark on YARN, and Spark on Kubernetes are all acceptable.

Once the Spark cluster is ready, we can easily achieve Hive on Spark integration by modifying the relevant configuration items in hive-site.xml, as shown in the following table.

Image

Among them, hive.execution.engine specifies Hive’s backend execution engine; the accepted values are “mr” (Hadoop MapReduce), “tez”, and “spark”. Obviously, setting this parameter to “spark” means adopting the Hive on Spark integration mode.

After determining the execution engine, we naturally need to tell Hive: “Where is the Spark cluster deployed?” The spark.master parameter is used to achieve this. In addition, in order to facilitate Hive calling the relevant scripts and Jar files of Spark, we also need to specify the installation directory of Spark through the spark.home parameter.

After configuring these three parameters, we can submit query requests to Hive using Hive SQL, and Hive will first access the metastore on the driver end to complete the formulation and optimization of the execution plan. Then, the plan will be translated into a DAG in RDD semantics, and finally, the DAG will be handed over to the backend Spark for distributed computing.

When you see the words “Hive on Spark” in the terminal, it means that the execution engine behind Hive is indeed Spark, as shown in the following figure.

Image

Of course, in addition to the above three configuration items, Hive also provides more parameters for fine-tuning the interaction between Hive and Spark. You can refer to the Hive on Spark Configuration Property List to view these parameters. Moreover, in Lecture 12, we detailed the basic configuration items of Spark itself, and these configuration items can be configured in hive-site.xml to allow you to more precisely control the integration between Hive and Spark.

Key Review #

Alright, we’ve finished today’s content! There was a lot of information, so let’s summarize together.

In today’s lecture, you need to understand the two common integration methods between Spark and Hive: Spark with Hive and Hive on Spark. The former is led by the Spark community, with Spark as the main component and Hive as the supplement. The latter is led by the Hive community, with Hive as the main component and Spark as the supplement. Each integration method has its own advantages and is suitable for different scenarios.

In the Spark with Hive integration method, Spark mainly utilizes the Hive Metastore to expand the data source, thereby reducing the management and maintenance costs of distributed files, such as path management, partition management, schema maintenance, and so on.

For Spark with Hive, we have at least three ways to achieve integration between Spark and Hive: SparkSession + Hive Metastore, spark-sql CLI + Hive Metastore, and Beeline + Spark Thrift Server. I have summarized these three integration methods in a table for your reference at any time.

Image

In contrast to Spark with Hive, the other integration method is Hive on Spark. This integration method essentially provides Hive users with a new option, which is to choose Spark as a more performant execution engine in addition to the existing options of MapReduce and Tez.

Therefore, with Spark’s current popularity, teams and developers who are accustomed to using Hive are more willing to try and adopt Spark as the backend execution engine.

Once you are familiar with the differences and applicable scenarios of different integration methods, you will be able to integrate Spark and Hive in a targeted and systematic manner in your future work. Keep up the good work!

Practice for Each Lesson #

  1. Suppose Hive is deployed in Hive on Spark mode, and a separate Spark deployment accesses the same Hive Metastore to expand its data sources, for example by creating a SparkSession. In this case, can you briefly describe the execution path of the user code?

  2. Although the main topic of our column is Spark, I strongly recommend that you study and remember Hive’s architecture. Its loosely coupled design keeps Hive itself lightweight and makes it highly extensible, which is why Hive has long held its position as the leading open-source data warehouse. This design philosophy is well worth studying, and it can be applied wherever architecture design is needed, whether in front-end, back-end, big data, or machine learning systems.

Feel free to interact with me in the comment section and share this lecture with more colleagues and friends.