Rust with axum

Cargo.toml: add dependencies

Change to your chatterd folder and edit the file Cargo.toml to add the 3rd-party libraries we will be using.

server$ cd ~/reactive/chatterd 
server$ vi Cargo.toml

In Cargo.toml, add the following lines below the existing [dependencies] section header:

futures = "0.3.31"
regex = "1.11.1"

and replace the reqwest line with:

reqwest = { version = "0.12.20", features = ["json", "stream"] }

handlers

Edit src/handlers.rs:

server$ vi src/handlers.rs

First modify the use imports at the top of the file:
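
The exact contents of your use block depend on what carried over from the earlier tutorials (AppState, OLLAMA_BASE_URL, and the log helpers come from your existing chatterd code). As a sketch, the standard-library and third-party items the new handler relies on, gathered from the code below, look roughly like this, though the grouping in your file may differ:

use std::{convert::Infallible, net::SocketAddr, sync::{Arc, Mutex}};

use axum::{
    extract::{ConnectInfo, Json, State},
    http::StatusCode,
    response::sse::{Event, Sse},
};
use futures::{stream, Stream, StreamExt};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{from_slice, json};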

Next define these three structs to help llmchat() deserialize JSON received from clients. Add these lines right below the use block:

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct OllamaMessage {
    role: String,
    content: String,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct OllamaRequest {
    appID: String,
    model: String,
    messages: Vec<OllamaMessage>,
    stream: bool,
}

#[derive(Default, Deserialize)]
#[allow(dead_code)]
pub struct OllamaResponse {
    model: String,
    created_at: String,
    message: OllamaMessage,
    done: bool,
}
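
For reference, a client request that deserializes into OllamaRequest might look like the following; the appID and model values here are placeholders:

{
    "appID": "a1b2c3d4-client-uuid",
    "model": "llama3.2",
    "messages": [
        { "role": "user", "content": "Hello!" }
    ],
    "stream": true
}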

To store the client’s conversation context/history with Ollama in the PostgreSQL database, llmchat() first confirms that the client has sent an appID that can be used to tag its entries in the database. Here’s the signature of llmchat(). The Json deserialization will check for the existence of appID and return an HTTP error if it is absent:

pub async fn llmchat(
    State(appState): State<AppState>,
    ConnectInfo(clientIP): ConnectInfo<SocketAddr>,
    Json(mut ollama_request): Json<OllamaRequest>, // will check for appID
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {

    // insert into DB
}

Once we confirm that the client has an appID, we insert its current prompt into the database, adding to its conversation history with Ollama. Replace the comment // insert into DB with the following code:

    let chatterDB = appState
        .pgpool
        .get()
        .await
        .map_err(|err| logServerErr(clientIP, err.to_string()))?;

    let messages = ollama_request.messages;
    for msg in messages {
        chatterDB
            .execute(
                "INSERT INTO chatts (username, message, id, appID) VALUES ($1, $2, gen_random_uuid(), $3)",
                &[&msg.role, &msg.content, &ollama_request.appID], // preserve prompt formatting
            )
            .await
            .map_err(|err| logClientErr(clientIP, StatusCode::NOT_ACCEPTABLE, err.to_string()))?;
    }

    // retrieve history

Then we retrieve the client’s conversation history, including the just-inserted current prompt as the last entry, and put it in the JSON format expected by Ollama’s chat API. Replace // retrieve history with:

    ollama_request.messages = chatterDB
        .query(
            "SELECT username, message FROM chatts WHERE appID = $1 ORDER BY time ASC",
            &[&ollama_request.appID],
        )
        .await
        .map_err(|err| logServerErr(clientIP, err.to_string()))?
        .into_iter()
        .map(|row| OllamaMessage {
            role: row.get(0),
            content: row.get(1),
        })
        .collect();

    // send request to Ollama

We send the request so constructed to Ollama, declare an accumulator variable, full_response, to assemble the reply tokens Ollama streams back, and make a clone of the Postgres pool, pgpool, to process the replies (the clone is needed to satisfy Rust’s borrow checker). As we saw in the first tutorial, llmPrompt, Ollama streams its replies as an NDJSON stream. We transform this NDJSON stream into a stream of SSE events (more details later) and pass it to the creation of axum’s Sse response instance, which is returned to the client. Replace // send request to Ollama with:

    let response = appState
        .client
        .post(format!("{OLLAMA_BASE_URL}/chat"))
        .json(&ollama_request)
        .send()
        .await
        .map_err(|err| logServerErr(clientIP, err.to_string()))?;

    let full_response = Arc::new(Mutex::new(String::new()));
    let pgpool = appState.pgpool.clone();

    // process NDJSON elements to produce SSE events

    logOk(clientIP);
    Ok(Sse::new(ndjson_map_sse))

We explore two ways to transform the NDJSON stream into SSE events. You only need one of the two. In the above, we assume ndjson_map_sse is used rather than ndjson_yield_sse.

ndjson_map_sse

Compared to ndjson_yield_sse below, this variable is initialized in a functional manner: for each incoming NDJSON element, we accumulate its content in full_response and pass the element on after mapping it into an SSE event. Once the NDJSON stream is done, we insert full_response into the PostgreSQL database to form part of the client’s context. Replace // process NDJSON elements to produce SSE events with:

    let ndjson_map_sse = response.bytes_stream()
        .filter_map(move |chunk| {
            let full_response = full_response.clone();
            let pgpool = pgpool.clone();
            let appID = ollama_request.appID.clone();

            async move {
                match chunk {
                    Err(err) => Some(vec![Ok(Event::default()
                        .event("error")
                        .data(json!({ "error": err.to_string() }).to_string()))]),

                    Ok(bytes) => {
                        let line = String::from_utf8_lossy(&bytes).replace('\n', "");
                        let ollama_response: OllamaResponse =
                            from_slice(&bytes).unwrap_or_default();

                        if ollama_response.model.is_empty() {
                            return Some(vec![Ok(Event::default()
                                .event("error")
                                .data(line.replace("\\\"", "'")))]);
                        }

                        // SSE conversion and accumulate completion

In the above, we converted each incoming NDJSON line into an OllamaResponse type. If the conversion is unsuccessful, leaving the model property of the type empty, we return an SSE error event and move on to the next NDJSON line. Otherwise, we append the content in the OllamaResponse to the full_response variable. Then we encode the full NDJSON line into an SSE Message event and put it in a vector of SSE events, as the vector’s first element. Replace // SSE conversion and accumulate completion with:

                        full_response.lock().unwrap()
                            .push_str(&ollama_response.message.content);

                        // initialize vector of events to be returned with data line
                        let mut events = vec![Ok(Event::default().data(line))];

                        // insert full response into database

When we reach the end of the NDJSON stream, we insert the full Ollama response into the PostgreSQL database as the assistant’s reply. Replace // insert full response into database with:

                        if ollama_response.done {
                            let assistant_response = full_response.lock().unwrap().clone();
                            let wsRegex = Regex::new(r"[\s]+").unwrap();

                            match pgpool.get().await {
                                Ok(chatterDB) => {
                                    if let Err(err) = chatterDB
                                        .execute(
                                            "INSERT INTO chatts (username, message, id, appID) \
                                            VALUES ('assistant', $1, gen_random_uuid(), $2)",
                                            // replace 'assistant' with NULL to test error event
                                            &[&wsRegex.replace_all(&*assistant_response, " "), &appID],
                                        )
                                        .await
                                    { // report error as an SSE error event
                                        events.push(Ok(Event::default()
                                            .event("error")
                                            .data(json!({ "error": err.to_string() }).to_string())));
                                    }
                                    // don't care about Ok(<#of lines modified>)
                                }
                                Err(err) => { // report error as an SSE error event
                                    events.push(Ok(Event::default()
                                        .event("error")
                                        .data(json!({ "error": err.to_string() }).to_string())));
                                }
                            }
                        } // ollama_response.done

                        // return vector of SSE events as individual events

If we encounter any error in the insertion above, we append an SSE error event to the vector of SSE events. Now we return the vector of SSE events and flatten it out into individual events for the SSE stream. Replace // return vector of SSE events as individual events with:

                        Some(events) // return a vector of events

                    } // Ok(bytes)
                } // match chunk
            } // async move
        })
        .flat_map(stream::iter); // flatten vector of events into individual stream elements
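
If the filter_map/flat_map combination is new to you, here is a minimal, self-contained sketch of the pattern, using plain integers in place of SSE events (assuming the futures and tokio crates, with tokio’s macros enabled):

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // Each input element becomes zero or more output elements, the way each
    // NDJSON chunk above becomes a vector of SSE events that is then flattened.
    let out = stream::iter(1..=4)
        .filter_map(|n| async move {
            // drop even numbers; expand each odd number into two elements
            (n % 2 == 1).then(|| vec![n, n * 10])
        })
        .flat_map(stream::iter)
        .collect::<Vec<_>>()
        .await;

    assert_eq!(out, vec![1, 10, 3, 30]);
}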

ndjson_yield_sse

While its functionality is equivalent to that of ndjson_map_sse, this variable is initialized with a generator that works in an imperative, iterative manner: for each incoming NDJSON element, we accumulate its content in full_response and transform the element into an SSE event, which we then yield into a separate, new stream. If you’re not used to the functional approach, this solution may be more approachable.

:point_right:WARNING: Should you want to use ndjson_yield_sse and already have ndjson_map_sse implemented in your code, you need to comment out ndjson_map_sse, and vice versa. Otherwise, the differing return types of the two confuse the compiler.

Why not use ndjson_yield_sse?

A fully functional generator, and its yield operator, is still unstable in Rust.

The yield used in this solution is not the unstable native yield; instead, it is specific to the async-stream crate and works only within its stream! macro. It cannot be used standalone to build a general-purpose generator. As an example, when chatterDB.execute() returns an error, we cannot put yield inside a map_err()’s asynchronous coroutine; you’ll get a compiler error saying yield is not supported in the Rust toolchain.

Also, this solution falls under one of the three rare cases where an object must be manually pinned in memory, in this case for interoperability between Rust modules. Pin was originally intended only for use in implementing low-level libraries.

To use the ndjson_yield_sse version, add the following to your Cargo.toml:

async-stream = "0.3.6"

Add to the use block of handlers.rs:

use async_stream::stream;
use tokio::{ pin, };

Add to your serde_json block:

    from_slice, 

and remove stream from your futures block (or leave it as is and ignore the compiler warning that this import is unused).
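
For instance, if your imports matched the sketch near the top of this section, the adjusted lines would read roughly as follows (the grouping here is hypothetical; adapt it to your own use block):

use futures::{Stream, StreamExt}; // `stream` no longer needed
use serde_json::{from_slice, json}; // `from_slice` added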

Then, instead of the ndjson_map_sse variable above, replace the // process NDJSON elements to produce SSE events comment with:

    let ndjson_yield_sse = stream! {
        let byte_stream = response.bytes_stream();
        pin!(byte_stream); // Pin the stream so we can loop over it

        while let Some(chunk) = byte_stream.next().await {
            match chunk {
                Err(err) => {
                    yield Ok(Event::default().event("error")
                        .data(json!({ "error": err.to_string() }).to_string(),))
                }
                Ok(bytes) => {
                    let line = String::from_utf8_lossy(&bytes).replace('\n', "");
                    // replace replaces *all* occurrences of given characters

                    let ollama_response: OllamaResponse = from_slice(&bytes).unwrap_or_default();

                    if ollama_response.model.is_empty() {
                        // didn't receive an OllamaResponse, likely got an error message
                        yield Ok(Event::default().event("error")
                            .data(line.replace("\\\"", "'")));
                        continue
                    }

                    // SSE conversion and accumulate completion

In the above, we converted each incoming NDJSON line into an OllamaResponse type. If the conversion is unsuccessful, leaving the model property of the type empty, we yield an SSE error event and continue with the while loop, moving on to the next NDJSON line. Otherwise, we append the content in the OllamaResponse to the full_response variable. Then we encode the full NDJSON line into an SSE Message event and yield it as an element of the SSE stream. Replace // SSE conversion and accumulate completion with:

                    full_response.lock().unwrap().push_str(ollama_response.message.content.as_str());

                    yield Ok(Event::default().data(line));

                    // insert full response into database

When we reach the end of the NDJSON stream, we insert the full Ollama response into the PostgreSQL database as the assistant’s reply. Replace // insert full response into database with:

                    if ollama_response.done {
                        let assistant_response = full_response.lock().unwrap().clone();
                        let wsRegex = Regex::new(r"[\s]+").unwrap(); // \s means whitespace
                        
                        match pgpool.get().await {
                            Ok(chatterDB) => {
                                if let Err(err) = chatterDB.execute(
                                    "INSERT INTO chatts (username, message, id, appID) \
                                     VALUES ('assistant', $1, gen_random_uuid(), $2)",
                                    // replace 'assistant' with NULL to test error event
                                    &[&wsRegex.replace_all(&*assistant_response, " "), &ollama_request.appID],
                                    // replace all multiple whitespaces with single whitespace
                                ) .await {
                                    yield Ok(Event::default().event("error")
                                        .data(json!({ "error": err.to_string() }).to_string()))
                                }
                                // don't care about Ok(<#of lines modified>)
                            }
                            Err(err) => {
                                yield Ok(Event::default().event("error")
                                    .data(json!({ "error": err.to_string() }).to_string()))
                            }
                        }
                    } // ollama_response done
                } // Ok(bytes)
            } // match chunk
        } // byte_stream.next()
    }; // ndjson_yield_sse

If we encounter any error in the insertion above, we yield an SSE error event.

Finally, replace the ndjson_map_sse variable in the last line of llmchat() with ndjson_yield_sse:

    Ok(Sse::new(ndjson_yield_sse))

We’re done with handlers.rs. Save and exit the file.

main.rs

Edit src/main.rs:

server$ vi src/main.rs

Find the router = Router::new() instantiation statement and add this route right after the route for /llmprompt:

        .route("/llmchat", post(handlers::llmchat))

We’re done with main.rs. Save and exit the file.

Build and test run

To build your server:

server$ cargo build --release

As before, it will take some time to download and build all the 3rd-party crates. Be patient.

Linking error with cargo build?

When running cargo build --release, if you see:

  error: linking with cc failed: exit status: 1
  note: collect2: fatal error: ld terminated with signal 9 [Killed]

below a long list of object files, try running cargo build --release again. It usually works the second time around, when there is less remaining linking to do. If the error persists, please talk to the teaching staff.


:point_right:Rust is a compiled language, like C/C++ and unlike Python, which is an interpreted language. This means you must run cargo build each and every time you make changes to your code for the changes to show up in your executable.

To run your server:

server$ sudo ./chatterd
# Hit ^C to end the test

The cover backend spec provides instructions on Testing llmChat API and SSE error handling.
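
For a quick smoke test, something along these lines should stream SSE events back; the URL, appID, and model values are placeholders to be adjusted to your setup (add -k if your server uses a self-signed certificate):

server$ curl -N https://localhost/llmchat \
    -H "Content-Type: application/json" \
    -d '{"appID": "test-app", "model": "llama3.2", "messages": [{"role": "user", "content": "Hello"}], "stream": true}'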


Prepared by Chenglin Li, Xin Jie ‘Joyce’ Liu, and Sugih Jamin. Last updated August 10th, 2025.