Comprehensive skill for Graphiti and Zep - temporal knowledge graph framework for AI agents with dynamic context engineering
Expert assistance for building AI agents with temporal knowledge graphs using Graphiti and Zep's context engineering platform for dynamic, personalized agent memory and context assembly.
This skill should be used when building AI agents that need persistent, temporal memory — for example when integrating Graphiti or Zep, designing knowledge-graph-backed context assembly, or debugging agent memory and retrieval.
Graphiti is an open-source framework for constructing and querying time-aware knowledge graphs optimized for AI agents in dynamic environments.
Key Characteristics:
Zep is a production-ready platform built on Graphiti that provides:
Core Value Proposition:
"Assembles personalized context—including user preferences, traits, and business data—for reliable agent applications"
Graphiti creates time-stamped relationship networks that:
Bi-Temporal Model:
Entities: Objects or concepts in your knowledge graph
Relations: Connections between entities
Facts: Extracted information from conversations and data
Summaries: Condensed representations of interactions
Graphiti uses three complementary search methods:
Semantic Search:
Keyword Search (BM25):
Graph Traversal:
# Install from PyPI
pip install graphiti-core
# Or from source
git clone https://github.com/getzep/graphiti.git
cd graphiti
pip install -e .
# Python SDK
pip install zep-cloud
# TypeScript SDK
npm install @getzep/zep-cloud
# Go SDK
go get github.com/getzep/zep-go
Graphiti supports multiple graph database backends:
Neo4j (Recommended)
# Docker
docker run -d \
--name neo4j \
-p 7474:7474 -p 7687:7687 \
-e NEO4J_AUTH=neo4j/password \
neo4j:5.26
FalkorDB
docker run -d \
--name falkordb \
-p 6379:6379 \
falkordb/falkordb:1.1.2
Kuzu
pip install kuzu
Amazon Neptune (Enterprise)
Graphiti works with various LLM providers:
# OpenAI (default)
import os
os.environ["OPENAI_API_KEY"] = "your-key"
# Anthropic Claude
os.environ["ANTHROPIC_API_KEY"] = "your-key"
# Google Gemini
os.environ["GOOGLE_API_KEY"] = "your-key"
# Azure OpenAI
os.environ["AZURE_OPENAI_API_KEY"] = "your-key"
os.environ["AZURE_OPENAI_ENDPOINT"] = "your-endpoint"
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
# Initialize Graphiti with Neo4j
graphiti = Graphiti(
neo4j_uri="bolt://localhost:7687",
neo4j_user="neo4j",
neo4j_password="password"
)
# Initialize the graph
await graphiti.build_indices_and_constraints()
# Add a conversation message
await graphiti.add_episode(
name="User Message",
episode_body="I love hiking in the mountains, especially in Colorado.",
source_description="User conversation",
reference_time=datetime.now(),
episode_type=EpisodeType.message
)
# Add business data
await graphiti.add_episode(
name="Purchase Event",
episode_body="User purchased hiking boots size 10",
source_description="E-commerce transaction",
reference_time=datetime.now(),
episode_type=EpisodeType.json
)
# Add document/file content
await graphiti.add_episode(
name="User Profile",
episode_body="John is a 35-year-old software engineer based in Denver",
source_description="User profile document",
reference_time=datetime.now(),
episode_type=EpisodeType.text
)
# Search for relevant context
results = await graphiti.search(
query="What outdoor activities does the user enjoy?",
num_results=10
)
for result in results:
print(f"Entity: {result.name}")
print(f"Content: {result.content}")
print(f"Relevance: {result.score}")
print("---")
# Point-in-time query (historical data)
from datetime import datetime, timedelta
past_time = datetime.now() - timedelta(days=30)
historical_results = await graphiti.search(
query="User preferences",
reference_time=past_time,
num_results=5
)
from pydantic import BaseModel, Field
from typing import Optional

class Product(BaseModel):
    """Custom product entity extracted into the knowledge graph."""
    name: str = Field(description="Product name")
    category: str = Field(description="Product category")
    # default=None is required here: in Pydantic v2, Optional[...] without a
    # default is still a *required* field (it merely allows None as a value).
    price: Optional[float] = Field(default=None, description="Product price")
    in_stock: bool = Field(default=True, description="Availability status")
class Customer(BaseModel):
    """Custom customer entity extracted into the knowledge graph."""
    name: str = Field(description="Customer name")
    email: str = Field(description="Customer email")
    tier: str = Field(description="Customer tier: basic, premium, enterprise")
    # default=None makes the field genuinely optional; Pydantic v2 treats
    # Optional[...] without a default as required.
    lifetime_value: Optional[float] = Field(default=None, description="Total customer value")
# Register custom entity types
graphiti.register_entity_type(Product)
graphiti.register_entity_type(Customer)
# Add episodes with custom entities
await graphiti.add_episode(
name="Product Purchase",
episode_body="Customer john@example.com purchased Premium Hiking Boots for $150",
source_description="Transaction log",
reference_time=datetime.now(),
entity_types=[Customer, Product]
)
from zep_cloud.client import AsyncZep
from zep_cloud import Message
# Initialize Zep client
zep = AsyncZep(api_key="your-api-key")
# Create a user
user = await zep.user.add(
user_id="user_123",
email="user@example.com",
first_name="John",
last_name="Doe",
metadata={"plan": "premium"}
)
# Create a session (conversation thread)
session = await zep.memory.add_session(
session_id="session_456",
user_id="user_123",
metadata={"channel": "web"}
)
# Add messages to build memory
await zep.memory.add_memory(
session_id="session_456",
messages=[
Message(
role="user",
content="I'm planning a hiking trip to Colorado next month"
),
Message(
role="assistant",
content="That sounds great! What areas are you considering?"
)
]
)
# Retrieve relevant context
memory = await zep.memory.get_memory(
session_id="session_456"
)
# Get facts from the knowledge graph
facts = memory.facts
# Get user summary
summary = memory.summary
# Search across user's graph
search_results = await zep.memory.search_memory(
user_id="user_123",
text="outdoor activities and preferences",
search_scope="facts"
)
import { ZepClient } from "@getzep/zep-cloud";
// Initialize client
const zep = new ZepClient({ apiKey: "your-api-key" });
// Create user
const user = await zep.user.add({
userId: "user_123",
email: "user@example.com",
firstName: "John",
lastName: "Doe"
});
// Add messages
await zep.memory.addMemory("session_456", {
messages: [
{ role: "user", content: "I love mountain biking" },
{ role: "assistant", content: "Great! Where do you usually ride?" }
]
});
// Get memory with context
const memory = await zep.memory.getMemory("session_456");
console.log("Facts:", memory.facts);
console.log("Summary:", memory.summary);
// Search user's knowledge graph
const results = await zep.memory.searchMemory({
userId: "user_123",
text: "outdoor hobbies",
searchScope: "facts"
});
# Add multiple episodes efficiently
episodes = [
{
"name": f"Message {i}",
"episode_body": f"Content {i}",
"source_description": "Batch import",
"reference_time": datetime.now(),
"episode_type": EpisodeType.message
}
for i in range(100)
]
# Process in parallel
for episode in episodes:
await graphiti.add_episode(**episode)
# Define domain-specific relationships
ontology = {
"entities": ["Customer", "Product", "Order", "Support Ticket"],
"relations": [
{"type": "purchased", "from": "Customer", "to": "Product"},
{"type": "contains", "from": "Order", "to": "Product"},
{"type": "created", "from": "Customer", "to": "Support Ticket"},
{"type": "related_to", "from": "Support Ticket", "to": "Product"}
]
}
# Use ontology in episode processing
await graphiti.add_episode(
name="Customer Support",
episode_body="Customer had issue with hiking boots, opened ticket",
source_description="Support system",
reference_time=datetime.now(),
ontology=ontology
)
# Find connected entities
from graphiti_core.search import GraphTraversal
# Find all products a user has purchased
traversal = GraphTraversal(
start_entity="user_123",
relationship_types=["purchased"],
max_depth=2
)
related_products = await graphiti.traverse(traversal)
# Find similar users based on preferences
similar_users = await graphiti.find_similar_entities(
entity_id="user_123",
entity_type="Customer",
similarity_threshold=0.7
)
# Assemble comprehensive context for agent
context = await zep.memory.get_memory(
session_id="session_456",
memory_type="perpetual" # Includes full user graph
)
# Build agent prompt with context
system_prompt = f"""
You are a helpful assistant. Here's what you know about the user:
User Summary: {context.summary}
Recent Facts:
{chr(10).join(f"- {fact}" for fact in context.facts[:5])}
User Preferences:
{chr(10).join(f"- {pref}" for pref in context.relevant_facts)}
"""
# Use in your agent framework
from langchain.chat_models import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
response = llm.invoke([
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Recommend some activities for my trip"}
])
# Query what was known at a specific time
from datetime import datetime, timedelta
# What did we know 7 days ago?
past_date = datetime.now() - timedelta(days=7)
past_context = await graphiti.search(
query="user preferences",
reference_time=past_date
)
# Track how knowledge evolved
current_context = await graphiti.search(
query="user preferences",
reference_time=datetime.now()
)
# Compare past vs current
print("7 days ago:", past_context)
print("Current:", current_context)
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from zep_cloud.client import AsyncZep
from typing import TypedDict  # required: AgentState subclasses TypedDict

# Initialize
zep = AsyncZep(api_key="your-key")
llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

# Define agent state flowing between LangGraph nodes
class AgentState(TypedDict):
    messages: list   # conversation so far ({"role": ..., "content": ...} dicts)
    user_id: str     # Zep user identifier
    session_id: str  # Zep session (conversation thread) identifier
    context: str     # memory context assembled by the context-retrieval node
# Node: pull session memory from Zep and format it for the prompt
async def get_context(state: AgentState):
    """Fetch Zep memory for the current session and return it as context text."""
    session_memory = await zep.memory.get_memory(state["session_id"])
    summary_line = f"Summary: {session_memory.summary}"
    facts_line = f"Facts: {session_memory.facts}"
    return {"context": summary_line + "\n" + facts_line}
# Node: invoke the LLM with the assembled context prepended as a system message
async def call_llm(state: AgentState):
    """Call the model on context + conversation and append its reply to state."""
    system_message = {"role": "system", "content": f"Context: {state['context']}"}
    prompt = [system_message, *state["messages"]]
    reply = await llm.ainvoke(prompt)
    return {"messages": [*state["messages"], reply]}
# Build graph
workflow = StateGraph(AgentState)
workflow.add_node("get_context", get_context)
workflow.add_node("call_llm", call_llm)
workflow.add_edge("get_context", "call_llm")
workflow.add_edge("call_llm", END)
workflow.set_entry_point("get_context")
app = workflow.compile()
# Run agent
result = await app.ainvoke({
"messages": [{"role": "user", "content": "Plan my trip"}],
"user_id": "user_123",
"session_id": "session_456",
"context": ""
})
from crewai import Agent, Task, Crew
from zep_cloud.client import AsyncZep
# Initialize Zep
zep = AsyncZep(api_key="your-key")
# Custom tool for memory retrieval
from crewai_tools import tool

@tool("get_user_context")
async def get_user_context(user_id: str) -> str:
    """Retrieve user context from Zep knowledge graph"""
    # Query the user's personal graph for fact-level results
    hits = await zep.memory.search_memory(
        user_id=user_id,
        text="preferences and history",
        search_scope="facts"
    )
    fact_lines = [hit.content for hit in hits]
    return "\n".join(fact_lines)
# Create agent with memory
planning_agent = Agent(
role="Travel Planner",
goal="Create personalized travel plans",
backstory="Expert at creating customized itineraries",
tools=[get_user_context],
verbose=True
)
# Define task
task = Task(
description="Plan a hiking trip for user_123 based on their preferences",
agent=planning_agent,
expected_output="Detailed 3-day itinerary"
)
# Create crew
crew = Crew(
agents=[planning_agent],
tasks=[task]
)
# Execute with context
result = crew.kickoff()
# Track customer behavior
await graphiti.add_episode(
name="Product View",
episode_body="Customer viewed Premium Hiking Boots, spent 5 minutes",
source_description="Analytics",
reference_time=datetime.now()
)
await graphiti.add_episode(
name="Cart Addition",
episode_body="Customer added Premium Hiking Boots size 10 to cart",
source_description="E-commerce",
reference_time=datetime.now()
)
# Retrieve personalized recommendations
context = await graphiti.search(
query="Products customer is interested in",
num_results=5
)
# Use for recommendation agent
recommendations = llm.invoke(f"""
Based on this context: {context}
Recommend 3 products the customer might like.
""")
# Build comprehensive support context
support_context = await zep.memory.get_memory(session_id="support_789")
# Include:
# - Past issues and resolutions
# - Product ownership
# - Communication preferences
# - Account history
system_prompt = f"""
You are a customer support agent.
Customer History:
{support_context.summary}
Previous Issues:
{support_context.facts}
Provide helpful, personalized support.
"""
# Support agent with full context
support_agent = Agent(
model="claude-3-5-sonnet-20241022",
system=system_prompt
)
# Track patient information securely
await graphiti.add_episode(
name="Patient Visit",
episode_body="Patient reported back pain, prescribed physical therapy",
source_description="EMR System",
reference_time=datetime.now(),
metadata={"confidential": True, "hipaa_compliant": True}
)
# Retrieve medical history for treatment
medical_history = await graphiti.search(
query="patient symptoms and treatments",
filters={"metadata.confidential": True}
)
# Use Zep's enterprise features for compliance
# - RBAC for access control
# - Audit logging
# - Data encryption
# - BYOK (Bring Your Own Key)
# Track individual user contexts in group chat
users = ["user_1", "user_2", "user_3"]
for user_id in users:
# Each user has their own knowledge graph
await zep.user.add(user_id=user_id)
# Add messages with user attribution
await zep.memory.add_memory(
session_id="group_chat_123",
messages=[
Message(role="user", content="I prefer morning meetings",
metadata={"user_id": "user_1"}),
Message(role="user", content="I'm usually free after 2pm",
metadata={"user_id": "user_2"})
]
)
# Retrieve context for each user
for user_id in users:
user_context = await zep.memory.search_memory(
user_id=user_id,
text="scheduling preferences"
)
print(f"{user_id} preferences: {user_context}")
# Batch episodes for better performance
async def batch_ingest(episodes, batch_size=50):
    """Ingest episode dicts in concurrent batches of *batch_size*.

    Episodes within a batch run concurrently via asyncio.gather; batches run
    sequentially, bounding the number of in-flight requests against the graph.
    Each element of *episodes* is a kwargs dict for graphiti.add_episode.
    """
    import asyncio  # local import: the surrounding snippet does not import it

    for start in range(0, len(episodes), batch_size):
        batch = episodes[start:start + batch_size]
        await asyncio.gather(
            *(graphiti.add_episode(**episode) for episode in batch)
        )
# Limit context to most relevant information
def assemble_context(memory, max_facts=10, max_tokens=4000):
    """Build a bounded context string from a memory's summary and top facts.

    NOTE(review): max_tokens is enforced as a *character* cap, not true
    tokens; the final slice may cut the last fact mid-line.
    """
    buffer = [f"Summary: {memory.summary}\n\nKey Facts:\n"]
    total = len(buffer[0])
    for item in memory.facts[:max_facts]:
        line = f"- {item}\n"
        buffer.append(line)
        total += len(line)
        if total > max_tokens:
            break
    return "".join(buffer)[:max_tokens]
# Handle entity variations and aliases
entity_aliases = {
    "Colorado": ["CO", "Colorful Colorado", "Centennial State"],
    "hiking": ["trekking", "backpacking", "trail walking"]
}

# Normalize entities before ingestion
def normalize_episode(text):
    """Replace known aliases with their canonical entity name.

    Matches whole words only, so a short alias such as "CO" cannot corrupt
    substrings of unrelated words (the naive str.replace version turned
    "RECORD" into "REColoradoRD").
    """
    import re

    for canonical, aliases in entity_aliases.items():
        for alias in aliases:
            # \b anchors restrict replacement to standalone occurrences
            text = re.sub(rf"\b{re.escape(alias)}\b", canonical, text)
    return text
# Implement data retention policies
from datetime import datetime, timedelta  # datetime was missing: datetime.now() below

async def cleanup_old_data(user_id, retention_days=90):
    """Delete a user's episodes older than *retention_days* days.

    Computes the cutoff from the current local time and delegates deletion
    to graphiti.delete_episodes.
    """
    cutoff_date = datetime.now() - timedelta(days=retention_days)
    # Delete old episodes
    await graphiti.delete_episodes(
        user_id=user_id,
        before_date=cutoff_date
    )
# Redact sensitive information
import re

# Precompiled once instead of recompiled/looked up on every call
_EMAIL_RE = re.compile(r'\b[\w.-]+@[\w.-]+\.\w+\b')
# US-style 10-digit numbers; separator generalized to -, ., or space
_PHONE_RE = re.compile(r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b')

def redact_pii(text):
    """Mask email addresses and phone numbers in *text*.

    Returns the text with matches replaced by the placeholders
    [EMAIL] and [PHONE]; non-matching text is returned unchanged.
    """
    text = _EMAIL_RE.sub('[EMAIL]', text)
    text = _PHONE_RE.sub('[PHONE]', text)
    return text
# Log graph operations
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("graphiti")

async def add_episode_with_logging(graphiti, **kwargs):
    """Wrap graphiti.add_episode with start/success/failure log lines.

    Any exception is logged and re-raised, so callers still observe errors.
    """
    episode_name = kwargs['name']
    logger.info(f"Adding episode: {episode_name}")
    try:
        outcome = await graphiti.add_episode(**kwargs)
        logger.info(f"Successfully added episode: {outcome}")
        return outcome
    except Exception as err:
        logger.error(f"Failed to add episode: {err}")
        raise
# Track performance metrics
import time
import logging

logger = logging.getLogger("graphiti")

async def search_with_metrics(graphiti, query):
    """Run a graph search and log its wall-clock latency and result count.

    Returns the search results unchanged.
    """
    # perf_counter is monotonic: immune to system-clock adjustments,
    # unlike time.time(), which can make intervals go negative.
    started = time.perf_counter()
    results = await graphiti.search(query)
    latency = time.perf_counter() - started
    logger.info(f"Search latency: {latency:.3f}s, Results: {len(results)}")
    return results
# Verify database is running
# For Neo4j:
docker ps | grep neo4j
# Test connection
from neo4j import GraphDatabase
driver = GraphDatabase.driver(
"bolt://localhost:7687",
auth=("neo4j", "password")
)
with driver.session() as session:
result = session.run("RETURN 1 as num")
print(result.single()["num"]) # Should print 1
# Implement exponential backoff
import asyncio

async def add_episode_with_retry(graphiti, max_retries=3, *,
                                 base_delay=1.0, retry_on=Exception, **kwargs):
    """Add an episode, retrying with exponential backoff on failure.

    The original snippet caught an undefined ``RateLimitError`` (NameError at
    the first failure), slept even after the final attempt, and discarded the
    underlying exception. Fixes: *retry_on* names the exception type(s) to
    retry (default: any Exception), backoff is ``base_delay * 2**attempt``
    and skipped after the last attempt, and the final RuntimeError chains the
    last error via ``from``. *base_delay* lets tests use a zero delay.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            return await graphiti.add_episode(**kwargs)
        except retry_on as exc:
            last_error = exc
            if attempt < max_retries - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    raise RuntimeError("Max retries exceeded") from last_error
# Debug search configuration
results = await graphiti.search(
query="user preferences",
num_results=20, # Increase number
search_config={
"semantic_weight": 0.5, # Adjust weights
"bm25_weight": 0.3,
"graph_weight": 0.2
}
)
# Check if data exists
all_nodes = await graphiti.get_all_nodes(limit=10)
print(f"Total nodes in graph: {len(all_nodes)}")
from graphiti_core import Graphiti
from graphiti_core.llm import OpenAIClient
graphiti = Graphiti(
# Database
neo4j_uri="bolt://localhost:7687",
neo4j_user="neo4j",
neo4j_password="password",
# LLM
llm_client=OpenAIClient(
model="gpt-4o",
temperature=0.7
),
# Search configuration
search_config={
"semantic_weight": 0.5,
"bm25_weight": 0.3,
"graph_weight": 0.2
},
# Processing
max_workers=4, # Parallel processing
batch_size=50, # Batch size for ingestion
# Embeddings
embedding_model="text-embedding-3-small",
embedding_dim=1536
)
from zep_cloud.client import AsyncZep
zep = AsyncZep(
api_key="your-api-key",
# Optional: Custom endpoint
base_url="https://api.getzep.com",
# Timeout settings
timeout=30.0,
# Retry configuration
max_retries=3
)
# Create optimal indices
await graphiti.build_indices_and_constraints()
# Custom indices for frequently queried properties
await graphiti.create_index("User", "email")
await graphiti.create_index("Product", "sku")
# Use filters to narrow search scope
results = await graphiti.search(
query="hiking products",
filters={
"entity_type": "Product",
"category": "outdoor",
"price_range": {"min": 50, "max": 200}
},
num_results=5
)
# Limit traversal depth for graph queries
results = await graphiti.traverse(
start_node="user_123",
max_depth=2, # Limit depth
relationship_types=["purchased", "viewed"]
)
from functools import lru_cache
import hashlib

# Cache frequent searches
@lru_cache(maxsize=100)
def cache_key(query, user_id):
    """Return a stable, compact key for a (query, user_id) pair."""
    return hashlib.md5(f"{query}:{user_id}".encode()).hexdigest()

# key -> search results. Unbounded: pair with a TTL/eviction policy in production.
_search_cache = {}

async def cached_search(graphiti, query, user_id):
    """Search the graph, reusing cached results for repeated queries.

    The original snippet computed a key but never consulted any cache
    ("Implement your caching logic"), so every call re-ran the search.
    This version stores and reuses results keyed by (query, user_id).
    """
    key = cache_key(query, user_id)
    if key not in _search_cache:
        _search_cache[key] = await graphiti.search(query)
    return _search_cache[key]
Graphiti is open-source. Zep offers both open-source and commercial licensing options.
Note: This skill provides comprehensive guidance for building AI agents with temporal knowledge graphs using Graphiti and Zep. Always ensure proper data handling, privacy compliance, and security measures for production applications.