Rust with axum
Cargo.toml: add dependencies
Change to your chatterd folder and edit the file Cargo.toml
to add the 3rd-party libraries we will be using.
server$ cd ~/reactive/chatterd
server$ vi Cargo.toml
In Cargo.toml, add the following lines under the existing [dependencies] section:
async-stream = "0.3.6"
futures = "0.3.31"
regex = "1.12.2"
and add the “stream” feature to the reqwest line:
reqwest = { version = "0.12.24", features = ["json", "stream"] }
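For orientation, after these edits your [dependencies] section might look roughly like the sketch below. The pre-existing entries and their exact versions are assumptions carried over from earlier parts of this course; keep whatever your Cargo.toml already lists:
[dependencies]
# assumed pre-existing entries (illustrative versions)
axum = "0.8"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["io"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# added in this part
async-stream = "0.3.6"
futures = "0.3.31"
regex = "1.12.2"
# "stream" feature added to the existing reqwest entry
reqwest = { version = "0.12.24", features = ["json", "stream"] }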
handlers
Edit src/handlers.rs:
server$ vi src/handlers.rs
First modify the use imports at the top of the file:
- add this line inside the use axum block:
response::sse::{Event, Sse},
- add these new use lines:
use async_stream::stream;
use futures::{ Stream, TryStreamExt };
use regex::Regex;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_util::io::StreamReader;
- add this line inside the use serde_json block:
from_str,
- then add these lines inside the use std block:
convert::Infallible,
io::{ Error, ErrorKind },
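Once these edits are in, the top of handlers.rs might look roughly like the sketch below. The pre-existing items (ConnectInfo, State, StatusCode, json, SocketAddr, Serialize/Deserialize) are assumptions based on what the code in this section uses; your file from earlier parts of the course may group them differently:
use axum::{
    extract::{ConnectInfo, Json, State},
    http::StatusCode,
    response::sse::{Event, Sse},
};
use async_stream::stream;
use futures::{ Stream, TryStreamExt };
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::{from_str, json};
use std::{
    convert::Infallible,
    io::{ Error, ErrorKind },
    net::SocketAddr,
};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_util::io::StreamReader;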
Next define the following structures to help llmchat() serialize and deserialize the JSON exchanged with clients and with Ollama.
Add these lines right below the use block:
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct OllamaMessage {
    role: String,
    content: String,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct OllamaRequest {
    appID: String,
    model: String,
    messages: Vec<OllamaMessage>,
    stream: bool,
}

#[derive(Default, Deserialize)]
#[allow(dead_code)]
pub struct OllamaResponse {
    model: String,
    message: OllamaMessage,
}
To store the client’s conversation context/history with Ollama in the PostgreSQL database,
llmchat() first confirms that the client has sent an appID to be used to tag its entries
in the database. In Rust, the Json extractor’s deserialization automatically checks for the
existence of appID and returns an HTTP error if it is absent. Here’s the signature of llmchat():
pub async fn llmchat(
    State(appState): State<AppState>,
    ConnectInfo(clientIP): ConnectInfo<SocketAddr>,
    Json(mut ollamaRequest): Json<OllamaRequest>, // will check for appID
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {
    // insert into DB
}
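For reference, a client request body that deserializes cleanly into OllamaRequest might look like this (the appID, model name, and message content are made-up examples):
{
    "appID": "123e4567-e89b-12d3-a456-426614174000",
    "model": "llama3.2",
    "messages": [
        { "role": "user", "content": "What is the capital of France?" }
    ],
    "stream": true
}
If appID (or any other required field) is missing, axum’s Json extractor rejects the request with a client-error status (422 Unprocessable Entity in current axum) before the body of llmchat() ever runs.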
Once we confirm that the client has an appID, we retrieve a connection to PostgreSQL from the
connection pool and use it to insert the arriving prompt into the database, adding to the user’s
conversation history with Ollama. Replace the comment // insert into DB with the following code:
let appState_clone = appState.clone();
let chatterDB = appState_clone.pgpool
    .get()
    .await
    .map_err(|err| logServerErr(&clientIP, err.to_string()))?;

// insert each message into the database
for msg in ollamaRequest.messages {
    chatterDB
        .execute(
            "INSERT INTO chatts (name, message, id, appID) \
             VALUES ($1, $2, gen_random_uuid(), $3)",
            &[&msg.role, &msg.content, &ollamaRequest.appID,],
        )
        .await
        .map_err(|err| logClientErr(&clientIP, StatusCode::NOT_ACCEPTABLE, err.to_string()))?;
}
// retrieve history
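Note that this INSERT supplies no value for the time column that the history query below orders by, so the chatts table must fill it in by default. For reference only, a hypothetical schema consistent with this code might look as follows; your actual table was created in an earlier lab and may differ:
CREATE TABLE chatts (
    name    VARCHAR(255),
    message TEXT,
    id      UUID UNIQUE,
    appID   VARCHAR(255),
    time    TIMESTAMP DEFAULT NOW()   -- populated automatically on insert
);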
Then we retrieve the client’s conversation history chronologically by timestamp, including the
just-inserted current prompt, and put it in the JSON format expected by Ollama’s chat API. Replace //
retrieve history with:
// reconstruct ollamaRequest to be sent to Ollama:
// - add context: retrieve all past messages by appID,
//   incl. the one just received,
// - convert each back to OllamaMessage, and
// - insert it into ollamaRequest
ollamaRequest.messages = chatterDB
    .query(
        "SELECT name, message FROM chatts WHERE appID = $1 ORDER BY time ASC",
        &[&ollamaRequest.appID],
    )
    .await
    .map_err(|err| logServerErr(&clientIP, err.to_string()))?
    .into_iter()
    .map(|row| OllamaMessage {
        role: row.get(0),
        content: row.get(1),
    })
    .collect();

// create a stream driven by Ollama prompt completion
We next create a stream, ndjson_yield_sse, that will be fed by Ollama prompt completions. We use a
linenl variable to buffer incoming NDJSON line(s). Then we declare an accumulator variable,
completion, to accumulate the reply tokens Ollama streams back. To reduce storage requirements,
we compact multiple consecutive whitespace characters into one during token accumulation; the
regular expression used for the compaction is stored in the wsRegex variable. Due to Rust’s memory
ownership rules, we cannot reuse the PostgreSQL connection from llmchat() inside the stream. Instead,
we must obtain a separate one for use in the stream. Once we’ve obtained a PostgreSQL connection,
we can start the stream. With the stream defined, we instantiate axum’s Sse
streaming object around ndjson_yield_sse. Replace // create a stream driven by Ollama prompt
completion with:
let ndjson_yield_sse = stream! {
    let mut linenl = String::with_capacity(192);
    let mut completion = String::with_capacity(1024);
    let wsRegex = Regex::new(r"[\s]+").unwrap();
    match appState.pgpool.get().await {
        Err(err) => {
            yield Ok(Event::default().event("error")
                .data(json!({ "error": err.to_string() }).to_string()));
        },
        Ok(chatterDB) => {
            // send request to Ollama and yield reply as SSE stream
        } // Ok(chatterDB)
    } // match pgpool
}; // ndjson_yield_sse

logOk(&clientIP);
Ok(Sse::new(ndjson_yield_sse))
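As an optional aside, axum’s Sse type can also attach a periodic keep-alive comment to prevent proxies from closing long-idle connections. This lab doesn’t require it; the sketch below just shows the API:
use axum::response::sse::KeepAlive;
use std::time::Duration;

// send an SSE comment after every 15 seconds of inactivity
Ok(Sse::new(ndjson_yield_sse)
    .keep_alive(KeepAlive::new().interval(Duration::from_secs(15))))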
Replace the comment, // send request to Ollama and yield reply as SSE stream with the following
code:
match appState.client
    .post(format!("{OLLAMA_BASE_URL}/chat"))
    .json(&ollamaRequest) // convert the request to JSON
    .send()               // send request to Ollama
    .await {
    Err(err) => {
        yield Ok(Event::default().event("error")
            .data(json!({ "error": err.to_string() }).to_string()))
    },
    Ok(response) => {
        let bytes_stream_with_io_error =
            response.bytes_stream().map_err(|err| Error::new(ErrorKind::Other, err));
        let async_buf = StreamReader::new(bytes_stream_with_io_error);
        let mut line_reader = BufReader::new(async_buf);
        // accumulate tokens and yield data lines
        // insert full response into database
    } // Ok(response)
} // match appState.send().await
If Ollama returns an error, we yield an SSE error event. Otherwise, we read each line and deserialize
it into an OllamaResponse. After collapsing consecutive whitespace, we accumulate the tokens from each
line into completion and yield the line as an SSE data line. Replace // accumulate
tokens and yield data lines with:
while line_reader.read_line(&mut linenl).await.unwrap_or_default() != 0 { // turn error into EOF (0)
    let line = linenl.replace('\n', "");
    linenl.clear(); // clear the buffer for the next line
    // deserialize each line into OllamaResponse
    let ollamaResponse: OllamaResponse = from_str(&line).unwrap_or_default();
    // append response token to full assistant message
    completion.push_str(&wsRegex.replace_all(&ollamaResponse.message.content, " "));
    // send NDJSON line as SSE data line
    yield Ok(Event::default().data(&line));
} // while next line, accumulate tokens into completion
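To make the deserialization concrete, each NDJSON line Ollama streams back looks roughly like the following (the content shown is a made-up example; fields other than model and message are dropped by OllamaResponse):
{"model":"llama3.2","created_at":"2026-01-17T12:00:00Z","message":{"role":"assistant","content":" Paris"},"done":false}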
When we reach the end of the NDJSON stream, we insert the full Ollama response into the PostgreSQL
database as the assistant’s reply. It will later be sent back to Ollama as part of subsequent
prompts’ context. Replace // insert full response into database with:
if !completion.is_empty() {
    // save full response to db, to form part of next prompt's history
    if let Err(err) = chatterDB.execute(
        "INSERT INTO chatts (name, message, id, appID) \
         VALUES ('assistant', $1, gen_random_uuid(), $2)",
        &[&completion, &ollamaRequest.appID],
        // replace 'assistant' with NULL to test error event
    ).await {
        yield Ok(Event::default().event("error")
            .data(json!({ "error": err.to_string() }).to_string()))
    }
} // full response
Generator in Rust
Fully functional generators, and the yield operator, are still unstable in Rust.
The yield used in this solution is not the unstable built-in yield; instead, it is customized to the
async-stream crate and works only within this crate. It cannot be used standalone to build a
general-purpose generator. For example, when chatterDB.execute() returns an error, we cannot put
yield inside the closure passed to map_err(); you’ll get a compiler error saying yield is
not supported in the Rust toolchain.
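To illustrate the limitation with a sketch mirroring the code above (the first form is deliberately broken and will not compile), yield must stay in the stream! body itself:
// does NOT compile: yield inside a closure passed to map_err()
chatterDB.execute(/* ... */).await.map_err(|err| {
    // compiler error: yield syntax is not supported here
    yield Ok(Event::default().event("error")
        .data(json!({ "error": err.to_string() }).to_string()));
});

// works: match on the Result so yield remains in the stream! body
if let Err(err) = chatterDB.execute(/* ... */).await {
    yield Ok(Event::default().event("error")
        .data(json!({ "error": err.to_string() }).to_string()));
}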
We’re done with handlers.rs. Save and exit the file.
main.rs package
Edit src/main.rs:
server$ vi src/main.rs
Find the router = Router::new() instantiation statement and add this route right
after the route for /llmprompt:
.route("/llmchat", post(handlers::llmchat))
We’re done with main.rs. Save and exit the file.
Build and test run
To build your server:
server$ cargo build --release
As before, it will take some time to download and build all the 3rd-party crates. Be patient.
Linking error with cargo build?
When running cargo build --release, if you see:
error: linking with cc failed: exit status: 1
note: collect2: fatal error: ld terminated with signal 9 [Killed]
below a long list of object files, try running cargo build --release again. It usually works the second time around, when there is less remaining linking to do. If the error persists, please talk to the teaching staff.
Rust is a compiled language, like C/C++ and unlike Python, which is an interpreted language. This means you must run cargo build every time you make changes to your code for the changes to show up in your executable.
To run your server:
server$ sudo ./chatterd
# Hit ^C to end the test
The cover spec’s back-end page provides instructions on testing the llmChat API and SSE error handling.
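In the meantime, a quick smoke test from another terminal might look like this (hypothetical server address and appID; -N turns off curl’s buffering so SSE events print as they arrive; add -k if your server’s certificate is self-signed):
laptop$ curl -N -X POST https://YOUR_SERVER_IP/llmchat \
    -H "Content-Type: application/json" \
    -d '{"appID": "123e4567-e89b-12d3-a456-426614174000", "model": "llama3.2", "messages": [{"role": "user", "content": "Hello"}], "stream": true}'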
References
- Streaming in Rust
- async-stream try_stream! example
- axum sse example
- axum stream reqwest response example
- Using Regex with Rust language
| Prepared by Chenglin Li, Xin Jie ‘Joyce’ Liu, and Sugih Jamin | Last updated January 17th, 2026 |