Concurrent Database Connection and Query execution in Rust

Introduction #

This post is easier to follow if you have already read my article about Project Datguri, since it is derived from that project.

In this post, I share what I found out about initializing database connections and executing queries concurrently.

Example Code #

The Rust code below defines two asynchronous functions, find_comment_by_room and find_best_comment_by_room. Both retrieve comments for a given room in a web application.

/// [GET /room/comment?offset=0&limit=10&is_user=true] find all comments by room
async fn find_comment_by_room(
    req: HttpRequest,
    body: web::Json<RoomUrlDto>,
    query: web::Query<RoomQueryString>,
    state: web::Data<AppState>,
) -> impl Responder {
    let url = Comment::remove_query_string(&body.url);
    let comments = match postgres::comment::find_comment_by_room(
        &state.postgres,
        &url,
        query.get_offset(),
        query.get_limit(),
    )
    .await
    {
        Ok(comments) => comments,
        Err(e) => {
            return HttpResponse::InternalServerError().json(JsonMessage {
                msg: format!("Error: {:?}", e),
            })
        }
    };

    if !query.get_is_user() {
        return HttpResponse::Ok().json(comments);
    }

    let user_uuid = match validate_request(req, &state.postgres).await {
        Ok(uuid) => uuid,
        Err(e) => return e,
    };

    let mut result: Vec<CommentResponse> = vec![];
    // TODO can run asynchronously
    for comment in comments {
        let comment_like = match postgres::comment::find_one_comment_like(
            &state.postgres,
            &user_uuid,
            &comment.uuid,
        )
        .await
        {
            // A missing row means the user has not reacted to this comment.
            Ok(comment_like) => comment_like.map_or(0, |row| row.like),
            Err(e) => return HttpResponse::InternalServerError().json(format!("Error: {:?}", e)),
        };

        result.push(CommentResponse::new(comment, comment_like));
    }

    HttpResponse::Ok().json(result)
}

/// [GET /room/comment/best?offset=0&limit=10&is_user=true] find best comments by room
async fn find_best_comment_by_room(
    req: HttpRequest,
    body: web::Json<RoomUrlDto>,
    query: web::Query<RoomQueryString>,
    state: web::Data<AppState>,
) -> impl Responder {
    let url = Comment::remove_query_string(&body.url);
    let comments = match postgres::comment::find_best_comment_by_room(
        &state.postgres,
        &url,
        query.get_offset(),
        query.get_limit(),
    )
    .await
    {
        Ok(comments) => comments,
        Err(e) => {
            return HttpResponse::InternalServerError().json(JsonMessage {
                msg: format!("Error: {:?}", e),
            })
        }
    };

    if !query.get_is_user() {
        return HttpResponse::Ok().json(comments);
    }

    let user_uuid = match validate_request(req, &state.postgres).await {
        Ok(uuid) => uuid,
        Err(e) => return e,
    };

    let mut futures = vec![];
    let mut result: Vec<CommentResponse> = vec![];

    for comment in comments.iter() {
        futures.push(postgres::comment::find_one_comment_like(
            &state.postgres,
            &user_uuid,
            &comment.uuid,
        ));
    }

    let comment_like_list: Vec<i32> = futures::future::join_all(futures)
        .await
        .into_iter()
        .map(|res| match res {
            // A missing row means the user has not reacted to this comment.
            Ok(comment_like) => comment_like.map_or(0, |row| row.like),
            Err(_) => 0,
        })
        .collect();

    // join_all returns results in input order, so comments and likes line up.
    for (comment, like) in comments.into_iter().zip(comment_like_list) {
        result.push(CommentResponse::new(comment, like));
    }

    HttpResponse::Ok().json(result)
}
  1. find_comment_by_room: This function takes an HTTP request, a JSON body containing a RoomUrlDto, a query containing RoomQueryString, and the application state as input. It queries the database to find all comments associated with a given room, then checks the is_user flag of the query string. If the flag is false, it returns the comments as a JSON response. Otherwise it validates the request to obtain the user's UUID, iterates through the comments, retrieves the user's comment_like for each comment one query at a time, and constructs a CommentResponse object for each. Finally, it returns the CommentResponse objects as a JSON response.
  2. find_best_comment_by_room: This function has the same structure as find_comment_by_room but differs in its implementation. It retrieves the best comments for a given room based on a specific criterion (e.g., most likes). If the is_user flag is false, it returns the comments as a JSON response. Otherwise it fetches the user's comment_likes for all the comments concurrently using the futures crate, which shortens the time spent waiting on the database. Note that a failed lookup is counted as 0 here, whereas find_comment_by_room returns an internal server error. The function then constructs CommentResponse objects and returns them as a JSON response.
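
In the listing above, the sequential handler returns a 500 response when a like lookup fails, while the concurrent handler maps `Err(_)` to `0`. If the stricter behaviour is wanted in the concurrent version as well, one option is to collect the joined results into a single `Result`. The sketch below is a minimal, std-only illustration of that idea; `CommentLike` and `DbError` are hypothetical stand-ins, not the project's actual types.

```rust
/// Hypothetical stand-in for the row returned by `find_one_comment_like`.
#[derive(Debug, Clone, PartialEq)]
struct CommentLike {
    like: i32,
}

/// Hypothetical stand-in for the database error type.
#[derive(Debug, Clone, PartialEq)]
struct DbError(String);

/// Turns the per-comment results (for example the output of `join_all`)
/// into like values. A missing row counts as 0; the first error aborts
/// the whole batch instead of being silently replaced by 0.
fn collect_likes(
    results: Vec<Result<Option<CommentLike>, DbError>>,
) -> Result<Vec<i32>, DbError> {
    results
        .into_iter()
        .map(|res| res.map(|row| row.map_or(0, |cl| cl.like)))
        .collect()
}
```

A handler could then match on the returned `Result` and answer with `HttpResponse::InternalServerError()` on `Err`, mirroring what find_comment_by_room already does.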

Here’s a more detailed breakdown of how find_best_comment_by_room utilizes the futures library:

  • First, it initializes an empty vector called futures to store the individual comment_like queries.
  • Then, it iterates through each comment and creates a new future for each comment_like query, pushing each future into the futures vector.
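  • Once all the comment_like futures have been added to the futures vector, it is passed to join_all. Futures in Rust are lazy, so no query has started yet at this point; awaiting join_all drives all of them concurrently and yields their results in the same order as the input.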
  • After the concurrent execution is complete, the results are processed to construct a CommentResponse object for each comment, which is then returned as a JSON response.
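
To make the steps above concrete without pulling in any crates, here is a toy model of what awaiting `join_all` does. It is a sketch under stated assumptions, not how the futures crate is implemented: `FakeQuery`, `toy_join_all`, and `run_one_by_one` are hypothetical names, the "query" only waits for a fixed latency, and the loop busy-polls where a real executor would rely on wakers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

/// Waker that does nothing; enough for the busy-polling loop below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for one like lookup: ready `latency` after its first poll.
struct FakeQuery {
    latency: Duration,
    started: Option<Instant>,
    like: i32,
}

impl FakeQuery {
    fn new(latency_ms: u64, like: i32) -> Self {
        FakeQuery { latency: Duration::from_millis(latency_ms), started: None, like }
    }
}

impl Future for FakeQuery {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
        // Futures are lazy: the clock starts at the first poll, not at creation.
        let started = *self.started.get_or_insert_with(Instant::now);
        if started.elapsed() >= self.latency {
            Poll::Ready(self.like)
        } else {
            Poll::Pending
        }
    }
}

/// Toy `join_all`: polls every unfinished future in turn until all are ready.
/// Results are returned in input order, regardless of completion order.
fn toy_join_all(mut queries: Vec<FakeQuery>) -> Vec<i32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut results: Vec<Option<i32>> = vec![None; queries.len()];

    while results.iter().any(Option::is_none) {
        for (slot, query) in results.iter_mut().zip(queries.iter_mut()) {
            if slot.is_none() {
                if let Poll::Ready(like) = Pin::new(query).poll(&mut cx) {
                    *slot = Some(like);
                }
            }
        }
        std::thread::sleep(Duration::from_millis(1)); // avoid spinning at 100% CPU
    }
    results.into_iter().flatten().collect()
}

/// Sequential baseline: drive each query to completion before starting the next.
fn run_one_by_one(queries: Vec<FakeQuery>) -> Vec<i32> {
    queries.into_iter().flat_map(|q| toy_join_all(vec![q])).collect()
}
```

With three fake queries of 30 ms each, `run_one_by_one` needs at least 90 ms, while `toy_join_all` finishes in roughly the time of the slowest query. In both cases the values come back in input order, which is what lets the handler pair each comment with its like value by position.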

By using the futures crate, find_best_comment_by_room can respond considerably faster than find_comment_by_room, especially when there are many comments to process. In the sequential version the waiting times of the individual queries add up, whereas in the concurrent version they overlap, so the total wait is closer to that of the slowest query, as long as the connection pool can serve the queries in parallel.

Initial time spent warming up the Postgres connection pool #

This section covers the most valuable lesson I learned while working on this project. I have just argued that running queries concurrently can save a lot of time and lead to better performance.

Surprisingly, for the very first request, concurrent execution can take much longer than sequential execution.

| Function Name | Initial Execution | After Initial Execution |
|---|---|---|
| find_comment_by_room | 620ms | 314ms |
| find_best_comment_by_room | 1313ms | 127ms |

This result surprised me, because I expected concurrent execution to save time. I soon realized that the discrepancy came from the time it takes to create the initial connections to the database. Since find_best_comment_by_room executes multiple queries at the same time, it also has to open multiple connections at the same time, which can take longer than opening a single connection as find_comment_by_room does.

Once the connection pool was ready, concurrent execution was clearly faster. After the initial execution, find_comment_by_room took 314ms, while find_best_comment_by_room took only 127ms. This shows that concurrent execution can indeed be an effective way to optimize database query performance, but it is important to keep the cost of the initial connections in mind.
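
The cold-start effect described above can be illustrated with a small model. This is only a sketch and rests on one assumption that the post does not confirm: that the pool opens connections lazily, on demand, up to its configured maximum. `LazyPool` and both helper functions are hypothetical names, and the model counts opened connections rather than measuring time.

```rust
/// Toy model of a lazily filled connection pool (an assumption about how
/// the pool behaves, not taken from the project's source).
#[derive(Debug)]
struct LazyPool {
    max: usize,    // configured upper bound on connections
    idle: usize,   // open connections currently not in use
    opened: usize, // connections opened so far (each one pays the setup cost)
}

impl LazyPool {
    fn new(max: usize) -> Self {
        LazyPool { max, idle: 0, opened: 0 }
    }

    /// Hands out a connection, opening a new one only if none is idle.
    /// Returns false when the pool is exhausted and the caller would wait.
    fn try_acquire(&mut self) -> bool {
        if self.idle > 0 {
            self.idle -= 1;
            true
        } else if self.opened < self.max {
            self.opened += 1;
            true
        } else {
            false
        }
    }

    /// Returns a connection to the pool so that it can be reused.
    fn release(&mut self) {
        self.idle += 1;
    }
}

/// Queries run one after another: each releases its connection before the next starts.
fn opened_when_sequential(max: usize, queries: usize) -> usize {
    let mut pool = LazyPool::new(max);
    for _ in 0..queries {
        if pool.try_acquire() {
            pool.release();
        }
    }
    pool.opened
}

/// Queries start together: every one asks for a connection before any is released.
fn opened_when_concurrent(max: usize, queries: usize) -> usize {
    let mut pool = LazyPool::new(max);
    let granted = (0..queries).filter(|_| pool.try_acquire()).count();
    // Queries that had to wait reuse released connections; no new ones are opened.
    for _ in 0..granted {
        pool.release();
    }
    pool.opened
}
```

Under this model, ten sequential queries on a cold pool open a single connection, while ten concurrent queries with a maximum of 10 open ten connections during the first request. That matches the explanation given here for why the first concurrent request is the slow one.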

The results above were measured with POSTGRES_MAX_CONNECTION set to 10. When I set POSTGRES_MAX_CONNECTION to 1, the performance is as follows.

| Function Name | Initial Execution | After Initial Execution |
|---|---|---|
| find_comment_by_room | 575ms | 560ms |
| find_best_comment_by_room | 666ms | 637ms |

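Here the gap between the initial and the later executions is small compared with the previous result, where several connections had to be created at once. With a single connection, the concurrent queries also have to take turns, so find_best_comment_by_room is no longer faster than find_comment_by_room.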
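
A rough way to reason about the effect of the pool size is to think in waves: at most `pool_size` queries can be in flight at once, so the lookups complete in `ceil(queries / pool_size)` rounds. The function below is an idealised estimate under that assumption. The name and parameters are hypothetical, and it ignores connection setup, the first comment query, and handler overhead, so it is not meant to reproduce the measurements above.

```rust
/// Idealised latency estimate: `queries` lookups of `per_query_ms` each,
/// sharing `pool_size` connections, run in waves of at most `pool_size`.
fn estimated_total_ms(queries: u64, pool_size: u64, per_query_ms: u64) -> u64 {
    assert!(pool_size > 0, "pool needs at least one connection");
    // Ceiling division: the number of waves needed to serve all queries.
    let waves = (queries + pool_size - 1) / pool_size;
    waves * per_query_ms
}
```

For example, with ten lookups of a hypothetical 50 ms each, a pool of 10 gives an estimate of 50 ms, a pool of 4 gives 150 ms, and a pool of 1 gives 500 ms, which is the same as running the lookups sequentially.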