Rust with axum

Cargo.toml: add dependencies

Change to your chatterd folder and edit the file Cargo.toml to add the 3rd-party libraries we will be using.

server$ cd ~/reactive/chatterd 
server$ vi Cargo.toml

In Cargo.toml, add the following lines below the existing [dependencies] tag:

async-stream = "0.3.6"
futures = "0.3.31"
regex = "1.12.2"

and add the “stream” feature to the reqwest line:

reqwest = { version = "0.12.24", features = ["json", "stream"] }

handlers

Edit src/handlers.rs:

server$ vi src/handlers.rs

First modify the use imports at the top of the file:
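
The exact set of imports depends on what your handlers.rs already uses from earlier parts of this tutorial, so treat the following as a sketch covering only the items needed by the code in this section (AppState, the logging helpers, and OLLAMA_BASE_URL are assumed to be defined elsewhere in your crate, and StreamReader comes from the tokio-util crate, which the project is assumed to already depend on):

use std::{convert::Infallible, io::{Error, ErrorKind}, net::SocketAddr};

use async_stream::stream;                     // the stream! macro
use axum::{
    extract::{ConnectInfo, Json, State},
    http::StatusCode,
    response::sse::{Event, Sse},
};
use futures::{Stream, TryStreamExt};          // map_err() on the byte stream
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{from_str, json};
use tokio::io::{AsyncBufReadExt, BufReader};  // async line-by-line reading
use tokio_util::io::StreamReader;             // byte stream -> AsyncRead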

Next define the following structures to help llmchat() deserialize JSON received from clients. Add these lines right below the use block:

#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct OllamaMessage {
    role: String,
    content: String,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct OllamaRequest {
    appID: String,
    model: String,
    messages: Vec<OllamaMessage>,
    stream: bool,
}

#[derive(Default, Deserialize)]
#[allow(dead_code)]
pub struct OllamaResponse {
    model: String,
    message: OllamaMessage,
}
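
Taken together, these structs correspond to a client request body along the following lines (the appID and model values here are only illustrative placeholders):

{
    "appID": "chatter-demo-app",
    "model": "llama3.2",
    "messages": [
        { "role": "user", "content": "What is the capital of France?" }
    ],
    "stream": true
}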

To store the client’s conversation context/history with Ollama in the PostgreSQL database, llmchat() first confirms that the client has sent an appID to be used to tag its entries in the database. In Rust, the Json extractor’s deserialization automatically checks for the existence of appID and returns an HTTP error if it is absent. Here’s the signature of llmchat():

pub async fn llmchat(
    State(appState): State<AppState>,
    ConnectInfo(clientIP): ConnectInfo<SocketAddr>,
    Json(mut ollamaRequest): Json<OllamaRequest>, // will check for appID
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {

    // insert into DB
}

Once we confirm that the client has an appID, we retrieve a connection to PostgreSQL from the connection pool and use it to insert the arriving prompt into the database, adding to the user’s conversation history with Ollama. Replace the comment // insert into DB with the following code:

    let appState_clone = appState.clone();
    let chatterDB = appState_clone.pgpool
        .get()
        .await
        .map_err(|err| logServerErr(&clientIP, err.to_string()))?;

    // insert each message into the database
    for msg in ollamaRequest.messages {
        chatterDB
            .execute(
                "INSERT INTO chatts (name, message, id, appID) \
                VALUES ($1, $2, gen_random_uuid(), $3)",
                &[&msg.role, &msg.content, &ollamaRequest.appID,],
            )
            .await
            .map_err(|err| logClientErr(&clientIP, StatusCode::NOT_ACCEPTABLE, err.to_string()))?;
    }

    // retrieve history
    

Then we retrieve the client’s conversation history chronologically by timestamp, including the just-inserted current prompt, and put it in the JSON format expected by Ollama’s chat API. Replace // retrieve history with:

    // reconstruct ollamaRequest to be sent to Ollama:
    // - add context: retrieve all past messages by appID,
    //   incl. the one just received,
    // - convert each back to OllamaMessage, and
    // - insert it into ollamaRequest
    ollamaRequest.messages = chatterDB
        .query(
            "SELECT name, message FROM chatts WHERE appID = $1 ORDER BY time ASC",
            &[&ollamaRequest.appID],
        )
        .await
        .map_err(|err| logServerErr(&clientIP, err.to_string()))?
        .into_iter()
        .map(|row| OllamaMessage {
            role: row.get(0),
            content: row.get(1),
        })
        .collect();

    // create a stream driven by Ollama prompt completion

We next create a stream, ndjson_yield_sse, that will be fed by Ollama prompt completion. We use a linenl variable to buffer incoming NDJSON line(s) and declare an accumulator variable, completion, to collect the reply tokens Ollama streams back. To reduce storage requirements, we compact multiple consecutive whitespace characters into one during the token accumulation process; the regular expression used for the compaction is stored in the wsRegex variable. Due to Rust’s memory ownership rules, we cannot reuse the PostgreSQL connection from llmchat() inside the stream; instead, we must obtain a separate one within the stream body. Once that connection has been obtained, the stream can proceed to send the request to Ollama (next step). With the stream defined, we instantiate axum’s Sse streaming object around ndjson_yield_sse. Replace // create a stream driven by Ollama prompt completion with:

    let ndjson_yield_sse = stream! {
        let mut linenl = String::with_capacity(192);
        let mut completion = String::with_capacity(1024);
        let wsRegex = Regex::new(r"[\s]+").unwrap();

        match appState.pgpool.get().await {
            Err(err) => {
                yield Ok(Event::default().event("error")
                    .data(json!({ "error": err.to_string() }).to_string()));
            },
            Ok(chatterDB) => {
                
                // send request to Ollama and yield reply as SSE stream

            } // Ok(chatterDB)
        } // match pgpool
    }; // ndjson_yield_sse

    logOk(&clientIP);
    Ok(Sse::new(ndjson_yield_sse))
    

Replace the comment, // send request to Ollama and yield reply as SSE stream with the following code:

                match appState.client
                .post(format!("{OLLAMA_BASE_URL}/chat"))
                .json(&ollamaRequest)  // convert the request to JSON
                .send()                // send request to Ollama
                .await {
                    Err(err) => {
                        yield Ok(Event::default().event("error")
                            .data(json!({ "error": err.to_string() }).to_string()))
                    },
                    Ok(response) => {
                        let bytes_stream_with_io_error =
                            response.bytes_stream().map_err(|err| Error::new(ErrorKind::Other, err));
                        let async_buf = StreamReader::new(bytes_stream_with_io_error);
                        let mut line_reader = BufReader::new(async_buf);

                        // accumulate tokens and yield data lines

                        // insert full response into database

                    } // Ok(response)
                } // match appState.send().await

If Ollama returns an error, we yield an SSE error event. Otherwise, we read each line and convert it into an OllamaResponse type. After collapsing consecutive whitespace, we accumulate the token(s) in each line into completion and yield the line as an SSE data line. Replace // accumulate tokens and yield data lines with:

                        while line_reader.read_line(&mut linenl).await.unwrap_or_default() != 0 { // turn error into EOF (0)
                            let line = linenl.replace('\n', "");
                            linenl.clear(); // Clear the buffer for the next line

                            // deserialize each line into OllamaResponse
                            let ollamaResponse: OllamaResponse = from_str(&line).unwrap_or_default();

                            // append response token to full assistant message
                            completion.push_str(&wsRegex.replace_all(&ollamaResponse.message.content, " "));

                            // send NDJSON line as SSE data line
                            yield Ok(Event::default().data(&line));
                        } // while next line, accumulate tokens into completion
                        

When we reach the end of the NDJSON stream, we insert the full Ollama response into the PostgreSQL database as the assistant’s reply. It will later be sent back to Ollama as part of subsequent prompts’ context. Replace // insert full response into database with:

                        if !completion.is_empty() {
                            // save full response to db, to form part of next prompt's history
                            if let Err(err) = chatterDB.execute(
                                "INSERT INTO chatts (name, message, id, appid) \
                                    VALUES ('assistant', $1, gen_random_uuid(), $2)",
                                &[&completion, &ollamaRequest.appID],
                                // replace 'assistant' with NULL to test error event
                            ) .await {
                                yield Ok(Event::default().event("error")
                                    .data(json!({ "error": err.to_string() }).to_string()))
                            }
                        } // full response
Generator in Rust

Fully functional generators, and the `yield` operator they rely on, are still unstable in Rust.

The yield used in this solution is not the unstable built-in yield; it is provided by the async-stream crate and works only inside that crate’s stream! blocks. It cannot be used standalone to build a general-purpose generator. For example, when chatterDB.execute() returns an error, we cannot put a yield inside the closure passed to map_err(); you’ll get a compiler error to the effect that yield is not supported in the Rust toolchain.
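
As an illustration, here is the restriction applied to the connection-pool lookup inside ndjson_yield_sse; the commented-out lines are only a sketch of what would not compile:

    // Inside the stream! block, this would NOT compile: yield is only
    // recognized directly in the stream! body, never inside a closure
    // such as the one passed to map_err():
    //
    //     let chatterDB = appState.pgpool.get().await.map_err(|err| {
    //         yield Ok(Event::default().event("error")
    //             .data(json!({ "error": err.to_string() }).to_string()));
    //     })?;
    //
    // which is why ndjson_yield_sse instead matches on the Result and
    // yields the error event directly from the stream! body:
    match appState.pgpool.get().await {
        Err(err) => {
            yield Ok(Event::default().event("error")
                .data(json!({ "error": err.to_string() }).to_string()));
        },
        Ok(chatterDB) => {
            // continue with the Ollama request as shown earlier
        }
    }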

We’re done with handlers.rs. Save and exit the file.

main.rs package

Edit src/main.rs:

server$ vi src/main.rs

Find the router = Router::new() instantiation statement and add this route right after the route for /llmprompt:

        .route("/llmchat", post(handlers::llmchat))

We’re done with main.rs. Save and exit the file.

Build and test run

To build your server:

server$ cargo build --release

As before, it will take some time to download and build all the 3rd-party crates. Be patient.

Linking error with cargo build?

When running cargo build --release, if you see:

  error: linking with cc failed: exit status: 1
  note: collect2: fatal error: ld terminated with signal 9 [Killed]

below a long list of object files, try running cargo build --release again. It usually works the second time around, when it will have less remaining linking to do. If the error persists, please talk to the teaching staff.


:point_right: Rust is a compiled language, like C/C++ and unlike Python, which is an interpreted language. This means you must run cargo build each and every time you make changes to your code, for the changes to show up in your executable.

To run your server:

server$ sudo ./chatterd
# Hit ^C to end the test

The cover back-end spec provides instructions on Testing llmChat API and SSE error handling.
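
For a quick manual check, you can also exercise the endpoint with curl and watch the SSE data lines stream back. In the sketch below, the URL, appID, and model name are placeholders to be replaced with values matching your deployment:

server$ curl -N -H "Content-Type: application/json" \
    -d '{"appID": "curl-test", "model": "llama3.2", "messages": [{"role": "user", "content": "Hello!"}], "stream": true}' \
    https://YOUR_SERVER_URL/llmchat
# -N disables curl's output buffering so each SSE data line shows up as it arrives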

Prepared by Chenglin Li, Xin Jie ‘Joyce’ Liu, and Sugih Jamin. Last updated January 17th, 2026