Rust with axum
Cargo.toml: add dependencies
Change to your chatterd folder and edit the file Cargo.toml
to add the 3rd-party libraries we will be using.
server$ cd ~/reactive/chatterd
server$ vi Cargo.toml
In Cargo.toml, add the following lines below the existing [dependencies] section header:
futures = "0.3.31"
regex = "1.11.1"
and replace the reqwest line with:
reqwest = { version = "0.12.20", features = ["json", "stream"] }
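With these changes in place, the [dependencies] section of your Cargo.toml might look roughly like this sketch (your other entries and exact versions may differ):
[dependencies]
# ...your existing dependencies from the earlier tutorials...
futures = "0.3.31"
regex = "1.11.1"
reqwest = { version = "0.12.20", features = ["json", "stream"] }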
handlers
Edit src/handlers.rs:
server$ vi src/handlers.rs
First modify the use imports at the top of the file:
- add this line inside the use axum block:
response::sse::{Event, Sse},
- add these lines inside the use std block:
convert::Infallible,
sync::{Arc, Mutex},
- add from_slice to your use serde_json block, if it's not already there (llmchat() uses it to parse Ollama's NDJSON replies):
from_slice,
- then add these new use lines:
use futures::{stream, Stream, StreamExt};
use regex::Regex;
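For reference, the affected imports at the top of handlers.rs might then look roughly like this sketch (the elided entries stand in for whatever your file already imports):
use axum::{
    // ...your existing axum imports...
    response::sse::{Event, Sse},
};
use std::{
    // ...your existing std imports...
    convert::Infallible,
    sync::{Arc, Mutex},
};
use serde_json::{from_slice, json /* , ...your existing serde_json imports... */};
use futures::{stream, Stream, StreamExt};
use regex::Regex;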
Next define these three structs to help llmchat() deserialize
JSON received from clients. Add these lines right below the use
block:
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct OllamaMessage {
role: String,
content: String,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct OllamaRequest {
appID: String,
model: String,
messages: Vec<OllamaMessage>,
stream: bool,
}
#[derive(Default, Deserialize)]
#[allow(dead_code)]
pub struct OllamaResponse {
model: String,
created_at: String,
message: OllamaMessage,
done: bool,
}
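To make the wire format concrete, here is a hypothetical request body that would deserialize into OllamaRequest (all values made up; your client supplies its own appID and model):
{
  "appID": "a1b2c3d4-1234-5678-9abc-def012345678",
  "model": "llama3.2",
  "messages": [
    { "role": "user", "content": "What is SSE?" }
  ],
  "stream": true
}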
To store the client’s conversation context/history with Ollama in the PostgreSQL
database, llmchat() first confirms that the client has sent an appID that can
be used to tag its entries in the database. Here’s the signature of llmchat().
The Json deserialization checks for the existence of appID and returns an HTTP
error (422 Unprocessable Entity) if it is absent:
pub async fn llmchat(
State(appState): State<AppState>,
ConnectInfo(clientIP): ConnectInfo<SocketAddr>,
Json(mut ollama_request): Json<OllamaRequest>, // will check for appID
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {
// insert into DB
}
Once we confirm that the client has an appID, we insert its current prompt
into the database, adding to its conversation history with Ollama. Replace the
comment // insert into DB with the following code:
let chatterDB = appState
.pgpool
.get()
.await
.map_err(|err| logServerErr(clientIP, err.to_string()))?;
let messages = ollama_request.messages;
for msg in messages {
chatterDB
.execute(
"INSERT INTO chatts (username, message, id, appID) VALUES ($1, $2, gen_random_uuid(), $3)",
&[&msg.role, &msg.content, &ollama_request.appID], // preserve prompt formatting
)
.await
.map_err(|err| logClientErr(clientIP, StatusCode::NOT_ACCEPTABLE, err.to_string()))?;
}
// retrieve history
Then we retrieve the client’s conversation history, with the just-inserted
current prompt as the last entry, and put it in the JSON format expected
by Ollama’s chat API. Replace // retrieve history with:
ollama_request.messages = chatterDB
.query(
"SELECT username, message FROM chatts WHERE appID = $1 ORDER BY time ASC",
&[&ollama_request.appID],
)
.await
.map_err(|err| logServerErr(clientIP, err.to_string()))?
.into_iter()
.map(|row| OllamaMessage {
role: row.get(0),
content: row.get(1),
})
.collect();
// send request to Ollama
We send the request so constructed to Ollama, declare an accumulator variable,
full_response, to assemble the reply tokens Ollama streams back, and make a
clone of the Postgres pool, pgpool, to use in processing the replies (the clone is needed
to satisfy Rust’s borrow checker). As we saw in the first tutorial,
llmPrompt, Ollama streams its replies as an NDJSON stream. We transform
this NDJSON stream into a stream of SSE events (more details later)
and pass it to the constructor of axum’s Sse response instance, to be
returned to the client. Replace // send request to Ollama with:
let response = appState
.client
.post(format!("{OLLAMA_BASE_URL}/chat"))
.json(&ollama_request)
.send()
.await
.map_err(|err| logServerErr(clientIP, err.to_string()))?;
let full_response = Arc::new(Mutex::new(String::new()));
let pgpool = appState.pgpool.clone();
// process NDJSON elements to produce SSE events
logOk(clientIP);
Ok(Sse::new(ndjson_map_sse))
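To make the NDJSON-to-SSE transformation concrete: each element Ollama streams back is a single JSON object on its own line (NDJSON), which we re-emit as the data field of an SSE event. Schematically, with illustrative contents:
{"model":"llama3.2","created_at":"2025-08-10T00:00:00Z","message":{"role":"assistant","content":"Hel"},"done":false}
becomes, on the SSE wire:
data: {"model":"llama3.2","created_at":"2025-08-10T00:00:00Z","message":{"role":"assistant","content":"Hel"},"done":false}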
We explore two ways to transform NDJSON stream to SSE events. You only need one of
the two. In the above, we assume ndjson_map_sse is used over ndjson_yield_sse.
ndjson_map_sse
Compared to ndjson_yield_sse below, this variable is initialized in a functional
manner: for each incoming NDJSON element, we accumulate it into full_response
and pass it on after mapping it into an SSE event. Once the NDJSON stream is
done, we insert the full_response into the PostgreSQL database to form part of the
client’s context. Replace // process NDJSON elements to produce SSE events with:
let ndjson_map_sse = response.bytes_stream()
.filter_map(move |chunk| {
let full_response = full_response.clone();
let pgpool = pgpool.clone();
let appID = ollama_request.appID.clone();
async move {
match chunk {
Err(err) => Some(vec![Ok(Event::default()
.event("error")
.data(json!({ "error": err.to_string() }).to_string()))]),
Ok(bytes) => {
let line = String::from_utf8_lossy(&bytes).replace('\n', "");
let ollama_response: OllamaResponse =
from_slice(&bytes).unwrap_or_default();
if ollama_response.model.is_empty() {
return Some(vec![Ok(Event::default()
.event("error")
.data(line.replace("\\\"", "'")))]);
}
// SSE conversion and accumulate completion
In the above, we converted each incoming NDJSON line into an OllamaResponse type.
If the conversion is unsuccessful, the model property of the resulting default
instance is empty, in which case we return an SSE error event and move on to the
next NDJSON line. Otherwise, we append the content in the OllamaResponse to the
full_response variable. Then we encode the full NDJSON line into an SSE Message
event and put it in a vector of SSE events, as the vector’s first element.
Replace // SSE conversion and accumulate completion with:
full_response.lock().unwrap()
.push_str(&ollama_response.message.content);
// initialize vector of events to be returned with data line
let mut events = vec![Ok(Event::default().data(line))];
// insert full response into database
When we reach the end of the NDJSON stream, we insert the full Ollama response into the PostgreSQL
database as the assistant’s reply. Replace // insert full response into database with:
if ollama_response.done {
let assistant_response = full_response.lock().unwrap().clone();
let wsRegex = Regex::new(r"[\s]+").unwrap();
match pgpool.get().await {
Ok(chatterDB) => {
if let Err(err) = chatterDB
.execute(
"INSERT INTO chatts (username, message, id, appID) \
VALUES ('assistant', $1, gen_random_uuid(), $2)",
// replace 'assistant' with NULL to test error event
&[&wsRegex.replace_all(&*assistant_response, " "), &appID],
)
.await
{ // report error as an SSE error event
events.push(Ok(Event::default()
.event("error")
.data(json!({ "error": err.to_string() }).to_string())));
}
// don't care about Ok(<#of lines modified>)
}
Err(err) => { // report error as an SSE error event
events.push(Ok(Event::default()
.event("error")
.data(json!({ "error": err.to_string() }).to_string())));
}
}
} // ollama_response.done
// return vector of SSE events as individual events
If we encounter any error in the insertion above, we append an SSE error event to
the vector of SSE events. Finally, we return the vector of SSE events, to be flattened
into individual events for the SSE stream. Replace // return vector of SSE events as individual events
with:
Some(events) // return a vector of events
} // Ok(bytes)
} // match chunk
} // async move
})
.flat_map(stream::iter); // flatten vector of events into individual stream elements
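If the final flat_map(stream::iter) looks opaque, here is a minimal, self-contained sketch of the same pattern (hypothetical values, independent of the tutorial code): each upstream element maps to a vector, and flat_map(stream::iter) re-emits the vectors' contents as individual stream elements:
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let out: Vec<i32> = stream::iter(vec![1, 2, 3])
        .map(|n| vec![n, n * 10]) // one input element -> a vector of outputs
        .flat_map(stream::iter)   // flatten each vector into the stream
        .collect()
        .await;
    assert_eq!(out, vec![1, 10, 2, 20, 3, 30]);
}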
ndjson_yield_sse
While its perceived functionality is equivalent to that of ndjson_map_sse, this variable is initialized
by a generator that works in an imperative, iterative manner: for each incoming NDJSON element, we accumulate it into full_response and
transform it into an SSE event, which we then yield into a separate, new
stream. If you’re not used to the functional approach, this solution may be more approachable.
WARNING: Should you want to use ndjson_yield_sse and already have ndjson_map_sse
implemented in your code, you need to comment out ndjson_map_sse, and vice versa. Otherwise, the
differing return types of the two confuse the compiler.
Why not use ndjson_yield_sse?
A fully functional generator, with its yield operator, is still unstable in Rust.
The yield used in this solution is not the unstable yield; instead, it is customized to the async-stream crate and works only within
this crate. It cannot be used standalone to build a general-purpose
generator. For example, when chatterDB.execute() returns an error, we cannot put yield
inside map_err()’s asynchronous coroutine; you’ll get a compiler error saying
yield is not supported in the Rust toolchain.
Also, this solution falls into one of the three rare cases
where an object must be manually pinned in memory, in this case for interoperability
between Rust modules. Pin was originally intended to be used only in implementing low-level libraries.
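If the manual pinning is unfamiliar, here is a minimal sketch of why it’s needed: a stream built with async-stream’s stream! macro is not Unpin, and StreamExt::next() requires a pinned handle to poll it:
use async_stream::stream;
use futures::StreamExt;
use tokio::pin;

#[tokio::main]
async fn main() {
    let s = stream! {
        for i in 0..3 {
            yield i;
        }
    };
    pin!(s); // without this, s.next() below won't compile: the stream is !Unpin
    while let Some(n) = s.next().await {
        println!("got {n}");
    }
}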
To use the ndjson_yield_sse version, add the following to your Cargo.toml:
async-stream = "0.3.6"
Add to the use block of handlers.rs:
use async_stream::stream;
use tokio::pin;
Confirm that from_slice is in your serde_json block (added earlier for ndjson_map_sse), and remove stream from your futures block (or leave it
as is and ignore the compiler warning that the import is unused).
Then, instead of the ndjson_map_sse variable above, replace the
// process NDJSON elements to produce SSE events comment with:
let ndjson_yield_sse = stream! {
let byte_stream = response.bytes_stream();
pin!(byte_stream); // Pin the stream so we can loop over it
while let Some(chunk) = byte_stream.next().await {
match chunk {
Err(err) => {
yield Ok(Event::default().event("error")
.data(json!({ "error": err.to_string() }).to_string(),))
}
Ok(bytes) => {
let line = String::from_utf8_lossy(&bytes).replace('\n', "");
// replace() replaces *all* occurrences of the given character
let ollama_response: OllamaResponse = from_slice(&bytes).unwrap_or_default();
if ollama_response.model.is_empty() {
// didn't receive an ollamaresponse, likely got an error message
yield Ok(Event::default().event("error")
.data(line.replace("\\\"", "'")));
continue
}
// SSE conversion and accumulate completion
In the above, we converted each incoming NDJSON line into an OllamaResponse type.
If the conversion is unsuccessful, the model property of the resulting default
instance is empty, in which case we yield an SSE error event and continue with the
while loop, moving on to the next NDJSON line. Otherwise, we append the content in
the OllamaResponse to the full_response variable. Then we encode the full NDJSON line
into an SSE Message event and yield it as an element of the SSE stream.
Replace // SSE conversion and accumulate completion with:
full_response.lock().unwrap().push_str(ollama_response.message.content.as_str());
yield Ok(Event::default().data(line));
// insert full response into database
When we reach the end of the NDJSON stream, we insert the full Ollama response into the PostgreSQL
database as the assistant’s reply. Replace // insert full response into database with:
if ollama_response.done {
let assistant_response = full_response.lock().unwrap().clone();
let wsRegex = Regex::new(r"[\s]+").unwrap(); // \s means whitespace
match pgpool.get().await {
Ok(chatterDB) => {
if let Err(err) = chatterDB.execute(
"INSERT INTO chatts (username, message, id, appID) \
VALUES ('assistant', $1, gen_random_uuid(), $2)",
// replace 'assistant' with NULL to test error event
&[&wsRegex.replace_all(&*assistant_response, " "), &ollama_request.appID],
// replace all multiple whitespaces with single whitespace
) .await {
yield Ok(Event::default().event("error")
.data(json!({ "error": err.to_string() }).to_string()))
}
// don't care about Ok(<#of lines modified>)
}
Err(err) => {
yield Ok(Event::default().event("error")
.data(json!({ "error": err.to_string() }).to_string()))
}
}
} // ollama_response done
} // Ok(bytes)
} // match chunk
} // byte_stream.next()
}; // ndjson_yield_sse
If we encounter any error in the insertion above, we yield an SSE error event.
Finally, replace the ndjson_map_sse variable in the last line of llmchat() with ndjson_yield_sse:
Ok(Sse::new(ndjson_yield_sse))
We’re done with handlers.rs. Save and exit the file.
main.rs
Edit src/main.rs:
server$ vi src/main.rs
Find the router = Router::new() instantiation statement and add this route right
after the route for /llmprompt:
.route("/llmchat", post(handlers::llmchat))
We’re done with main.rs. Save and exit the file.
Build and test run
To build your server:
server$ cargo build --release
As before, it will take some time to download and build all the 3rd-party crates. Be patient.
Linking error with cargo build?
When running cargo build --release, if you see:
error: linking with cc failed: exit status: 1
note: collect2: fatal error: ld terminated with signal 9 [Killed]
below a long list of object files, try running cargo build --release again. It usually works the second time around, when there is less remaining linking to do. If the error persists, please talk to the teaching staff.
Rust is a compiled language, like C/C++ and unlike Python, which is an interpreted language. This means you must run cargo build each and every time you make changes to your code, for the changes to show up in your executable.
To run your server:
server$ sudo ./chatterd
# Hit ^C to end the test
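For a quick smoke test ahead of the cover spec’s full instructions, you could POST a chat request with curl from another terminal; -N disables buffering so the SSE events print as they arrive (the hostname, appID, and model below are placeholders for your own setup):
laptop$ curl -N -X POST https://YOUR_SERVER_HOSTNAME/llmchat \
  -H "Content-Type: application/json" \
  -d '{"appID": "YOUR-APP-UUID", "model": "YOUR-MODEL", "messages": [{"role": "user", "content": "Hello!"}], "stream": true}'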
The cover backend spec provides instructions on Testing llmChat API and SSE error handling.
References
- Streaming in Rust
- async-stream try_stream! example
- axum sse example
- axum stream reqwest response example
- Using Regex with Rust language
| Prepared by Chenglin Li, Xin Jie ‘Joyce’ Liu, and Sugih Jamin | Last updated August 10th, 2025 |