Change to your chatterd folder and add the new packages:
server$ cd ~/reactive/chatterd
server$ uv add dataclasses_json sse_starlette
Edit handlers.py:
server$ vi handlers.py
First add the following imports at the top of the file:
from dataclasses_json import dataclass_json
from http import HTTPStatus
import json
import re
from sse_starlette.sse import EventSourceResponse
Then replace your from typing line with:
from typing import List, Optional
Next define these three classes to help llmchat() serialize and deserialize the
JSON exchanged with clients and with Ollama. Add these lines right below the import
block:
@dataclass_json  # must come before @dataclass
@dataclass
class OllamaMessage:
    role: str
    content: str

@dataclass_json
@dataclass
class OllamaRequest:
    appID: str
    model: str
    messages: List[OllamaMessage]
    stream: bool

@dataclass_json
@dataclass
class OllamaResponse:
    model: str
    message: OllamaMessage
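The @dataclass_json decorator gives each class from_json() and to_json() methods and handles nested dataclasses, such as the OllamaMessage list inside OllamaRequest. For reference only (don’t add this to handlers.py), here is a minimal sketch of the round trip, with made-up values:
sample = '{"appID": "demo-app", "model": "llama3.2", "messages": [{"role": "user", "content": "Hi"}], "stream": true}'
req = OllamaRequest.from_json(sample, infer_missing=True)  # nested dicts become OllamaMessage objects
print(req.messages[0].role, req.messages[0].content)       # prints: user Hi
print(req.to_json())                                       # serializes back to a JSON string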
To store the client’s conversation context/history with Ollama in the PostgreSQL
database, llmchat() relies on the client sending an appID that can be used to tag
its entries in the database. Here’s the start of llmchat(); the from_json call
throws an exception if the request body cannot be deserialized into an OllamaRequest:
async def llmchat(request):
    #1 tab
    try:
        ollamaRequest = OllamaRequest.from_json(await request.body(), infer_missing=True)
    except Exception as err:
        return JSONResponse({"error": f'Deserializing request: {type(err).__name__}: {str(err)}'},
                            status_code=HTTPStatus.UNPROCESSABLE_ENTITY)
    # insert into DB
Once the request has been deserialized, we obtain a connection from the server’s PostgreSQL
connection pool and use it to insert the client’s current prompt into the database, adding to its
conversation history with Ollama. Replace the comment # insert into DB with the following code:
#1 tab
async with main.server.pool.connection() as conn:
    async with conn.cursor() as cur:
        #3 tabs
        # insert each message into the database
        try:
            for msg in ollamaRequest.messages:
                await cur.execute(
                    'INSERT INTO chatts (name, message, id, appid) VALUES (%s, %s, gen_random_uuid(), %s);',
                    (msg.role, msg.content, ollamaRequest.appID,)  # preserve prompt formatting
                )
        except Exception as err:
            return JSONResponse({"error": f'Inserting messages: {type(err).__name__}: {str(err)}'},
                                status_code=HTTPStatus.INTERNAL_SERVER_ERROR)
        # retrieve history
Then we retrieve the client’s conversation history, ordered chronologically by timestamp so that the just-inserted current prompt comes last, and put it in the JSON format expected by Ollama’s chat API. Replace # retrieve history with:
#3 tabs
# reconstruct ollamaRequest to be sent to Ollama:
# - add context: retrieve all past messages by appID,
#   incl. the one just received,
# - convert each one back to OllamaMessage, and
# - insert it into ollamaRequest
ollamaRequest.messages = []
try:
    await cur.execute("SELECT name, message FROM chatts WHERE appid = %s ORDER BY time ASC;",
                      (ollamaRequest.appID,))
    rows = await cur.fetchall()
    ollamaRequest.messages = [OllamaMessage(role=row[0], content=row[1]) for row in rows]
except Exception as err:
    return JSONResponse({"error": f'{type(err).__name__}: {str(err)}'},
                        status_code=HTTPStatus.INTERNAL_SERVER_ERROR)
# create a stream driven by Ollama prompt completion
We next create a stream, ndjson_yield_sse, that will be fed by Ollama prompt completion. We cannot
reuse llmchat()’s PostgreSQL connection inside the stream; instead, we obtain a separate connection
for the stream’s use. Once we have a PostgreSQL connection, we start the stream by creating an HTTP
request with the ollamaRequest above as its payload and sending it to Ollama. We then declare an
accumulator variable, tokens, to collect and process the reply tokens Ollama streams back. With the
stream defined, we instantiate sse_starlette’s EventSourceResponse streaming object around
ndjson_yield_sse. Replace # create a stream driven by Ollama prompt completion with:
#1 tab
async def ndjson_yield_sse():
    async with main.server.pool.connection() as conn:
        async with conn.cursor() as cur:
            #4 tabs
            try:
                async with asyncClient.stream(
                    method=request.method,
                    url=f"{OLLAMA_BASE_URL}/chat",
                    content=ollamaRequest.to_json().encode("utf-8"),
                ) as response:
                    #6 tabs
                    tokens = []
                    # accumulate tokens and yield data lines
                    # insert full response into database
            #4 tabs
            except Exception as err:
                yield {
                    "event": "error",
                    "data": f'{{"error": {json.dumps(str(err))}}}'
                }
#1 tab
return EventSourceResponse(ndjson_yield_sse())
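As an aside, EventSourceResponse takes care of formatting each dict we yield into an SSE frame on the wire: a dict with only a data key becomes a data: line followed by a blank line, while adding "event": "error" puts an event: error line in front of it, which is what distinguishes the error events above from ordinary data. To see the framing for yourself, here is a standalone sketch (not part of handlers.py) that uses sse_starlette’s ServerSentEvent directly, with made-up payloads:
from sse_starlette import ServerSentEvent

# a data-only event: prints the raw frame bytes, a data: line ended by a blank line
print(ServerSentEvent(data='{"done": false}').encode())
# an error event: prints an event: error line followed by its data: line
print(ServerSentEvent(event="error", data='{"error": "oops"}').encode())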
For each incoming NDJSON line, we convert it into an OllamaResponse. If the conversion is
unsuccessful, we yield an SSE error event and move on to the next NDJSON line. Otherwise, we append
the content of the OllamaResponse to the tokens accumulator, after collapsing any run of whitespace
into a single space, and yield the line as the data field of an SSE message. Replace # accumulate
tokens and yield data lines with:
#6 tabs
async for line in response.aiter_lines():
    try:
        # deserialize each line into OllamaResponse
        ollamaResponse = OllamaResponse.from_json(line)
        # append response token to full assistant message
        # replace all multiple whitespaces with single whitespace
        tokens.append(re.sub(r"\s+", " ", ollamaResponse.message.content))
        # send NDJSON line as SSE data line
        yield {
            "data": line
        }
    except Exception as err:
        yield {
            "event": "error",
            "data": f'{{"error": {json.dumps(str(err))}}}'
        }
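For reference, each NDJSON line Ollama streams back is a small JSON object carrying one piece of the reply, along the lines of the made-up sample below. from_json() ignores fields that OllamaResponse doesn’t declare (created_at, done, and so on), which is why the class only needs model and message. You can try it standalone (not in handlers.py):
line = '{"model": "llama3.2", "created_at": "2026-01-18T00:00:00Z", "message": {"role": "assistant", "content": "Hello"}, "done": false}'
resp = OllamaResponse.from_json(line)   # created_at and done are silently dropped
print(resp.message.content)             # prints: Hello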
When we reach the end of the NDJSON stream, we insert the full Ollama response into the PostgreSQL
database as the assistant’s reply. It will later be sent back to Ollama as part of subsequent
prompts’ context. Replace # insert full response into database with:
#6 tabs
if tokens:
    completion = " ".join(tokens)
    # save full response to db, to form part of next prompt's history
    await cur.execute(
        'INSERT INTO chatts (name, message, id, appid) VALUES (%s, %s, gen_random_uuid(), %s);',
        ("assistant", completion, ollamaRequest.appID,)
        # replace 'assistant' with None to test SSE error event
    )
We’re done with handlers.py. Save and exit the file.
Edit main.py:
server$ vi main.py
Find the routes array and add this route right
after the route for /llmprompt:
Route('/llmchat/', handlers.llmchat, methods=['POST']),
We’re done with main.py. Save and exit the file.
To test-run your server, launch it from the command line:
server$ sudo su
# You are now root, note the command-line prompt changed from '$' or '%' to '#'.
# You can do a lot of harm with all of root's privileges, so be very careful what you do.
server# source .venv/bin/activate
(chatterd) ubuntu@server:/home/ubuntu/reactive/chatterd# granian --host 0.0.0.0 --port 443 --interface asgi --ssl-certificate /home/ubuntu/reactive/chatterd.crt --ssl-keyfile /home/ubuntu/reactive/chatterd.key --access-log --workers-kill-timeout 1 main:server
# Hit ^C to end the test
(chatterd) ubuntu@server:/home/ubuntu/reactive/chatterd# exit
# So that you're no longer root.
server$
The cover back-end spec provides instructions on testing the llmChat API and SSE error handling.
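If you’d like a quick smoke test of llmchat from your laptop before running those, here is a minimal sketch that reads the SSE stream with httpx (pip install httpx). The server name, appID, and model name are placeholders to replace with your own; drop verify=False if your certificate is CA-signed:
import json
import httpx

payload = {
    "appID": "smoke-test",
    "model": "llama3.2",
    "messages": [{"role": "user", "content": "Say hello in one sentence."}],
    "stream": True,
}
# stream the SSE response and print each token's content as it arrives
with httpx.stream("POST", "https://YOUR_SERVER_IP/llmchat/", json=payload,
                  timeout=None, verify=False) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            chunk = json.loads(line[len("data: "):])
            print(chunk.get("message", {}).get("content", ""), end="", flush=True)
print()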
| Prepared by Chenglin Li, Xin Jie ‘Joyce’ Liu, and Sugih Jamin | Last updated January 18th, 2026 |