Data Layer
Last updated: 2026-05-08
Overview
The project uses a single PostgreSQL database to manage all persistent data, but this data is divided into four distinct logical schemas (namespaces):
| Schema | Owner | Purpose |
|---|---|---|
| app | SQLAlchemy / Alembic | Users, sessions, conversations, collectives, feature flags |
| langgraph | LangGraph library | LangGraph checkpoint tables (managed automatically) |
| rag | SQLAlchemy | PGVectorStore tables for document embeddings |
| cache | SQLAlchemy | RAG semantic-cache and embedding-cache tables |
This multi-schema approach keeps each domain cleanly isolated while sharing a single database connection pool.
Application Data Schema (api/database/models/)
The schema for core application data is defined using SQLAlchemy ORM models in api/database/models/. The init_db() function, called on application startup, creates these tables and runs any pending Alembic migrations.
User
Stores registered user profiles.
- id (UUID, PK): generated on registration.
- user_type: enum (teenager or parent).
- language: 2-letter language code.
- age_class, country, school: optional profile fields.
- created_at, updated_at, last_seen_at: timestamps.
- Relationships: app_sessions (1→N, eager-loaded), conversations (1→N, eager-loaded), idp_logins (1→N, eager-loaded), memberships (1→N, eager-loaded), consent (1→1, lazy-loaded), parental_consent (1→1, lazy-loaded).
IDPLogin
Maps an external Identity Provider identity to a User. Composite primary key (idp, login_id).
- idp: enum, currently mock-idp (supported via the IDProvider enum).
- login_id: opaque external user ID (e.g., the sub claim from a JWT).
- user_id (FK → User.id).
- created_at: registration timestamp.
AppSession
Represents a single visit / authentication event (Bearer Token session).
- id (UUID, PK).
- user_id (FK → User.id).
- origin: client identifier (e.g., web_react, discord).
- scope: enum (SessionScope), one of ONBOARDING, PENDING_CONSENT, or FULL. Reflects the user’s progress through the consent pipeline.
- capabilities (JSON): negotiated client capabilities for this session.
- created_at, last_active_at, expires_at: session lifecycle management.
- Relationships: user (N→1, joined-loaded).
Conversation
Represents one independent conversation thread belonging to a User.
- id (UUID, PK).
- user_id (FK → User.id).
- title: optional thread title.
- conversation_type: enum (incident or general).
- summary (Text): rolling summary persisted by the agent.
- current_monster: tracks the active monster persona (e.g., “betty”).
- updated_at: indexed for efficient recency queries.
- Relationships: user (N→1, joined-loaded), chat_sessions (1→N, eager-loaded).
ChatSession
A specific interaction instance within a Conversation. Its id is used as the LangGraph thread_id.
- id (UUID, PK).
- conversation_id (FK → Conversation.id).
- created_at: session start timestamp.
- Relationships: conversation (N→1, joined-loaded).
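Because ChatSession.id doubles as the LangGraph thread_id, it would typically travel in the configurable section of the run config. The following is a minimal sketch, assuming the usual LangGraph convention of a config dict with a configurable.thread_id key. build_thread_config is a hypothetical helper used for illustration, not a function from this codebase.

```python
import uuid


def build_thread_config(chat_session_id: uuid.UUID) -> dict:
    """Build a LangGraph run config keyed by the ChatSession id (hypothetical helper)."""
    # Checkpointers look up state by configurable.thread_id; the UUID is
    # stringified so the config serialises cleanly.
    return {"configurable": {"thread_id": str(chat_session_id)}}


config = build_thread_config(uuid.UUID("12345678-1234-5678-1234-567812345678"))
```

A dict of this shape is what would be passed as the config argument when invoking a compiled graph, so each ChatSession resumes its own checkpointed state.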
Collective
Represents a group of users, such as a family or a school class.
- id (UUID, PK).
- name: display name.
- type: type of collective (e.g., family, class).
- parent_id: optional FK to a parent Collective (supports hierarchy).
- parental_consent_granted: a flag indicating the parent has explicitly granted consent for all minors in this collective.
- Relationships: collective_members (1→N, eager-loaded), invitations (1→N, eager-loaded).
CollectiveMembership
Joins a User to a Collective with a specific role.
- id (UUID, PK).
- user_id (FK → User.id), collective_id (FK → Collective.id).
- role: enum (owner, admin, member).
FeatureFlag
Stores remotely-toggleable feature flags consumed by both backend and frontend.
- id (UUID, PK), name (unique), description.
- is_enabled (Boolean).
- environment: enum (development, production, staging, all).
- visibility: enum (frontend, backend, all).
- variant: enum (teenager, parent, all).
- targeting_rules (JSON): optional key/value rules for fine-grained targeting.
See Backend API — Feature Flags for the full list of seeded flags.
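How the three enum dimensions combine is not spelled out here. The sketch below shows one plausible reading, in which a flag applies when it is enabled and each dimension either equals the requested value or is all. The Flag dataclass, flag_is_active, and the flag name are hypothetical. The wildcard treatment of all is an assumption, and targeting_rules are deliberately left out.

```python
from dataclasses import dataclass


@dataclass
class Flag:
    """Hypothetical in-memory stand-in for a FeatureFlag row."""
    name: str
    is_enabled: bool
    environment: str = "all"   # development | production | staging | all
    visibility: str = "all"    # frontend | backend | all
    variant: str = "all"       # teenager | parent | all


def _matches(flag_value: str, requested: str) -> bool:
    # Assumption: "all" on the flag matches any requested value.
    return flag_value == "all" or flag_value == requested


def flag_is_active(flag: Flag, environment: str, visibility: str, variant: str) -> bool:
    """Return True when the flag is enabled and every dimension matches."""
    return (
        flag.is_enabled
        and _matches(flag.environment, environment)
        and _matches(flag.visibility, visibility)
        and _matches(flag.variant, variant)
    )


flag = Flag(name="example_flag", is_enabled=True, environment="staging", variant="teenager")
staging_teen = flag_is_active(flag, "staging", "frontend", "teenager")
production_teen = flag_is_active(flag, "production", "frontend", "teenager")
```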
UserConsent (models/consent.py)
Tracks TOS / general consent acceptance for every user.
- id (UUID, PK).
- user_id (FK → User.id, CASCADE).
- consent_ip (String, nullable): IP address from which consent was given.
- created_at: consent timestamp.
- Relationships: user (N→1).
ParentalConsent (models/consent.py)
Stores parental authorization linking a parent to a teenager. Composite primary key (parent_id, teenager_id).
- parent_id (FK → User.id, CASCADE).
- teenager_id (FK → User.id, CASCADE).
- consent_ip (String, nullable).
- created_at: consent timestamp.
Invitation (models/invitation.py)
A shareable token used to invite users (typically teenagers) into a Collective. Invitations carry settings from the parent who creates them.
- id (UUID, PK).
- collective_id (FK → Collective.id, CASCADE).
- inviter_id (FK → User.id, CASCADE): the parent/owner who created the invitation.
- age_class: optional target age group (child, preteen, teenager, adult).
- language: UI language for the invited user.
- label: optional display name.
- usage_limit: max uses (null = unlimited). Age-class-restricted invites default to 1.
- current_uses: number of times used.
- status: enum, one of PENDING, ACTIVE, ACCEPTED, EXPIRED, REVOKED.
- created_at, expires_at: lifecycle timestamps.
- accepted_at, accepted_by (FK → User.id): acceptance metadata.
- Relationships: collective (N→1), inviter (N→1), accepted_user (N→1).
LocalizedContent
Stores dynamic i18n content that overrides filesystem YAML files.
- category (Enum, PK): prompts, prompt_snippets, ui_messages, context_questions, or tools.
- language (String, PK): 2-letter language code.
- variant (String, PK): teenager, parent, or all.
- key (String, PK): unique identifier within the category.
- value (JSON): the actual content (string or dictionary).
- updated_at: last update timestamp.
Application Data Service (services/app_data/)
All database interactions go through the AppDataService, which is built on a repository pattern:
AppDataService
├── UserRepository
├── IDPLoginRepository
├── AppSessionRepository
├── ConversationRepository
├── ChatSessionRepository
├── CollectiveRepository
├── CollectiveMembershipRepository
├── InvitationRepository
├── UserConsentRepository
└── ParentalConsentRepository
UserAuthService follows the same pattern and manages its own set of repositories:
UserAuthService
├── UserRepository
├── IDPLoginRepository
├── AppSessionRepository
├── UserConsentRepository
├── ParentalConsentRepository
├── CollectiveRepository
├── CollectiveMembershipRepository
├── InvitationRepository
└── ParentalAuthorizationRequestRepository
- Each repository exposes typed get_by_primary_key, create, update, and delete methods via BaseRepository.
- Both services use the db_session_router pattern: methods accept an optional db_session parameter and create their own if none is provided, enabling transactional reuse.
- FeatureFlagRepository is managed separately via FeatureFlagService.
- LocalizedContentRepository is managed separately via LocalizedContentService.
- BaseDataService provides shared helper methods (get_repo_record_by_primary_key, update helpers) to eliminate boilerplate.
- Composite primary keys (e.g., IDPLogin, ParentalConsent, CollectiveMembership) are fully supported.
- Relationships are eagerly loaded using selectin loading to avoid N+1 queries in async SQLAlchemy.
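The db_session_router idea can be sketched as a decorator that reuses a caller-supplied session or opens one for the duration of the call. This is an illustrative reconstruction under stated assumptions, not the project's implementation. FakeSession stands in for an AsyncSession so the example runs without a database, and UserService and its methods are hypothetical.

```python
import asyncio
import functools


class FakeSession:
    """Stand-in for an AsyncSession so the sketch runs without a database."""
    opened = 0

    def __init__(self):
        FakeSession.opened += 1

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False


def db_session_router(method):
    """Reuse the caller's db_session if given, otherwise open one for this call."""
    @functools.wraps(method)
    async def wrapper(self, *args, db_session=None, **kwargs):
        if db_session is not None:
            # The caller owns the session (and its transaction): pass it through.
            return await method(self, *args, db_session=db_session, **kwargs)
        async with FakeSession() as session:
            return await method(self, *args, db_session=session, **kwargs)
    return wrapper


class UserService:
    @db_session_router
    async def get_user(self, user_id, db_session=None):
        return (user_id, db_session)

    @db_session_router
    async def get_two(self, a, b, db_session=None):
        # Both lookups receive the same session, so they can share a transaction.
        first = await self.get_user(a, db_session=db_session)
        second = await self.get_user(b, db_session=db_session)
        return first[1] is second[1]


shared = asyncio.run(UserService().get_two("u1", "u2"))
```

Here the outer call opens a single session and the nested calls reuse it, which is the transactional reuse the pattern is meant to enable.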
Database Connection (api/database/database.py & api/database/schemas.py)
The connection module exposes:
- engine: synchronous SQLAlchemy engine (used by Alembic and legacy sync code).
- async_engine: async SQLAlchemy engine for ORM operations.
- get_async_pool(): returns the shared psycopg_pool.AsyncConnectionPool (named general_pool) used by PGVectorStore, PostgresRAGCache, and raw async queries.
- get_db(): async generator that yields an AsyncSession, ensuring it is closed after each request.
- schema_context(pool, schema): async context manager that sets search_path on a raw connection for schema-specific operations (defined in api/database/schemas.py).
- Schema enum (APP, LANGGRAPH, RAG, CACHE, PUBLIC): used as the canonical source of schema names throughout the codebase.
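As a rough illustration of how a Schema enum can drive search_path handling, the sketch below builds the SET statement that a context manager such as schema_context could execute on its connection. The enum values are assumed to be the lowercase schema names from the overview table. search_path_statement is a hypothetical helper, and the real schema_context may work differently.

```python
from enum import Enum


class Schema(str, Enum):
    """Schema names; values are assumed to match the overview table."""
    APP = "app"
    LANGGRAPH = "langgraph"
    RAG = "rag"
    CACHE = "cache"
    PUBLIC = "public"


def search_path_statement(*schemas: Schema) -> str:
    """Build the SET search_path statement for the given schemas (hypothetical helper)."""
    # Only enum members are accepted, so no free-form input reaches the SQL text.
    for schema in schemas:
        if not isinstance(schema, Schema):
            raise TypeError(f"expected Schema, got {type(schema).__name__}")
    return "SET search_path TO " + ", ".join(s.value for s in schemas)


stmt = search_path_statement(Schema.RAG, Schema.PUBLIC)
```

Restricting the argument to enum members keeps schema names in one canonical place and avoids interpolating arbitrary strings into SQL.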
Connection pools have separate named instances:
| Pool name | Used by |
|---|---|
| general_pool | Application ORM queries, RAG service, and RAG semantic cache |
| langgraph_pool | LangGraph checkpoint reads/writes |
LangGraph Persistent State
Conversation state (message history, action, context) is managed by LangGraph’s MemorySaver (in-memory, per-server). This is the live working memory for active conversations.
Long-term memory is achieved through a two-layer stack:
- PostgresStore (LangGraph built-in): a key-value store in the langgraph schema. The MemoryService wraps this to read and write conversation summaries.
- MemoryLake (agents/service1/memory/lake.py): a non-blocking async queue that sits in front of MemoryService. The agent drops a QueuedSummary into the lake and returns immediately; a pool of background workers flushes it to the database. This decouples the agent’s hot path from database latency.
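The queue-plus-workers shape described for MemoryLake can be sketched with asyncio.Queue. This is an illustrative stand-in, not the code in lake.py. MemoryLakeSketch, its methods, and the fields of QueuedSummary are assumptions, and the write callable stands in for MemoryService.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class QueuedSummary:
    conversation_id: str
    summary: str


class MemoryLakeSketch:
    """Illustrative queue-plus-workers buffer; not the project's MemoryLake."""

    def __init__(self, write, workers=2):
        self._write = write                  # async callable that persists one summary
        self._queue = asyncio.Queue()
        self._workers = workers
        self._tasks = []

    def start(self):
        self._tasks = [asyncio.create_task(self._worker()) for _ in range(self._workers)]

    def put(self, item):
        # Non-blocking: the caller never waits on the database.
        self._queue.put_nowait(item)

    async def _worker(self):
        while True:
            item = await self._queue.get()
            try:
                await self._write(item)
            finally:
                self._queue.task_done()

    async def drain_and_stop(self):
        await self._queue.join()             # wait until every queued item is written
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)


async def _demo():
    written = []

    async def slow_write(item):
        await asyncio.sleep(0.01)            # stands in for database latency
        written.append(item.conversation_id)

    lake = MemoryLakeSketch(slow_write)
    lake.start()
    for i in range(5):
        lake.put(QueuedSummary(conversation_id=f"c{i}", summary="..."))
    written_at_return = len(written)         # producer is done before any write ran
    await lake.drain_and_stop()
    return [written_at_return, sorted(written)]


result = asyncio.run(_demo())
```

The producer finishes enqueueing before any write has completed, while drain_and_stop shows how a shutdown hook could flush pending summaries.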
RAG Vector Store (services/rag/)
PGVectorStore (langchain-postgres)
The RAG knowledge base has migrated from langchain_postgres.PGVector to langchain_postgres.PGVectorStore. Each user type has its own collection:
- docs_youth: educational content for teenagers.
- docs_adult: parenting guides for the adult / parent variant.
Tables live in the dedicated rag schema. PGEngine instances are created lazily per collection and cached in-process.
Multi-Level Semantic Caching
To reduce embedding costs and LLM latency, a two-level cache sits in front of the vector store:
Level 1 — CacheBackedEmbeddings (services/rag/cached_embeddings.py):
Wraps the Gemini embedding model with a LangChain CacheBackedEmbeddings store backed by the cache schema. Identical query strings reuse stored embeddings without hitting the embedding API.
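The exact-match behaviour of Level 1 can be illustrated without LangChain. In this simplified sketch a plain dict stands in for the table in the cache schema and fake_embed stands in for the Gemini embedding model. CachedEmbedder is a hypothetical class, not the CacheBackedEmbeddings API.

```python
import hashlib


class CachedEmbedder:
    """Exact-match embedding cache keyed by a hash of the input text (illustrative)."""

    def __init__(self, embed_fn, store=None):
        self._embed_fn = embed_fn            # the expensive call, e.g. an embedding API
        self._store = {} if store is None else store
        self.api_calls = 0

    def embed_query(self, text):
        key = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if key not in self._store:
            # Cache miss: call the underlying model once and remember the vector.
            self.api_calls += 1
            self._store[key] = self._embed_fn(text)
        return self._store[key]


def fake_embed(text):
    # Deterministic stand-in for a real embedding model.
    return [float(len(text)), float(sum(map(ord, text)) % 97)]


embedder = CachedEmbedder(fake_embed)
first = embedder.embed_query("what is cyberbullying?")
second = embedder.embed_query("what is cyberbullying?")
```

Only byte-identical strings hit this cache, which is why a second, similarity-based level is useful for paraphrased queries.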
Level 2 — PostgresRAGCache (services/rag/postgres_rag_cache.py):
A custom semantic cache. When a new query arrives, its embedding is compared to previously cached query embeddings using cosine similarity in the cache.rag_cache PostgreSQL table. The cache entry ID is a hash of the query and user type: md5(query || '|' || user_type). If a sufficiently similar query is found (below a distance threshold), the previously retrieved document IDs are returned directly, skipping the vector store search entirely.
Cache updates are performed asynchronously in the background: the RAG service returns results to the agent immediately and schedules the cache write as a non-blocking task, keeping latency low.
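The Level 2 lookup can be sketched in plain Python. cache_entry_id mirrors md5(query || '|' || user_type), assuming UTF-8 text. The lookup function, the in-memory list standing in for cache.rag_cache, the filtering by user type, and the 0.1 distance threshold are all assumptions made for illustration.

```python
import hashlib
import math


def cache_entry_id(query, user_type):
    """Python equivalent of md5(query || '|' || user_type), assuming UTF-8 text."""
    return hashlib.md5(f"{query}|{user_type}".encode("utf-8")).hexdigest()


def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def lookup(cache, query_embedding, user_type, max_distance=0.1):
    """Return cached document IDs for the nearest entry within max_distance, else None."""
    best = None
    for entry in cache:
        if entry["user_type"] != user_type:
            continue
        dist = cosine_distance(entry["embedding"], query_embedding)
        if dist <= max_distance and (best is None or dist < best[0]):
            best = (dist, entry["doc_ids"])
    return None if best is None else best[1]


cache = [
    {
        "id": cache_entry_id("how do I block someone?", "teenager"),
        "user_type": "teenager",
        "embedding": [1.0, 0.0],
        "doc_ids": ["d1", "d7"],
    },
]
hit = lookup(cache, [0.99, 0.05], "teenager")   # nearly parallel vector: cache hit
miss = lookup(cache, [0.0, 1.0], "teenager")    # orthogonal vector: falls through
```

On a miss, the caller would run the normal vector-store search and schedule the cache write in the background, as described above.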
Analytics Tables
To allow post-hoc analysis of chatbot interactions, the schema also includes analytics tables managed by AnalyticsService (via FeedbackService):
- ConversationAnalytics: high-level extracted data (bullying message, summary, mood, strategy, chatbot version, language).
- Categorization tables (many-to-one with ConversationAnalytics): ConversationMessagesCategorized, ConversationMoodsCategorized, ConversationStrategiesCategorized, ConversationAnswerTypesCategorized.
Feedback
Stores user feedback (rating, comment, is_positive) linked by session_id UUID. There is intentionally no hard FK constraint to the LangGraph thread_id, keeping the application and LangGraph layers independent.
Database Migrations (Alembic)
Alembic is configured in api/alembic.ini and api/migrations/. Migrations are schema-aware and support pgvector extension creation. Run migrations with:
```bash
cd api
alembic upgrade head
```
Container startup auto-migration
On deployment, the application container automatically runs alembic upgrade head before starting the API server. This is configured in the Dockerfile’s entrypoint and ensures the database schema is always up-to-date with the deployed code. See Deployment for details.
Schema search path
The database connection sets search_path = app, langgraph, rag, cache, public at the connection level. This forces Alembic autogenerate and the ORM to always find tables in their correct schemas, even when operating from a different search path context.
Alembic baseline (zero-diff check)
A baseline migration (6c64fe7fdf14) was created as a reference point. The zero_diff_check.py revision ensures that after applying all migrations, Alembic’s autogenerate detects no differences between the ORM models and the database state — confirming the schema is fully aligned.
Multi-schema workaround: Enum inherit_schema patch
SQLAlchemy’s Enum column type creates the underlying PostgreSQL ENUM type in the public schema by default, ignoring the MetaData(schema=…) setting on Base. This causes type "xyz" does not exist errors at runtime and in Alembic autogenerate whenever the database search_path does not include public.
The fix — applied at the very top of both api/run.py and api/migrations/env.py, before any ORM imports — monkeypatches Enum.__init__ to default inherit_schema=True:
```python
import sqlalchemy.sql.sqltypes


def _patch_sqlalchemy_enum():
    _orig_enum_init = sqlalchemy.sql.sqltypes.Enum.__init__

    def _patched_enum_init(self, *args, **kwargs):
        # Co-locate the ENUM type with the referencing table's schema.
        kwargs.setdefault("inherit_schema", True)
        _orig_enum_init(self, *args, **kwargs)

    sqlalchemy.sql.sqltypes.Enum.__init__ = _patched_enum_init


_patch_sqlalchemy_enum()
```
With inherit_schema=True, PostgreSQL creates one copy of the ENUM type per schema that references it. For example, if a type usertype is used by tables in both app and langgraph, PostgreSQL will create app.usertype and langgraph.usertype as independent types. Currently all application enums are only referenced by tables in the app schema, so in practice only app.<typename> is created, but the behaviour generalises correctly if an enum is ever shared across schemas.
This must be applied before the ORM metadata is first constructed (i.e., before from database import …). It is duplicated in both entry points because run.py and migrations/env.py are independent Python processes. A future clean-up would centralise it in api/core/patches.py and import from there.
Note for contributors: when adding a new Enum column to any model, no additional action is needed; the patch ensures the type is automatically created in the schema of every table that references it. When writing a migration manually (e.g., to rename or drop an enum value), include the schema= argument matching the table’s schema in every DDL operation (e.g., schema='app' for application models).