Files
InternalAuditInterprise/backend/app/ingest/adapters_r9.py
T

185 lines
6.8 KiB
Python

"""R9 适配器:市场业务真实性 / 养卡骗补。
源明细:SrcChannelMonthly / SrcSubscription
映射到:Entity(CHANNEL, MSISDN) + 关系(BELONGS_TO_CHANNEL, SUBSCRIBES) + MetricEvent
"""
from __future__ import annotations

import datetime as dt
import logging
import uuid

from sqlalchemy.orm import Session

from app.datahub.graph_repo import add_relationship, upsert_entity
from app.datahub.models import MetricEvent
from app.datahub.ontology import EntityType, RelationshipType
from app.datahub.staging import SrcChannelMonthly, SrcSubscription
from app.ingest.base import BaseAdapter, IngestResult
from app.ingest.registry import register_adapter
@register_adapter
class ChannelMonthlyAdapter(BaseAdapter):
    """Map SrcChannelMonthly staging rows to channel "retention" MetricEvents.

    For every staging row the adapter upserts the CHANNEL entity keyed by
    ``row.channel_key`` and adds one MetricEvent whose value is
    ``retained / cohort_size``. The other monthly figures (commission,
    active ratio, zero-usage ratio, ...) travel in the event's ``attributes``.
    """

    source_system = "BSS"
    staging_table = "src_channel_monthly"

    @staticmethod
    def _cohort_event_time(cohort_label: str | None) -> dt.datetime:
        """Convert a cohort label such as ``"2025-01"`` to an aware UTC datetime.

        Falls back to the current UTC time when the label cannot be parsed.
        ``TypeError`` is handled alongside ``ValueError`` so that a missing
        (``None``) label takes the same fallback as a malformed one instead
        of failing the whole row.
        """
        try:
            parsed = dt.datetime.strptime(cohort_label, "%Y-%m")
        except (TypeError, ValueError):
            return dt.datetime.now(dt.timezone.utc)
        return parsed.replace(tzinfo=dt.timezone.utc)

    def ingest(
        self,
        session: Session,
        data_version_id: uuid.UUID | None = None,
        batch_size: int = 1000,
    ) -> IngestResult:
        """Ingest channel-monthly staging rows into the session.

        Args:
            session: SQLAlchemy session used to read staging rows and to
                add the produced MetricEvent objects.
            data_version_id: When given, only staging rows carrying this
                data version are read; it is also stamped on the upserted
                entity and on every event.
            batch_size: Maximum number of staging rows read by this call.

        Returns:
            An IngestResult whose ``metric_events`` lists the created
            events, ``row_count`` counts rows processed successfully and
            ``error_count`` counts rows that raised.
        """
        log = logging.getLogger(__name__)
        result = IngestResult()

        query = session.query(SrcChannelMonthly)
        if data_version_id is not None:
            query = query.filter(SrcChannelMonthly.data_version_id == data_version_id)

        # NOTE(review): limit() caps this call at batch_size rows and there is
        # no ordering/offset, so rows beyond the cap are not read here --
        # confirm the caller is expected to handle paging.
        rows = query.limit(batch_size).all()

        for index, row in enumerate(rows):
            try:
                # Make sure the channel entity exists before attaching metrics.
                upsert_entity(
                    session,
                    entity_type=EntityType.CHANNEL,
                    business_key=row.channel_key,
                    data_version_id=data_version_id,
                )

                # Retention-rate event; a non-positive cohort size yields 0.0
                # rather than dividing by zero.
                event = MetricEvent(
                    event_time=self._cohort_event_time(row.cohort_label),
                    subject_type="channel",
                    subject_key=row.channel_key,
                    metric_name="retention",
                    metric_value=row.retained / row.cohort_size if row.cohort_size > 0 else 0.0,
                    attributes={
                        "cohort_label": row.cohort_label,
                        "month_index": row.month_index,
                        "cohort_size": row.cohort_size,
                        "retained": row.retained,
                        "commission_paid": row.commission_paid,
                        "active_ratio": row.active_ratio,
                        "zero_usage_ratio": row.zero_usage_ratio,
                    },
                    data_version_id=data_version_id,
                )
                session.add(event)
                result.metric_events.append(event)
                result.row_count += 1
            except Exception:
                # Best-effort per row: keep going, but record why the row
                # failed instead of discarding the traceback.
                log.exception(
                    "Failed to ingest %s row #%d", self.staging_table, index
                )
                result.error_count += 1

        return result
@register_adapter
class SubscriptionAdapter(BaseAdapter):
    """Map SrcSubscription staging rows to graph entities, relationships and events.

    Per row the adapter produces:
      * an MSISDN entity (always),
      * a BELONGS_TO_CHANNEL relationship to a CHANNEL entity when
        ``row.channel_key`` is set,
      * a SUBSCRIBES relationship to a CONTRACT entity keyed by
        ``row.product_code`` when that is set,
      * a "subscribe" (+1.0) and/or "unsubscribe" (-1.0) MetricEvent when
        the corresponding timestamp is set.
    """

    source_system = "BSS"
    staging_table = "src_subscription"

    @staticmethod
    def _build_event(row, event_time, metric_name, metric_value, data_version_id):
        """Build one msisdn-scoped MetricEvent for a subscription row."""
        return MetricEvent(
            event_time=event_time,
            subject_type="msisdn",
            subject_key=row.msisdn,
            metric_name=metric_name,
            metric_value=metric_value,
            attributes={
                "channel_key": row.channel_key,
                "product_code": row.product_code,
            },
            data_version_id=data_version_id,
        )

    def ingest(
        self,
        session: Session,
        data_version_id: uuid.UUID | None = None,
        batch_size: int = 1000,
    ) -> IngestResult:
        """Ingest subscription staging rows into the session.

        Args:
            session: SQLAlchemy session used to read staging rows and to
                add the produced objects.
            data_version_id: When given, only staging rows carrying this
                data version are read; it is also stamped on every entity,
                relationship and event created here.
            batch_size: Maximum number of staging rows read by this call.

        Returns:
            An IngestResult collecting the MSISDN entities, the created
            relationships and metric events; ``row_count`` counts rows
            processed successfully and ``error_count`` counts rows that
            raised.
        """
        log = logging.getLogger(__name__)
        result = IngestResult()

        query = session.query(SrcSubscription)
        if data_version_id is not None:
            query = query.filter(SrcSubscription.data_version_id == data_version_id)

        # NOTE(review): limit() caps this call at batch_size rows and there is
        # no ordering/offset, so rows beyond the cap are not read here --
        # confirm the caller is expected to handle paging.
        rows = query.limit(batch_size).all()

        for index, row in enumerate(rows):
            try:
                # MSISDN entity.
                msisdn_entity = upsert_entity(
                    session,
                    entity_type=EntityType.MSISDN,
                    business_key=row.msisdn,
                    display_name=row.msisdn,
                    attributes={"region": row.region},
                    data_version_id=data_version_id,
                )
                result.entities.append(msisdn_entity)

                # Channel ownership relationship (number -> channel).
                if row.channel_key:
                    chan_entity = upsert_entity(
                        session,
                        entity_type=EntityType.CHANNEL,
                        business_key=row.channel_key,
                        data_version_id=data_version_id,
                    )
                    rel = add_relationship(
                        session,
                        RelationshipType.BELONGS_TO_CHANNEL,
                        msisdn_entity,
                        chan_entity,
                        data_version_id=data_version_id,
                    )
                    result.relationships.append(rel)

                # Subscription relationship (number -> contract/product).
                if row.product_code:
                    contract_entity = upsert_entity(
                        session,
                        entity_type=EntityType.CONTRACT,
                        business_key=row.product_code,
                        display_name=f"产品-{row.product_code}",
                        data_version_id=data_version_id,
                    )
                    rel = add_relationship(
                        session,
                        RelationshipType.SUBSCRIBES,
                        msisdn_entity,
                        contract_entity,
                        data_version_id=data_version_id,
                    )
                    result.relationships.append(rel)

                # Subscribe / unsubscribe time-series events.
                if row.subscribe_time:
                    event = self._build_event(
                        row, row.subscribe_time, "subscribe", 1.0, data_version_id
                    )
                    session.add(event)
                    result.metric_events.append(event)

                if row.unsubscribe_time:
                    event = self._build_event(
                        row, row.unsubscribe_time, "unsubscribe", -1.0, data_version_id
                    )
                    session.add(event)
                    result.metric_events.append(event)

                result.row_count += 1
            except Exception:
                # Best-effort per row: keep going, but record why the row
                # failed instead of discarding the traceback.
                log.exception(
                    "Failed to ingest %s row #%d", self.staging_table, index
                )
                result.error_count += 1

        return result