feat(postgres_impl): add vchordrq vector index support and unify vector index creation logic

This commit is contained in:
wmsnp
2025-11-18 11:45:16 +08:00
parent f83b475ab1
commit d07023c962
3 changed files with 62 additions and 67 deletions

View File

@@ -28,10 +28,13 @@ password = your_password
database = your_database
# workspace = default
max_connections = 12
vector_index_type = HNSW # HNSW or IVFFLAT
vector_index_type = HNSW # HNSW, IVFFLAT or VCHORDRQ
hnsw_m = 16
hnsw_ef = 64
ivfflat_lists = 100
vchordrq_build_options =
vchordrq_probes =
vchordrq_epsilon = 1.9
[memgraph]
uri = bolt://localhost:7687

View File

@@ -352,11 +352,14 @@ POSTGRES_MAX_CONNECTIONS=12
# POSTGRES_WORKSPACE=forced_workspace_name
### PostgreSQL Vector Storage Configuration
### Vector storage type: HNSW, IVFFlat
### Vector storage type: HNSW, IVFFlat, VCHORDRQ
POSTGRES_VECTOR_INDEX_TYPE=HNSW
POSTGRES_HNSW_M=16
POSTGRES_HNSW_EF=200
POSTGRES_IVFFLAT_LISTS=100
POSTGRES_VCHORDRQ_BUILD_OPTIONS=
POSTGRES_VCHORDRQ_PROBES=
POSTGRES_VCHORDRQ_EPSILON=1.9
### PostgreSQL Connection Retry Configuration (Network Robustness)
### Number of retry attempts (1-10, default: 3)

View File

@@ -77,6 +77,9 @@ class PostgreSQLDB:
self.hnsw_m = config.get("hnsw_m")
self.hnsw_ef = config.get("hnsw_ef")
self.ivfflat_lists = config.get("ivfflat_lists")
self.vchordrq_build_options = config.get("vchordrq_build_options")
self.vchordrq_probes = config.get("vchordrq_probes")
self.vchordrq_epsilon = config.get("vchordrq_epsilon")
# Server settings
self.server_settings = config.get("server_settings")
@@ -362,7 +365,8 @@ class PostgreSQLDB:
await self.configure_age(connection, graph_name)
elif with_age and not graph_name:
raise ValueError("Graph name is required when with_age is True")
if self.vector_index_type == "VCHORDRQ":
await self.configure_vchordrq(connection)
return await operation(connection)
@staticmethod
@@ -408,6 +412,14 @@ class PostgreSQLDB:
):
pass
async def configure_vchordrq(self, connection: asyncpg.Connection) -> None:
"""Configure VCHORDRQ extension for vector similarity search."""
try:
await connection.execute(f"SET vchordrq.probes TO '{self.vchordrq_probes}'")
await connection.execute(f"SET vchordrq.epsilon TO {self.vchordrq_epsilon}")
except Exception:
pass
async def _migrate_llm_cache_schema(self):
"""Migrate LLM cache schema: add new columns and remove deprecated mode field"""
try:
@@ -1142,19 +1154,12 @@ class PostgreSQLDB:
f"PostgreSQL, Create vector indexs, type: {self.vector_index_type}"
)
try:
if self.vector_index_type == "HNSW":
await self._create_hnsw_vector_indexes()
elif self.vector_index_type == "IVFFLAT":
await self._create_ivfflat_vector_indexes()
elif self.vector_index_type == "FLAT":
logger.warning(
"FLAT index type is not supported by pgvector. Skipping vector index creation. "
"Please use 'HNSW' or 'IVFFLAT' instead."
)
if self.vector_index_type in ["HNSW", "IVFFLAT", "VCHORDRQ"]:
await self._create_vector_indexes()
else:
logger.warning(
"Doesn't support this vector index type: {self.vector_index_type}. "
"Supported types: HNSW, IVFFLAT"
"Supported types: HNSW, IVFFLAT, VCHORDRQ"
)
except Exception as e:
logger.error(
@@ -1361,21 +1366,37 @@ class PostgreSQLDB:
except Exception as e:
logger.warning(f"Failed to create index {index['name']}: {e}")
async def _create_hnsw_vector_indexes(self):
async def _create_vector_indexes(self):
vdb_tables = [
"LIGHTRAG_VDB_CHUNKS",
"LIGHTRAG_VDB_ENTITY",
"LIGHTRAG_VDB_RELATION",
]
embedding_dim = int(os.environ.get("EMBEDDING_DIM", 1024))
create_sql = {
"HNSW": f"""
CREATE INDEX {{vector_index_name}}
ON {{k}} USING hnsw (content_vector vector_cosine_ops)
WITH (m = {self.hnsw_m}, ef_construction = {self.hnsw_ef})
""",
"IVFFLAT": f"""
CREATE INDEX {{vector_index_name}}
ON {{k}} USING ivfflat (content_vector vector_cosine_ops)
WITH (lists = {self.ivfflat_lists})
""",
"VCHORDRQ": f"""
CREATE INDEX {{vector_index_name}}
ON {{k}} USING vchordrq (content_vector vector_cosine_ops)
{f'WITH (options = $${self.vchordrq_build_options}$$)' if self.vchordrq_build_options else ''}
"""
}
embedding_dim = int(os.environ.get("EMBEDDING_DIM", 1024))
for k in vdb_tables:
vector_index_name = f"idx_{k.lower()}_hnsw_cosine"
vector_index_name = f"idx_{k.lower()}_{self.vector_index_type.lower()}_cosine"
check_vector_index_sql = f"""
SELECT 1 FROM pg_indexes
WHERE indexname = '{vector_index_name}'
AND tablename = '{k.lower()}'
WHERE indexname = '{vector_index_name}' AND tablename = '{k.lower()}'
"""
try:
vector_index_exists = await self.query(check_vector_index_sql)
@@ -1384,64 +1405,18 @@ class PostgreSQLDB:
alter_sql = f"ALTER TABLE {k} ALTER COLUMN content_vector TYPE VECTOR({embedding_dim})"
await self.execute(alter_sql)
logger.debug(f"Ensured vector dimension for {k}")
create_vector_index_sql = f"""
CREATE INDEX {vector_index_name}
ON {k} USING hnsw (content_vector vector_cosine_ops)
WITH (m = {self.hnsw_m}, ef_construction = {self.hnsw_ef})
"""
logger.info(f"Creating hnsw index {vector_index_name} on table {k}")
await self.execute(create_vector_index_sql)
logger.info(f"Creating {self.vector_index_type} index {vector_index_name} on table {k}")
await self.execute(create_sql[self.vector_index_type].format(vector_index_name=vector_index_name, k=k))
logger.info(
f"Successfully created vector index {vector_index_name} on table {k}"
)
else:
logger.info(
f"HNSW vector index {vector_index_name} already exists on table {k}"
f"{self.vector_index_type} vector index {vector_index_name} already exists on table {k}"
)
except Exception as e:
logger.error(f"Failed to create vector index on table {k}, Got: {e}")
async def _create_ivfflat_vector_indexes(self):
vdb_tables = [
"LIGHTRAG_VDB_CHUNKS",
"LIGHTRAG_VDB_ENTITY",
"LIGHTRAG_VDB_RELATION",
]
embedding_dim = int(os.environ.get("EMBEDDING_DIM", 1024))
for k in vdb_tables:
index_name = f"idx_{k.lower()}_ivfflat_cosine"
check_index_sql = f"""
SELECT 1 FROM pg_indexes
WHERE indexname = '{index_name}' AND tablename = '{k.lower()}'
"""
try:
exists = await self.query(check_index_sql)
if not exists:
# Only set vector dimension when index doesn't exist
alter_sql = f"ALTER TABLE {k} ALTER COLUMN content_vector TYPE VECTOR({embedding_dim})"
await self.execute(alter_sql)
logger.debug(f"Ensured vector dimension for {k}")
create_sql = f"""
CREATE INDEX {index_name}
ON {k} USING ivfflat (content_vector vector_cosine_ops)
WITH (lists = {self.ivfflat_lists})
"""
logger.info(f"Creating ivfflat index {index_name} on table {k}")
await self.execute(create_sql)
logger.info(
f"Successfully created ivfflat index {index_name} on table {k}"
)
else:
logger.info(
f"Ivfflat vector index {index_name} already exists on table {k}"
)
except Exception as e:
logger.error(f"Failed to create ivfflat index on {k}: {e}")
async def query(
self,
sql: str,
@@ -1596,6 +1571,20 @@ class ClientManager:
config.get("postgres", "ivfflat_lists", fallback="100"),
)
),
"vchordrq_build_options": os.environ.get(
"POSTGRES_VCHORDRQ_BUILD_OPTIONS",
config.get("postgres", "vchordrq_build_options", fallback=""),
),
"vchordrq_probes": os.environ.get(
"POSTGRES_VCHORDRQ_PROBES",
config.get("postgres", "vchordrq_probes", fallback=""),
),
"vchordrq_epsilon": float(
os.environ.get(
"POSTGRES_VCHORDRQ_EPSILON",
config.get("postgres", "vchordrq_epsilon", fallback="1.9"),
)
),
# Server settings for Supabase
"server_settings": os.environ.get(
"POSTGRES_SERVER_SETTINGS",