00 Heavy Meal Quick Start Into Scala Language

00 Heavy Meal Quick Start into Scala Language #

Hello, I’m Hu Xi. Recently, I noticed some students in the comments section mentioned that “Scala is not easy to understand.” So, I have decided to add an extra lecture to talk about the basic syntax of Scala, including variable and function definitions, tuple notation, loop statements in functional programming style, the unique case classes and the powerful match pattern matching feature, as well as the usage of Option objects.

After completing this lecture, I believe you will be able to grasp these practical Scala syntax in a relatively short period of time, especially the Scala language features used in Kafka source code, and completely remove the language barriers when reading the source code.

Java Functional Programming #

As I mentioned in the preface, it’s not necessary for you to be familiar with the Scala language, but you should have some understanding of Java 8 functional programming, especially with the usage of Java 8 Streams.

If you haven’t had much exposure to lambda expressions and Java 8 Streams before, I recommend a good book for you: “Java 8 in Action”. This book provides in-depth explanations of lambda expressions, Streams, and functional programming through numerous examples. You can give it a read.

Now, I’ll share a practical example with you to start our discussion today.

TopicPartition is a class defined by Kafka, which models the partition object of a Kafka topic. Here is the key code of TopicPartition:

public final class TopicPartition implements Serializable {
  private final int partition;
  private final String topic;
  // Other fields and methods...
}

For any partition, the most important fields of a TopicPartition instance are the topic and partition fields, which represent the topic and partition number of Kafka, respectively. Assuming we have a List of a group of partition objects, how can I write Java code to find the list of topics whose partition count is greater than 3 and starts with “test”? Take a moment to think about it before checking the answer below.

First, let me give you the Java 8 Stream style answer:

// Assuming the variable name for the partition object list is "list"
Set<String> topics = list.stream()
        .filter(tp -> tp.topic().startsWith("test-"))
        .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting()))
        .entrySet().stream()
        .filter(entry -> entry.getValue() > 3)
        .map(entry -> entry.getKey()).collect(Collectors.toSet());

This is a typical Java 8 Stream code, with a lot of operation operators like filter, map, and so on, as well as lambda expressions, making the code concise and readable.

Let me explain the purpose of each line starting from line 3: The filter method in line 3 filters the topics that start with “test”; line 4 calls the collect method to group by the topic and count the number of partitions, which generates a Map object; lines 5 to 7 extract all the entries from this Map object and then call the filter method again to extract the topics whose partition count is greater than 3; finally, the topics are collected into a set and returned.

In fact, by providing this example, I just want to illustrate that the coding style in Scala and Java 8 Stream are quite similar: On one hand, there are many operation operators like filter, map, flatMap, and so on in the code; on the other hand, the coding style is similar to the lambda expression syntax in Java.

If you don’t believe me, let’s take a look at the getLag method in Kafka that calculates the consumer lag:

private def getLag(offset: Option[Long], logEndOffset: Option[Long]): Option[Long] =
  offset.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))

You see, there are also filter and map operations here. It’s similar to the Java code above, isn’t it?

If you still don’t understand what this method’s code means, don’t worry, I’ll guide you step by step in the following sections. I believe that after completing this lesson, you will be able to understand the source code of the getLag method on your own. The getLag code is a typical example of Kafka source code. Once you are familiar with this coding style, you will be able to adapt it to other source code reading challenges.

Let’s start with variables in Scala. After all, no matter what programming language you learn, the most fundamental thing is to understand how variables are defined.

Defining Variables and Functions #

In Scala, there are two types of variables: val and var. val is equivalent to the final variable in Java, once initialized, it cannot be reassigned. On the contrary, var is a non-final variable, which can be reassigned. Let’s take a look at this code:

scala> val msg = "hello, world"
msg: String = hello, world

scala> msg = "another string"
<console>:12: error: reassignment to val
       msg = "another string"

scala> var a: Long = 1L
a: Long = 1

scala> a = 2
a: Long = 2

Quite straightforward, right? msg is a val, and a is a var, so msg cannot be reassigned, while a can. I want to remind you that you can specify the type of the variable by using “colon + type” after the variable name. For example, in line 6 of this code, “:Long” tells us that variable a is of type Long. Of course, it is also allowed to omit “:Long” because Scala can automatically infer the type of a based on its value “1L”.

However, in many cases, explicitly specifying the variable type can make the code more readable and maintainable.

Next, let’s take a look at how functions are defined in Scala. I will use the example of a Max function that takes two integers and returns the maximum value. Here is the code:

def max(x: Int, y: Int): Int = {
  if (x > y) x
  else y
}

First, the keyword def indicates that this is a function. max is the function name, and the variables x and y inside the parentheses are the function input parameters, both of which are of type Int. The “Int =” at the end indicates that the max function returns an integer.

Second, the max code uses the if statement to compare the values of x and y, and returns the greater value. However, it does not use the return keyword, but directly writes x or y. In Scala, the value of the last line of the function body code block will be returned as the function result. In this example, the last line in the if branch code block is x, so the if branch returns x. Similarly, the else branch returns y.

After explaining the max function, let me further help you understand Scala functions with a real function from the Kafka source code:

def deleteIndicesIfExist(
  // The default value of the parameter suffix is an empty string
  // Unit at the end of the function is similar to the void keyword in Java, indicating that the function does not return any result
  baseFile: File, suffix: String = ""): Unit = {
  info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
  val offset = offsetFromFile(baseFile)
  Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
  Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
  Files.deleteIfExists(Log.transactionIndexFile(dir, offset, suffix).toPath)
}

Compared to the max function above, this function has two additional syntax features that you need to understand.

The first feature is default parameter value, which is not supported in Java. The parameter suffix of this function has a default value of an empty string, so both of the following ways of calling the function are valid:

deleteIndicesIfExist(baseFile) // OK
deleteIndicesIfExist(baseFile, ".swap") // OK

The second feature is the return value of this function is Unit. Scala’s Unit is similar to Java’s void, so the return value of the deleteIndicesIfExist function is of type Unit, indicating that it only executes a piece of logic code and does not need to return any result.

Definition of Tuple #

Next, let’s take a look at the concept of tuples in Scala. A tuple is a container that holds data and once created, it cannot be changed. The data in a tuple can be of different data types. Defining and accessing a tuple is simple, as shown in the code below:

scala> val a = (1, 2.3, "hello", List(1,2,3)) // Define a tuple with 4 elements, where each element can have a different type
a: (Int, Double, String, List[Int]) = (1,2.3,hello,List(1, 2, 3))

scala> a._1 // Access the first element of the tuple
res0: Int = 1

scala> a._2 // Access the second element of the tuple
res1: Double = 2.3

scala> a._3 // Access the third element of the tuple
res2: String = hello

scala> a._4 // Access the fourth element of the tuple
res3: List[Int] = List(1, 2, 3)

In general, tuples are easy and elegant to use. There are many examples in the Kafka source code that use tuples, such as:

def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = { // Return a tuple of type (Boolean, Errors)
    ...
    if (minIsr <= curInSyncReplicaIds.size) {
        ...
        (true, Errors.NONE)
    } else
        (false, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
}

The method checkEnoughReplicasReachOffset returns a tuple of type (Boolean, Errors), where the first element or field of the tuple is of type Boolean and the second element is a custom Errors type defined in Kafka.

This method checks if the number of replicas in the ISR (In-Sync Replica) of a partition is greater than or equal to the minimum ISR replicas required. If it is, it returns the tuple (true, Errors.NONE), otherwise it returns the tuple (false, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND). For now, you don’t need to worry about the meaning of minIsr or curInSyncReplicaIds in the code, just focus on understanding the usage of tuples in the Kafka source code.

Looping in Scala #

Let’s take a look at the looping in Scala. There are two common ways to define loops: imperative programming and functional programming. We are familiar with the first approach, such as the following for loop code:

scala> val list = List(1, 2, 3, 4, 5)
list: List[Int] = List(1, 2, 3, 4, 5)

scala> for (element <- list) println(element)
1
2
3
4
5

Scala also supports loops in functional programming style, similar to the following code:

scala> list.foreach(e => println(e))
// output omitted...
scala> list.foreach(println)
// output omitted...

Especially with the second style in the code snippet, the code becomes extremely concise. Let me reinforce your memory with a real piece of Kafka source code. It is taken from the stopProcessingRequests method in the SocketServer component, which is responsible for stopping the processing of requests and new incoming TCP connections by the broker. The SocketServer component is an important component for implementing Kafka’s network communication, and we will discuss it specifically in the next 3 lessons. For now, let’s learn this clearly functional-style code:

// dataPlaneAcceptors: ConcurrentHashMap<Endpoint, Acceptor> object
dataPlaneAcceptors.asScala.values.foreach(_.initiateShutdown())

This one-liner first calls the asScala method to convert the Java ConcurrentHashMap into the concurrent.Map object in Scala. Then it retrieves all the Acceptor threads it holds and, through the foreach loop, calls the initiateShutdown method on each Acceptor object. If this logic were implemented using imperative programming, it would require at least several lines, or even more than ten lines, to accomplish.

case classes #

In Scala, case classes are similar to ordinary classes, but they have some very important differences. Case classes are ideal for representing immutable data. Additionally, their most useful feature is that case classes automatically define getter methods for all class fields, which saves a lot of boilerplate code. Let me give you an example to illustrate.

If we were to write a class representing a point on a plane in Java, the code would look something like this:

public final class Point {
  private int x;
  private int y;
  public Point(int x, int y) {
    this.x = x;
    this.y = y;
  }
  // setter methods......
  // getter methods......
}

I won’t list the complete getter and setter methods here, but if you have experience with Java, you probably know what those boilerplate code looks like. However, if we use a case class in Scala, we only need to write one line of code:

case class Point(x: Int, y: Int) // Default syntax. x and y cannot be modified.
case class Point(var x: Int, var y: Int) // Supports modification of x and y.

Scala will automatically create getter methods for x and y. By default, x and y cannot be modified. If you want to support modification, you need to use the syntax in the second line of the code above.

Pattern Matching #

With the foundation of case classes in place, we can now learn about the powerful pattern matching functionality in Scala.

Compared to Java’s switch, which can only compare numerical values and strings, Scala’s match is much more powerful. Let me give you an example:

def describe(x: Any) = x match {
  case 1 => "one"
  case false => "False"
  case "hi" => "hello, world!"
  case Nil => "the empty list"
  case e: IOException => "this is an IOException"
  case s: String if s.length > 10 => "a long string"
  case _ => "something else"
}

In this function, x is of type Any, which is equivalent to the Object type in Java, i.e., the superclass of all classes. Note the use of case _ in the second-to-last line. This is used as a fallback case. If none of the above case branches match, this branch will be executed. Additionally, pattern matching also supports some complex expressions, such as the case branch in the third-to-last line, which indicates that if x is of type String and its length is greater than 10, then this branch will be executed.

It’s worth noting that Java just recently introduced the same functionality in JDK 14, which demonstrates the power and convenience of Scala’s syntax.

Option Object #

Finally, let me introduce a small grammar feature or language characteristic: Option object.

In fact, Java has also introduced a similar class called Optional. Based on my understanding, whether it is Option in Scala or Optional in Java, they are both used to help us better avoid NullPointerException.

Option represents a container object that may or may not contain a value. Since it is a container, it is typically written as Option[Any]. The Any within the square brackets is the Any type mentioned earlier, which can be any type. If the value exists, you can use Some(x) to get the value or assign a value to it; otherwise, you can use None to represent it. Let me help you understand it with a piece of code:

scala> val keywords = Map("scala" -> "option", "java" -> "optional") // Create a Map object
keywords: scala.collection.immutable.Map[String,String] = Map(scala -> option, java -> optional)

scala> keywords.get("java") // Get the value with key "java". Returns Some(optional) since the value exists
res24: Option[String] = Some(optional)

scala> keywords.get("C") // Get the value with key "C". Returns None since the value does not exist
res23: Option[String] = None

Option objects are often used together with pattern matching syntax to implement different handling logic in different situations. For example, what code should be executed when the Option object has a value and when it does not have a value. You can refer to the following code for the specific syntax:

def display(game: Option[String]) = game match {
  case Some(s) => s
  case None => "unknown"
}

scala> display(Some("Heroes 3"))
res26: String = Heroes 3

scala> display(Some("StarCraft"))
res27: String = StarCraft

scala> display(None)
res28: String = unknown

Summary #

Today, we spent some time quickly learning the syntax of the Scala language, which can help you quickly get started with Kafka source code. Now, let’s take a look at the getLag method source code that I mentioned at the beginning of this lesson, and see if you can explain its meaning now. Here it is again:

private def getLag(offset: Option[Long], logEndOffset: Option[Long]): Option[Long] =
  offset.filter(_ != -1).flatMap(offset => logEndOffset.map(_ - offset))

Now, you should know that it is a function that takes two parameters of type Option[Long] and returns a result of type Option[Long]. The code logic is simple: first, it checks if the offset has a value and is not -1. This is done in the filter function. Then, it calls the flatMap method to calculate the difference between the logEndOffset value and the offset value, and finally returns this difference as the Lag.

After this lesson, language should no longer be a barrier to learning source code for you. Now, we can continue to focus on studying the source code. On this occasion, I would like to say a few more words to you.

Many times, we think that we need enough perseverance to persist in learning source code, but in fact, perseverance is cultivated during the process of reading source code.

Considering that source code is not as easy to grasp as concrete technology itself, I strive to explain this course in the clearest and easiest-to-understand way. So, I hope you can spend a little time studying with me every day. I believe that by the end of the course, you will not only understand the Kafka Broker source code, but also enhance your perseverance. The improvement of perseverance and execution may be even more precious than the improvement of technology itself.

In addition, I would like to share a trick with you: if you want to cultivate the habit of reading source code every day, it is best to break down the goal into small enough parts. The human brain is inherently lazy. Rather than saying “I have to read 1000 lines of source code every day”, it is more willing to accept “I only need to read 20 lines per day”. You may say, reading only 20 lines per day is too little, right? Actually, it’s not. As long as you read 20 lines of source code, you can definitely read more. The goal of “20 lines” is just to motivate you to start doing this. Moreover, even if you really only read 20 lines, so what? Reading 20 lines is better than not reading a single line, right?

Of course, when reading source code, you often encounter a situation where you can’t understand a certain part of the code. It’s okay to skip the code that you don’t understand.

If you are a perfectionist, I have a few suggestions for dealing with code that you don’t understand:

  1. Read it multiple times. Don’t underestimate this simple advice. Sometimes, our brains are very capricious. If we only let them read the code once, they might stubbornly say that they don’t understand. But if you let them read it multiple times, maybe they will suddenly understand it.

  2. Learn from various resources. For example, community or online design documents, source code comments, or source code test cases related to this part of the code. Especially understanding the test cases is often the fastest way for us to comprehend the essence of the code.

In short, reading source code is a long-term project. Don’t fantasize about shortcuts or instant success. Small accumulations can lead to big changes. Let’s work hard together.