44 Data Processing: How Applications Interact with Data #

Hello, I’m Chen Tian.

Whether we are working on server-side or client-side development, interacting with data is essential for us developers.

On the client side, data fetched from the server often needs to be cached (in memory or in SQLite), or even stored locally (in files or SQLite).

On the server-side, scenarios for dealing with data are even more abundant. Apart from databases and caches, there are extensive text data indexing (such as search engines), real-time message queues processing data in a streaming fashion, or non-real-time batch processing that performs ETL (Extract, Transform, and Load) on massive data sets in a data warehouse.

Today, we will talk about how to use Rust for data processing, primarily focusing on two aspects: how to access relational databases with Rust, and how to analyze and process semi-structured data with Rust. Hopefully, by learning the content of this lecture, especially the second part, you can broaden your horizons and gain a deeper understanding of data processing.

Accessing Relational Databases #

Relational databases are the most important data storage and access tool for internet applications, and nearly every programming language supports them.

In Rust, there are drivers for almost all mainstream relational databases, such as rust-postgres and rust-mysql-simple. However, we generally do not use these database drivers directly, because that couples the application too tightly to a specific database. Instead, we use an ORM (Object-Relational Mapping).

There are mature ORMs like diesel and emerging candidates such as sea-orm in Rust. Diesel does not support async operations, whereas sea-orm does. Therefore, it is reasonable to believe that as sea-orm matures, more and more applications will be built on top of it.

If you find ORMs too cumbersome and ceremony-heavy, yet you don’t want to use a specific database driver, you can also turn to sqlx. sqlx provides lightweight async access to various databases (Postgres, MySQL, SQLite, MSSQL) without a DSL, and it can check SQL queries at compile time. It can query data directly from the database, or automatically convert row data into corresponding structs through derive macros.

Today, let’s try using sqlx to handle two very common features: user registration and login.

sqlx #

Build the following table structure to handle user login information:

CREATE TABLE IF NOT EXISTS users
(
    id              INTEGER PRIMARY KEY NOT NULL,
    email           VARCHAR UNIQUE      NOT NULL,
    hashed_password VARCHAR             NOT NULL
);

It is important to note that when storing user information in the database, especially sensitive data like passwords, it must be stored using specific hash algorithms. OWASP provides the following security recommendations for password storage:

  1. Use Argon2id if it is available (requires at least 15MB of memory on the target machine).
  2. If Argon2id is not available, use bcrypt (at least 10 iterations of the algorithm).
  3. Consider scrypt/PBKDF2 afterwards.

Argon2id is a combination of Argon2d and Argon2i. Argon2d offers strong resistance to GPU cracking but can be vulnerable to side-channel attacks under certain conditions, while Argon2i prevents side-channel attacks but is slightly less resistant to GPU cracking. Thus, Argon2id is the preferred password hashing algorithm, as long as the programming language you use supports it.

Rust has a complete set of password-hashing tools in the password-hashes project. We can use its argon2 crate; a complete password hash generated by it looks like this:

$argon2id$v=19$m=4096,t=3,p=1$l7IEIWV7puJYJAZHyyut8A$OPxL09ODxp/xDQEnlG1NWdOsTr7RzuleBtiYQsnCyXY

This string encodes the argon2id version (19), the memory used (4096 KiB), the number of iterations (3), the degree of parallelism (1 thread), and the base64-encoded salt and hash.

So when a new user registers, we hash the supplied password with argon2 and store the result in the database; when a user logs in with email/password, we look the user up by email and verify the password with argon2. We use sqlx to access the database. For simplicity, and to avoid installing an extra database, we use SQLite to store the data (if you have MySQL or PostgreSQL locally, you can swap in the corresponding statements yourself).

With this in mind, we create a new project and add the related dependencies:

[dev-dependencies]
anyhow = "1"
argon2 = "0.3"
lazy_static = "1"
rand_core = { version = "0.6", features = ["std"] }
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "sqlite"] }
tokio = { version = "1", features = ["full" ] }

Then create examples/user.rs and add the code; you can refer to the detailed comments to understand it:

// (The code sections are truncated, and due to space limitations, will not be displayed in full. Please refer to the original text for the complete code.)

In this code, we lightly wrap the capabilities of argon2 to provide generate_password_hash and verify_password methods for registration and login. For database access, we provide a SqlitePool connection pool for lock-free access.
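To make this concrete, here is a minimal sketch of what such a wrapper might look like, assuming the argon2 0.3 and sqlx 0.5 APIs from the dependency list above; the UserDb type and its method signatures here are illustrative assumptions, not the original example code:

use anyhow::Result;
use argon2::{
    password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
    Argon2,
};
use rand_core::OsRng;
use sqlx::SqlitePool;

// Same shape as the User struct shown later in this lecture.
#[derive(Debug, sqlx::FromRow)]
pub struct User {
    id: i64,
    email: String,
    hashed_password: String,
}

// Hypothetical wrapper over the users table, holding the connection pool.
pub struct UserDb {
    pool: SqlitePool,
}

impl UserDb {
    pub fn new(pool: SqlitePool) -> Self {
        Self { pool }
    }

    // Registration: hash the password with argon2, then insert the user.
    pub async fn register(&self, email: &str, password: &str) -> Result<i64> {
        let hashed_password = generate_password_hash(password)?;
        let id = sqlx::query("INSERT INTO users (email, hashed_password) VALUES (?, ?)")
            .bind(email)
            .bind(hashed_password)
            .execute(&self.pool)
            .await?
            .last_insert_rowid();
        Ok(id)
    }

    // Login: look the user up by email, then verify the password with argon2.
    pub async fn login(&self, email: &str, password: &str) -> Result<bool> {
        let user: User = sqlx::query_as("SELECT * from users WHERE email = ?")
            .bind(email)
            .fetch_one(&self.pool)
            .await?;
        Ok(verify_password(password, &user.hashed_password))
    }
}

// Generate a PHC-format argon2id hash like the one shown above.
fn generate_password_hash(password: &str) -> Result<String> {
    let salt = SaltString::generate(&mut OsRng);
    let hashed = Argon2::default()
        .hash_password(password.as_bytes(), &salt)
        .map_err(|e| anyhow::anyhow!(e))?
        .to_string();
    Ok(hashed)
}

// Verify a password against a stored PHC-format hash.
fn verify_password(password: &str, hashed: &str) -> bool {
    match PasswordHash::new(hashed) {
        Ok(parsed) => Argon2::default()
            .verify_password(password.as_bytes(), &parsed)
            .is_ok(),
        Err(_) => false,
    }
}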

You may have noticed this syntax:

let user: User = sqlx::query_as("SELECT * from users WHERE email = ?")
    .bind(email)
    .fetch_one(&self.pool)
    .await?;

Surprising, isn’t it? This is functionality normally found only in ORMs. It showcases the power of Rust traits once again: we do not need an ORM to map database rows to a specific Model; we only need to provide the desired struct T: FromRow when querying.

We can look at the definition of the query_as function and FromRow trait (code):

pub fn query_as<'q, DB, O>(sql: &'q str) -> QueryAs<'q, DB, O, <DB as HasArguments<'q>>::Arguments>
where
    DB: Database,
    O: for<'r> FromRow<'r, DB::Row>,
{
    QueryAs {
        inner: query(sql),
        output: PhantomData,
    }
}

pub trait FromRow<'r, R: Row>: Sized {
    fn from_row(row: &'r R) -> Result<Self, Error>;
}

If you want a data structure to support FromRow, it’s very simple: just use the sqlx::FromRow derive macro:

#[derive(Debug, sqlx::FromRow)]
pub struct User {
    id: i64,
    email: String,
    hashed_password: String,
}

I hope this example lets you appreciate how powerful and simple database handling in Rust can be. Even the code we have written in Node.js or Python has never felt this intuitive. sqlx is also a very elegant crate, and I suggest you read its source code if you have the time; the aforementioned sea-orm uses sqlx at its core.

Special Note: If the above example fails to run, you can go to GitHub to copy the example.db to your local data directory, and then run it.

Analyzing Semi-Structured Data with Rust #

In production environments, we accumulate a large amount of semi-structured data, such as various logs, monitoring data, and analysis data.

Taking logs as an example, although they are usually ingested into log analysis tools for investigation and troubleshooting through visual interfaces, occasionally we need to write small tools ourselves to handle them. Typically, we would use Python for such tasks because it has very comfortable tools like pandas. However, pandas uses a lot of memory and isn’t very efficient. Is there a better option?

In Lecture 6 we introduced polars, and we used polars and sqlparser to write a tool for processing CSV files. Under the hood, polars uses Apache Arrow. If you do big-data processing often, you are probably familiar with columnar data stores and DataFrames; Arrow is exactly that, a columnar format for in-memory storage and computation, and it is the foundation for building the next generation of data platforms.

Given the increasingly important role of Rust in the industry, Apache Arrow also has a fully implemented Rust version, which has given rise to an efficient in-memory query engine called datafusion, as well as a distributed query engine named ballista that can replace Spark in certain scenarios.

There are many heavyweight applications currently using Apache Arrow and datafusion, with the most exciting being InfluxDB IOx, which is the core engine for the next generation of InfluxDB.

Let’s feel the power of datafusion through a simple usage example:

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // register the CSV file as a table named "example"
    let mut ctx = ExecutionContext::new();
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).await?;

    // create a plan to run a SQL query
    let df = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;

    // execute and print results
    df.show().await?;
    Ok(())
}

In this code, we let datafusion infer the schema of the CSV via CsvReadOptions, register it as a logical table named example, and then query it with SQL. Very powerful.

Next, let us build a command-line analytics tool for Nginx logs using datafusion.

datafusion #

In the GitHub repo of this course, I’ve provided a sample log found online, renamed nginx_logs.csv (note the .csv suffix), formatted as follows:

93.180.71.3 - - "17/May/2015:08:05:32 +0000" GET "/downloads/product_1" "HTTP/1.1" 304 0 "-" "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
93.180.71.3 - - "17/May/2015:08:05:23 +0000" GET "/downloads/product_1" "HTTP/1.1" 304 0 "-" "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
80.91.33.133 - - "17/May/2015:08:05:24 +0000" GET "/downloads/product_1" "HTTP/1.1" 304 0 "-" "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.17)"

Each line of this log has eleven fields; apart from a few “-” placeholders whose meaning is unclear, the rest are fairly easy to guess.

Since the format of the nginx log is defined in Nginx’s configuration, the log file does not have a header line the way a CSV file does. Without a header, datafusion cannot infer the schema for us, which means we need to tell datafusion explicitly what the schema of the log file looks like.

However, for datafusion, creating a schema is quite simple. For example:

let schema = Arc::new(Schema::new(vec![
    Field::new("ip", DataType::Utf8, false),
    Field::new("code", DataType::Int32, false),
]));

To maximize flexibility, we can build a simple schema definition file in which every field corresponds in order to the fields of the nginx log:

---
- name: ip
  type: string
- name: unused1
  type: string
- name: unused2
  type: string
- name: date
  type: string
- name: method
  type: string
- name: url
  type: string
- name: version
  type: string
- name: code
  type: integer
- name: len
  type: integer
- name: unused3
  type: string
- name: ua
  type: string

With this setup, if we encounter a different type of log file in the future, we can modify the schema definition without having to change the program itself.

For this schema definition file, we can use serde and serde-yaml to read it, and then implement the From trait to map SchemaField to datafusion’s Field structure:

// Serialization and Deserialization code and Traits
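As a rough illustration of that mapping (not the original code), here is a sketch that assumes serde with the derive feature, serde-yaml, and the arrow types re-exported by datafusion; the names SchemaDataType, SchemaField, and SchemaFields are my own:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use serde::{Deserialize, Serialize};

// The "type" values allowed in the YAML file; extend as needed.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SchemaDataType {
    String,  // maps to DataType::Utf8
    Integer, // maps to DataType::Int64
    Date,    // maps to DataType::Date64
}

// One entry of the YAML sequence: { name, type }.
#[derive(Debug, Serialize, Deserialize)]
pub struct SchemaField {
    name: String,
    #[serde(rename = "type")]
    data_type: SchemaDataType,
}

// The whole YAML file is a sequence of SchemaField.
#[derive(Debug, Serialize, Deserialize)]
pub struct SchemaFields(Vec<SchemaField>);

impl From<SchemaDataType> for DataType {
    fn from(dt: SchemaDataType) -> Self {
        match dt {
            SchemaDataType::String => DataType::Utf8,
            SchemaDataType::Integer => DataType::Int64,
            SchemaDataType::Date => DataType::Date64,
        }
    }
}

impl From<SchemaField> for Field {
    fn from(f: SchemaField) -> Self {
        Field::new(&f.name, f.data_type.into(), false)
    }
}

impl From<SchemaFields> for SchemaRef {
    fn from(fields: SchemaFields) -> Self {
        Arc::new(Schema::new(fields.0.into_iter().map(Into::into).collect()))
    }
}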

With this basic schema transformation functionality, we can construct our nginx log processing structure and functionalities:

// Nginx log processing code structure
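For reference only, here is a minimal sketch of what that structure might look like, building on the hypothetical SchemaFields type above and assuming anyhow, serde-yaml, and tokio as dependencies; the NginxLog name, its method signatures, and the table name "nginx" are assumptions, not the original code:

use anyhow::Result;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;

// Hypothetical wrapper around a datafusion ExecutionContext with the log registered.
pub struct NginxLog {
    ctx: ExecutionContext,
}

impl NginxLog {
    // Read the YAML schema, then register the space-delimited, header-less log
    // file as a logical table named "nginx".
    pub async fn try_new(schema_file: &str, data_file: &str) -> Result<Self> {
        let content = tokio::fs::read_to_string(schema_file).await?;
        let fields: SchemaFields = serde_yaml::from_str(&content)?;
        let schema: SchemaRef = fields.into();

        let mut ctx = ExecutionContext::new();
        ctx.register_csv(
            "nginx",
            data_file,
            CsvReadOptions::new()
                .has_header(false)
                .delimiter(b' ')
                .schema(&schema),
        )
        .await?;

        Ok(Self { ctx })
    }

    // Run a SQL query against the registered table and collect the result batches.
    pub async fn query(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
        let df = self.ctx.sql(sql).await?;
        Ok(df.collect().await?)
    }
}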

With just around 80 lines of code, we completed the reading, parsing, and query functionalities of the nginx log file, with 50 of those lines dedicated to handling the schema configuration file. Isn’t it a bit unbelievable?

Arrow and datafusion are remarkably powerful, aren’t they? Behind this simplicity lie roughly 100,000 lines of arrow code and 10,000 lines of datafusion code.

Let’s write some more code to call it:

// Code to use nginx_log

In this snippet, we obtain content from stdin and treat each line of input as an SQL statement passed to nginx_log.query, then display the query results.
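A sketch of such a main function, reusing the hypothetical NginxLog type above, might look like this; the file paths are illustrative assumptions:

use std::io::{self, BufRead};

use anyhow::Result;
use datafusion::arrow::util::pretty::print_batches;

#[tokio::main]
async fn main() -> Result<()> {
    // Illustrative paths; adjust to wherever your schema and log files live.
    let mut nginx_log =
        NginxLog::try_new("fixtures/log_schema.yml", "fixtures/nginx_logs.csv").await?;

    // Treat every line read from stdin as a SQL statement and print its result.
    let stdin = io::stdin();
    for line in stdin.lock().lines() {
        let sql = line?;
        if sql.trim().is_empty() {
            continue;
        }
        let batches = nginx_log.query(&sql).await?;
        print_batches(&batches)?;
    }

    Ok(())
}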

Let’s test it:

// Example command line usage and output

Isn’t it impressive? We can leverage the expressiveness of SQL for all sorts of complex queries. Not only that, you can even run multiple queries at once from a file containing several SQL statements. For example, I created such a file, analyze.sql, and can get the results like this:

// Example command line usage and output

Summary #

Today we covered how to process structured data in relational databases and semi-structured data in file systems using Rust.

Although in the workplace, we may not use arrow/datafusion to create a “next-generation” data platform, having the ability to process semi-structured data can solve many very practical issues.

For example, you might scan Nginx/CDN and server logs every 10 minutes, look for abnormal access in the past 10 minutes, and then cut off that user’s or device’s access for a while. Such specialized requirements are difficult for general-purpose data platforms to handle, so we need to write the code ourselves, and tools like arrow/datafusion make that very convenient.

Thinking Exercises #

  1. Please read the documentation for diesel or sea-orm yourself and then try to implement the user registration/login functionality we built directly with sqlx using diesel or sea-orm.
  2. Datafusion supports not only CSV but also semi-structured data types like ndJSON, parquet, and avro. If your company’s production environment has these types of semi-structured data, try reading the corresponding documents and use datafusion to read and query them.

Thank you for listening. Congratulations on completing the 44th Rust lesson; the learning journey is almost over, and I’ll see you in the next class.