Python with Starlette

Cover Page

Back-end Page

Add dependencies

Change to your chatterd folder and add:

server$ cd ~/reactive/chatterd
server$ uv add dataclasses_json sse_starlette

handlers.py

Edit handlers.py:

server$ vi handlers.py

First add the following imports at the top of the file:

from dataclasses_json import dataclass_json
from http import HTTPStatus
import json
import re
from sse_starlette.sse import EventSourceResponse

Replace your from typing import line with:

from typing import List, Optional    

Next define these three classes to help llmchat() serialize and deserialize the JSON it exchanges with clients and with Ollama. Add these lines right below the import block:

@dataclass_json  # must come before @dataclass
@dataclass
class OllamaMessage:
    role: str
    content: str

@dataclass_json
@dataclass
class OllamaRequest:
    appID: str
    model: str
    messages: List[OllamaMessage]
    stream: bool

@dataclass_json
@dataclass
class OllamaResponse:
    model: str
    message: OllamaMessage
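
As a quick sanity check of the deserialization (this snippet is illustrative only and not part of handlers.py; the field values are made up), from_json turns a client payload into an OllamaRequest whose messages are OllamaMessage instances:

sample = '{"appID": "some-client-uuid", "model": "some-model", "stream": true, "messages": [{"role": "user", "content": "Hello"}]}'
ollamaRequest = OllamaRequest.from_json(sample, infer_missing=True)
print(ollamaRequest.model)                # some-model
print(ollamaRequest.messages[0].content)  # Hello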
    

To store the client’s conversation context/history with Ollama in the PostgreSQL database, llmchat() first confirms that the client has sent an appID that can be used to tag its entries in the database. Here is the start of llmchat(); the from_json deserialization will throw an exception if appID is absent or null:

async def llmchat(request):
    #1 tab
    try:
        ollamaRequest = OllamaRequest.from_json(await request.body(), infer_missing=True)
    except Exception as err:
        return JSONResponse({"error": f'Deserializing request: {type(err).__name__}: {str(err)}'}, status_code=HTTPStatus.UNPROCESSABLE_ENTITY)

    # insert into DB
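
Depending on your dataclasses_json version, an appID that is simply missing may deserialize to None under infer_missing=True rather than raising. If you want an explicit check before touching the database, a minimal guard (an optional sketch, not part of the spec), placed right after the try/except, could look like:

    #1 tab
    # optional: reject a missing or empty appID explicitly
    if not ollamaRequest.appID:
        return JSONResponse({"error": "missing or empty appID"},
                            status_code=HTTPStatus.UNPROCESSABLE_ENTITY)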

Once we confirm that the client has an appID, we obtain a PostgreSQL connection from the server's connection pool and use it to insert the client's current prompt into the database, adding to its conversation history with Ollama. Replace the comment # insert into DB with the following code:

    #1 tab
    async with main.server.pool.connection() as conn:
        async with conn.cursor() as cur:
            #3 tabs
            # insert each message into the database
            try:
                for msg in ollamaRequest.messages:
                    await cur.execute(
                        'INSERT INTO chatts (name, message, id, appid) VALUES (%s, %s, gen_random_uuid(), %s);',
                        (msg.role, msg.content, ollamaRequest.appID,)  # preserve prompt formatting
                    )
            except Exception as err:
                return JSONResponse({"error": f'Inserting tools: {type(err).__name__}: {str(err)}'},
                                    status_code=HTTPStatus.INTERNAL_SERVER_ERROR)

            # retrieve history

Then we retrieve the client's conversation history in chronological order by timestamp, with the just-inserted current prompt as the last entry, and put it in the JSON format expected by Ollama's chat API. Replace # retrieve history with:

            #3 tabs
            # reconstruct ollamaRequest to be sent to Ollama:
            # - add context: retrieve all past messages by appID,
            #   incl. the one just received,
            # - convert each one back to OllamaMessage, and
            # - insert it into ollamaRequest
            ollamaRequest.messages = []
            try:
                await cur.execute("SELECT name, message FROM chatts WHERE appid = %s ORDER BY time ASC;",
                                  (ollamaRequest.appID,))
                rows = await cur.fetchall()
                ollamaRequest.messages = [OllamaMessage(role=row[0], content=row[1]) for row in rows]
            except Exception as err:
                return JSONResponse({"error": f'{type(err).__name__}: {str(err)}'},
                                    status_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    # create a stream driven by Ollama prompt completion
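
For reference, once the history has been reconstructed, the payload produced by ollamaRequest.to_json() and sent to Ollama has roughly this shape (the values are made up and the output is reformatted here for readability):

# illustrative only: what the reconstructed request serializes to
print(ollamaRequest.to_json())
# {"appID": "some-client-uuid",
#  "model": "some-model",
#  "messages": [{"role": "user", "content": "Hello"},
#               {"role": "assistant", "content": "Hi! How can I help?"},
#               {"role": "user", "content": "What is SSE?"}],
#  "stream": true}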

We next create a stream, ndjson_yield_sse, that will be fed by Ollama prompt completion. We cannot reuse llmchat()'s PostgreSQL connection inside the stream; instead, we must obtain a separate connection to be used there. Once we have that connection, we start the stream by building an HTTP request with ollamaRequest above as its payload and sending it to Ollama. We also declare an accumulator variable, tokens, to collect and process the reply tokens Ollama streams back. With the stream defined, we instantiate sse-starlette's EventSourceResponse streaming object around ndjson_yield_sse. Replace # create a stream driven by Ollama prompt completion with:

    #1 tab
    async def ndjson_yield_sse():
        async with main.server.pool.connection() as conn:
            async with conn.cursor() as cur:
                #4 tabs
                try:
                    async with asyncClient.stream(
                            method=request.method,
                            url=f"{OLLAMA_BASE_URL}/chat",
                            content=ollamaRequest.to_json().encode("utf-8"),
                    ) as response:
                        #6 tabs
                        tokens = []
                        
                        # accumulate tokens and yield data lines
                            
                        # insert full response into database
                  
                #4 tabs
                except Exception as err:
                    yield {
                        "event": "error",
                        "data": f'{{"error": {json.dumps(str(err))}}}'
                    }
    #1 tab
    return EventSourceResponse(ndjson_yield_sse())

For each incoming NDJSON line, we convert it into an OllamaResponse. If the conversion is unsuccessful, we yield an SSE error event and move on to the next NDJSON line. Otherwise, we append the content of the OllamaResponse to the tokens list, after collapsing any run of whitespace into a single space, and yield the line as an SSE Message data line. Replace # accumulate tokens and yield data lines with:

                        #6 tabs
                        async for line in response.aiter_lines():
                            try:
                                # deserialize each line into OllamaResponse
                                ollamaResponse = OllamaResponse.from_json(line)

                                # append response token to full assistant message
                                # replace all multiple whitespaces with single whitespace
                                tokens.append(re.sub(r"\s+", " ", ollamaResponse.message.content))

                                # send NDJSON line as SSE data line
                                yield {
                                    "data": line
                                }
                            except Exception as err:
                                yield {
                                    "event": "error",
                                    "data": f'{{"error": {json.dumps(str(err))}}}'
                                }
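
For reference, each NDJSON line Ollama streams back is a complete JSON object. A sample line and its deserialization look roughly like this (the field values are made up; extra fields such as created_at and done are ignored by from_json):

line = '{"model": "some-model", "created_at": "2026-01-18T00:00:00Z", "message": {"role": "assistant", "content": " Hello"}, "done": false}'
ollamaResponse = OllamaResponse.from_json(line)
print(repr(ollamaResponse.message.content))  # ' Hello'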

When we reach the end of the NDJSON stream, we join the accumulated tokens into the full Ollama response and insert it into the PostgreSQL database as the assistant's reply. It will later be sent back to Ollama as part of the context of subsequent prompts. Replace # insert full response into database with:

                        #6 tabs
                        if tokens:
                            completion = " ".join(tokens)
                            # save full response to db, to form part of next prompt's history
                            await cur.execute(
                                'INSERT INTO chatts (name, message, id, appid) VALUES (%s, %s, gen_random_uuid(), %s);',
                                ("assistant", completion, ollamaRequest.appID,)
                                # replace 'assistant' with None to test SSE error event
                            )

We’re done with handlers.py. Save and exit the file.

main.py

Edit main.py:

server$ vi main.py

Find the routes array and add this route right after the route for /llmprompt:

    Route('/llmchat/', handlers.llmchat, methods=['POST']),
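
With the new entry added, the relevant slice of the routes array would look something like the sketch below (the llmprompt route comes from the earlier lab, so its exact form here is an assumption):

routes = [
    # ... other routes from earlier labs ...
    Route('/llmprompt/', handlers.llmprompt, methods=['POST']),  # existing route, exact form assumed
    Route('/llmchat/', handlers.llmchat, methods=['POST']),      # the route added in this step
]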

We’re done with main.py. Save and exit the file.

Test run

To test run your server, launch it from the command line:

server$ sudo su
# You are now root, note the command-line prompt changed from '$' or '%' to '#'.
# You can do a lot of harm with all of root's privileges, so be very careful what you do.
server# source .venv/bin/activate
(chatterd) ubuntu@server:/home/ubuntu/reactive/chatterd# granian --host 0.0.0.0 --port 443 --interface asgi --ssl-certificate /home/ubuntu/reactive/chatterd.crt --ssl-keyfile /home/ubuntu/reactive/chatterd.key --access-log --workers-kill-timeout 1 main:server
# Hit ^C to end the test
(chatterd) ubuntu@server:/home/ubuntu/reactive/chatterd# exit
# So that you're no longer root.
server$

The back-end page of the cover spec provides instructions on testing the llmChat API and SSE error handling.
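
If you would like a quick scripted check in addition to the cover spec's instructions, a minimal client sketch using httpx could look like the following. The server address, model name, and appID here are placeholders you must substitute, and verify=False is used only because the lab's certificate is self-signed:

import asyncio
import json
import uuid
import httpx

async def test_llmchat():
    payload = {
        "appID": str(uuid.uuid4()),   # reuse the same appID across runs to grow one conversation
        "model": "some-model",        # substitute a model your Ollama host actually serves
        "messages": [{"role": "user", "content": "Hello"}],
        "stream": True,
    }
    async with httpx.AsyncClient(verify=False, timeout=None) as client:
        async with client.stream("POST", "https://YOUR_SERVER_IP/llmchat/", json=payload) as response:
            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    chunk = json.loads(line[len("data:"):].strip())
                    print(chunk.get("message", {}).get("content", ""), end="", flush=True)
    print()

asyncio.run(test_llmchat())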

Prepared by Chenglin Li, Xin Jie ‘Joyce’ Liu, and Sugih Jamin. Last updated January 18th, 2026