Files
InternalAuditInterprise/backend/app/ingest/registry.py
T

23 lines
684 B
Python

"""适配器注册表:按 staging 表名索引,便于调度器统一调用。"""
from __future__ import annotations
from typing import Type
from app.ingest.base import BaseAdapter
# 全局注册表:staging_table -> Adapter 类
ADAPTER_REGISTRY: dict[str, Type[BaseAdapter]] = {}
def register_adapter(cls: Type[BaseAdapter]) -> Type[BaseAdapter]:
"""类装饰器:将 Adapter 注册到全局表。"""
if cls.staging_table:
ADAPTER_REGISTRY[cls.staging_table] = cls
return cls
def get_adapter(staging_table: str) -> Type[BaseAdapter] | None:
"""按 staging 表名查找已注册的适配器类。"""
return ADAPTER_REGISTRY.get(staging_table)