init: AI培训与智能巡检系统

This commit is contained in:
selfrelease
2026-06-16 00:55:20 +08:00
commit c55598494b
201 changed files with 53131 additions and 0 deletions
+39
View File
@@ -0,0 +1,39 @@
# 中国机车图鉴 · 应用工程
对应 `2-task.md` 的实现代码。当前已完成 **Phase 0(部分)+ Phase 1A(数据底座 ETL**
## 目录结构
```
app/
etl/
field_dict.py # 统一字段字典、同义表头映射、枚举、分类配置 (T-1.1)
clean.py # 清洗规则:单元格清理、数值+单位拆分、年代/状态/国别 (T-1.2)
schema.sql # 三层模型 schemaCategory/Model/Unit + 关系表 (T-1.1)
importer.py # ETL 主流程:12 CSV -> SQLite + JSON + 报告 (T-1.2)
tests/ # 单元/集成测试 (Jest 等价物,使用 Python unittest)
data/ # 产物:machines.db / machines.json / import_report.md
```
## 运行 ETL 导入
```bash
# 在 Train/ 目录下
python3 -m app.etl.importer
```
输入:`Train/csv/*.csv`(由总表 xlsx 转出的 12 张分类表)
输出:`app/data/machines.db``machines.json``import_report.md`
当前导入结果:**Model 540 / Unit 307**,跳过 10(均为空行),0 待复核。
## 运行测试
```bash
python3 -m unittest discover -s app/tests -p "test_*.py" -v
```
覆盖:字段映射、单位拆分、年代/状态解析、向前填充、表头识别、全量导入幂等性。
## 设计说明与已知取舍
- **存储**:开发期使用 SQLite(零依赖、可运行验证)。schema 按可移植到 PostgreSQL 设计,
Phase 1B 接入 API 时可平滑迁移(数值+单位字段、外键、索引均兼容)。
- **保真**:每条记录保留 `raw_json`,原始清洗后所有列不丢失,便于后续众包修订与字段补全。
- **复合数值**(如电力机车 "2×92(100)"):取首个数值入 `*_value`,完整原文存 `raw_json`
待 Phase 2 众包修订细化。
- **国别属性**:当前用启发式默认"国产",字段已就位,等待圈层二(国外车型)数据与人工标注。
View File
Binary file not shown.
BIN
View File
Binary file not shown.
Binary file not shown.
Binary file not shown.
+23
View File
@@ -0,0 +1,23 @@
# ETL 导入报告
- 车型(Model)**540**
- 个体(Unit)**307**
- 跳过(无主键)**10**
- 待人工复核:**0**
## 分表明细
| 分类表 | 分类 | 粒度 | 数据行 | 入库 | 跳过 | 备注 |
|---|---|---|---|---|---|---|
| 中国蒸汽汽车统计表(建国前) | 蒸汽机车 | model | 0 | 0 | 0 | 空表(无数据) |
| 中国蒸汽机车型号表(建国后) | 蒸汽机车 | model | 16 | 16 | 0 | |
| 北京表(蒸汽机车) | 蒸汽机车 | unit | 67 | 59 | 8 | |
| 全国普速客车型号统计表 | 客车 | model | 188 | 188 | 0 | |
| 全国电力机车型号表 | 电力机车 | model | 63 | 63 | 0 | |
| 全国内燃机车型号表 | 内燃机车 | model | 100 | 100 | 0 | |
| CJ型动车组、早期动车组及和谐号统计表 | 动车组 | model | 59 | 59 | 0 | |
| 全国复兴号统计表 | 动车组 | model | 41 | 41 | 0 | |
| 中国铁路货车统计表 | 货车 | model | 35 | 35 | 0 | |
| 中国铁路普速检测车统计表 | 检测车 | unit | 229 | 229 | 0 | |
| 中国高速综合检测列车统计表 | 检测车 | unit | 21 | 19 | 2 | |
| 中国旅游列车统计表 | 旅游列车 | model | 38 | 38 | 0 | |
Binary file not shown.
+27274
View File
File diff suppressed because it is too large Load Diff
Binary file not shown.

After

Width:  |  Height:  |  Size: 351 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 346 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 232 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 572 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 301 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 414 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 415 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 319 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 270 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 236 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 338 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 341 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 252 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 418 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 235 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 284 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 237 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 274 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 347 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 273 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 532 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 346 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 232 KiB

View File
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+110
View File
@@ -0,0 +1,110 @@
"""清洗规则:单元格清理、数值+单位拆分、年代解析、状态/国别规范化。
对应任务 T-1.2(清洗)。所有函数均为纯函数,便于单元测试(T-1.2 UT)。
"""
import re
# 占位/空值标记
_EMPTY_MARKERS = {"", "-", "——", "", "/", "N/A", "n/a", "", "未知"}
def clean_cell(value) -> str:
"""清理单元格:去换行、压缩空白、trim。返回字符串。"""
if value is None:
return ""
s = str(value).replace("\n", "").replace("\r", "")
s = s.replace("\u3000", " ")
s = re.sub(r"\s+", " ", s).strip()
return s
def is_empty(value) -> bool:
"""判断是否为空/占位值。"""
return clean_cell(value) in _EMPTY_MARKERS
# 匹配前导数字(含小数、千分位、负号),后跟可选单位
_NUM_RE = re.compile(r"-?\d+(?:\.\d+)?")
def parse_value_unit(raw, default_unit: str = ""):
"""从形如 '400km/h(试验)''17t''2×92(100)''126.0''——'
中拆出 (数值: float|None, 单位: str, 原文: str)。
规则:取第一个出现的数值作为主数值;单位优先取 default_unit
否则尝试从数值后紧跟的字母/单位片段提取;无数值则返回 (None, '', 原文)。
"""
text = clean_cell(raw)
if text in _EMPTY_MARKERS:
return None, "", text
m = _NUM_RE.search(text)
if not m:
return None, "", text
value = float(m.group())
unit = default_unit
if not unit:
# 取数值之后紧邻的单位片段(字母 / 常见单位字符)
rest = text[m.end():]
um = re.match(r"\s*([A-Za-z%/·³²]+(?:/[A-Za-z]+)?)", rest)
if um:
unit = um.group(1)
return value, unit, text
def parse_year(raw):
"""解析年份:'1971 年''2006''2007-12-22 00:00:00' -> int|None。"""
text = clean_cell(raw)
if text in _EMPTY_MARKERS:
return None
m = re.search(r"(1[89]\d{2}|20\d{2})", text)
return int(m.group(1)) if m else None
def normalize_status(raw) -> str:
"""把自由文本状态规范到枚举之一。"""
text = clean_cell(raw)
if not text:
return "未知"
mapping = [
("半封存", "半封存"),
("封存", "封存"),
("现役", "现役"),
("在役", "现役"),
("退役", "退役"),
("淘汰", "退役"),
("报废", "报废"),
("保存", "保存"),
("试验", "试验"),
("样车", "试验"),
]
for kw, val in mapping:
if kw in text:
return val
return "未知"
# 国别属性的简单启发式推断(默认国产;后续众包修订可覆盖)
_IMPORT_HINTS = ["进口", "苏联", "罗马尼亚", "法国", "日本", "德国", "美国", "捷克"]
def infer_country_type(*texts) -> str:
blob = " ".join(clean_cell(t) for t in texts)
if "合资" in blob:
return "中外合资"
if "仿制" in blob or "引进" in blob:
return "引进仿制"
for h in _IMPORT_HINTS:
if h in blob:
return "进口"
return "国产"
def forward_fill(values):
"""对一列做向前填充(处理合并单元格被拆空的 系列 列)。"""
out, last = [], ""
for v in values:
c = clean_cell(v)
if c:
last = c
out.append(last)
return out
+109
View File
@@ -0,0 +1,109 @@
"""统一字段字典与枚举。
将各 sheet 中形态各异的表头映射到规范字段名(canonical),
并声明哪些字段是"数值 + 单位"型(需要拆分)。
对应任务 T-1.1(字段字典)/ T-1.2(字段映射)。
"""
# 规范字段 -> 该字段的同义表头集合(去空格后匹配)
# 注意:表头里常含空格/换行,匹配前统一做 normalize_header() 处理。
FIELD_SYNONYMS = {
"series": ["系列"],
"model_code": ["型号", "型号(代号)", "车型", "车型代码", "小表编号车型"],
"full_name": ["车型全称", "车型名称"],
"first_year": ["首产时间", "制造时间", "生产时间"],
"last_year": ["停产时间"],
"manufacturer": ["生产商", "主要生产厂商", "生产厂商", "制造商"],
"length": ["车体长度/mm", "机车长度/mm", "车辆全长/mm", "车体全长/mm"],
"width": ["车体宽度/mm", "机车宽度/mm"],
"height": ["车体高度/mm"],
"wheelbase": ["轴距/mm", "车辆定距/mm"],
"weight": ["整备重量/t", "自重/t", "整备重量/轴重/t", "整备重量/节数"],
"axle_load": ["轴重/t"],
"axle_arrangement": ["轴列式", "轴式", "轴列式(标准编组)"],
"tractive_start": ["牵引力(起动)/kN"],
"tractive_cont": ["牵引力(持续)/kN"],
"power_kw": ["牵引力/kW"],
"efficiency": ["传动效率", "机械效率"],
"drive": ["传动方式/动力来源", "供电方式(传动方式)", "供电方式", "制动方式"],
"max_speed": ["最高运行时速", "最高运营时速", "构造速度/km/h", "运行时速",
"最高速度", "构造速度"],
"usage": ["用途", "主要用途", "核心运行区段/主题"],
"production_count": ["产量", "累计产量", "生产台数", "生产数量",
"累计产量/配属"],
"capacity": ["容积/m³", "定员/人"],
"load": ["载重/t"],
"bogie": ["转向架型号"],
"coupler": ["车钩类型"],
# 个体(Unit)粒度字段
"car_number": ["车号", "编号"],
"function": ["功能"],
"depot": ["配属", "所属路局"],
"livery": ["颜色", "涂装"],
"status": ["状态"],
"side_mark": ["侧标"],
"note": ["备注"],
"location": ["存放位置"],
"formation": ["动力车、拖车", "动力车拖车"],
"predecessor": ["前身(部分含)", "前身"],
"lifespan": ["最大使用寿命(结合实际)"],
"tour_name": ["旅游列车名称"],
"tractor_models": ["牵引机车常用型号"],
}
# "数值 + 单位"型字段:导入时拆出 <field>_value(float) 与 <field>_unit(text)
NUMERIC_UNIT_FIELDS = {
"length": "mm", "width": "mm", "height": "mm", "wheelbase": "mm",
"weight": "t", "axle_load": "t", "load": "t",
"tractive_start": "kN", "tractive_cont": "kN", "power_kw": "kW",
"max_speed": "km/h", "capacity": "",
}
# 状态枚举(规范化目标)
STATUS_ENUM = ["现役", "封存", "半封存", "报废", "退役", "保存", "试验", "未知"]
# 国别属性枚举
COUNTRY_TYPE_ENUM = ["国产", "进口", "引进仿制", "中外合资", "未知"]
# sheet 文件名 -> 分类配置
# grain: "model" 车型粒度 / "unit" 个体粒度
CATEGORY_CONFIG = {
"中国蒸汽汽车统计表(建国前)": {"category": "蒸汽机车", "subcat": "建国前", "grain": "model"},
"中国蒸汽机车型号表(建国后)": {"category": "蒸汽机车", "subcat": "建国后", "grain": "model"},
"北京表(蒸汽机车)": {"category": "蒸汽机车", "subcat": "北京现存", "grain": "unit"},
"全国普速客车型号统计表": {"category": "客车", "subcat": "普速客车", "grain": "model"},
"全国电力机车型号表": {"category": "电力机车", "subcat": "", "grain": "model",
"col_override": {0: "series", 1: "model_code"}},
"全国内燃机车型号表": {"category": "内燃机车", "subcat": "", "grain": "model"},
"CJ型动车组、早期动车组及和谐号统计表": {"category": "动车组", "subcat": "CJ/早期/和谐号", "grain": "model"},
"全国复兴号统计表": {"category": "动车组", "subcat": "复兴号", "grain": "model"},
"中国铁路货车统计表": {"category": "货车", "subcat": "", "grain": "model"},
"中国铁路普速检测车统计表": {"category": "检测车", "subcat": "普速检测", "grain": "unit"},
"中国高速综合检测列车统计表": {"category": "检测车", "subcat": "高速综合检测", "grain": "unit"},
"中国旅游列车统计表": {"category": "旅游列车", "subcat": "", "grain": "model"},
}
# 用于识别"表头行"的关键 tokennormalize 后)
HEADER_TOKENS = {"型号", "车号", "车型代码", "车型", "旅游列车名称", "小表编号",
"型号(代号)", "系列", "车型全称"}
# 反向索引:normalized header -> canonical field
_REVERSE = {}
for _canon, _syns in FIELD_SYNONYMS.items():
for _s in _syns:
_REVERSE[_s] = _canon
def normalize_header(h: str) -> str:
"""规范化表头:去除空白、换行、全角空格,并将全角括号转半角。"""
if h is None:
return ""
s = (h.replace("\n", "").replace("\r", "")
.replace(" ", "").replace("\u3000", "").strip())
s = s.replace("", "(").replace("", ")")
return s
def map_header(h: str):
"""把原始表头映射到 canonical 字段名,未知返回 None。"""
return _REVERSE.get(normalize_header(h))
+267
View File
@@ -0,0 +1,267 @@
"""ETL 导入管线:12 张 CSV -> 清洗 -> SQLite + JSON + 导入报告。
对应任务 T-1.2。运行:
python3 -m app.etl.importer # 从 Train/ 目录运行
或:
python3 app/etl/importer.py
"""
import csv
import json
import os
import sqlite3
import sys
from . import field_dict as fd
from . import clean as cl
HERE = os.path.dirname(os.path.abspath(__file__))
APP_DIR = os.path.dirname(HERE)
ROOT = os.path.dirname(APP_DIR) # Train/
CSV_DIR = os.path.join(ROOT, "csv")
OUT_DIR = os.path.join(APP_DIR, "data")
DB_PATH = os.path.join(OUT_DIR, "machines.db")
JSON_PATH = os.path.join(OUT_DIR, "machines.json")
REPORT_PATH = os.path.join(OUT_DIR, "import_report.md")
SCHEMA_PATH = os.path.join(HERE, "schema.sql")
def find_header_row(rows):
"""返回表头行索引:含 >=3 个非空单元且命中关键 token 的首行。"""
for i, row in enumerate(rows):
cells = [fd.normalize_header(c) for c in row]
nonempty = [c for c in cells if c]
if len(nonempty) >= 3 and (set(cells) & fd.HEADER_TOKENS):
return i
return None
def build_column_map(header_row):
"""列索引 -> canonical 字段名(未知列保留为 raw::原表头)。"""
col_map = {}
for idx, h in enumerate(header_row):
canon = fd.map_header(h)
norm = fd.normalize_header(h)
if canon:
col_map[idx] = canon
elif norm:
col_map[idx] = "raw::" + norm
return col_map
def clean_record(row, col_map):
"""把一行映射为 canonical 字段 dict(含 raw:: 保真列)。"""
rec, raw = {}, {}
for idx, field in col_map.items():
value = row[idx] if idx < len(row) else ""
if field.startswith("raw::"):
c = cl.clean_cell(value)
if c:
raw[field[5:]] = c
continue
rec[field] = value
rec["_raw_extra"] = raw
return rec
def to_model_row(rec, category_id, sheet, series_value):
"""构造 model 表插入字典。"""
raw_all = dict(rec.get("_raw_extra", {}))
m = {
"category_id": category_id,
"series": cl.clean_cell(rec.get("series") or series_value),
"model_code": cl.clean_cell(rec.get("model_code")),
"full_name": cl.clean_cell(rec.get("full_name")),
"manufacturer": cl.clean_cell(rec.get("manufacturer")),
"first_year": cl.parse_year(rec.get("first_year")),
"last_year": cl.parse_year(rec.get("last_year")),
"status": cl.normalize_status(rec.get("status")),
"usage": cl.clean_cell(rec.get("usage")),
"production_count": cl.clean_cell(rec.get("production_count")),
"axle_arrangement": cl.clean_cell(rec.get("axle_arrangement")),
"drive": cl.clean_cell(rec.get("drive")),
"efficiency": cl.clean_cell(rec.get("efficiency")),
"country": "中国",
"country_type": cl.infer_country_type(
rec.get("manufacturer"), rec.get("model_code"),
rec.get("usage"), rec.get("production_count")),
"source_sheet": sheet,
}
# 数值+单位字段拆分
for field, default_unit in fd.NUMERIC_UNIT_FIELDS.items():
if field not in fd.FIELD_SYNONYMS:
continue
val, unit, _ = cl.parse_value_unit(rec.get(field), default_unit)
m[field + "_value"] = val
m[field + "_unit"] = unit or default_unit
if rec.get(field) is not None:
raw_all[field] = cl.clean_cell(rec.get(field))
# 把所有 canonical 原文也并入 raw_json 保真
for k, v in rec.items():
if k == "_raw_extra":
continue
c = cl.clean_cell(v)
if c:
raw_all[k] = c
m["raw_json"] = json.dumps(raw_all, ensure_ascii=False)
return m
def to_unit_row(rec, category_id, sheet):
raw_all = dict(rec.get("_raw_extra", {}))
for k, v in rec.items():
if k == "_raw_extra":
continue
c = cl.clean_cell(v)
if c:
raw_all[k] = c
return {
"category_id": category_id,
"car_number": cl.clean_cell(rec.get("car_number")),
"model_name": cl.clean_cell(rec.get("full_name") or rec.get("model_code")),
"function": cl.clean_cell(rec.get("function")),
"depot": cl.clean_cell(rec.get("depot")),
"livery": cl.clean_cell(rec.get("livery")),
"status": cl.normalize_status(rec.get("status")),
"location": cl.clean_cell(rec.get("location")),
"note": cl.clean_cell(rec.get("note")),
"raw_json": json.dumps(raw_all, ensure_ascii=False),
"source_sheet": sheet,
}
def _insert(conn, table, row):
cols = list(row.keys())
ph = ",".join(["?"] * len(cols))
conn.execute(
f"INSERT INTO {table} ({','.join(cols)}) VALUES ({ph})",
[row[c] for c in cols],
)
def import_all(csv_dir=CSV_DIR, db_path=DB_PATH):
os.makedirs(OUT_DIR, exist_ok=True)
if os.path.exists(db_path):
os.remove(db_path)
conn = sqlite3.connect(db_path)
with open(SCHEMA_PATH, encoding="utf-8") as f:
conn.executescript(f.read())
report = {"sheets": [], "models": 0, "units": 0, "skipped": 0, "review": []}
export = {"categories": [], "models": [], "units": []}
cat_ids = {}
for sheet, cfg in fd.CATEGORY_CONFIG.items():
path = os.path.join(csv_dir, sheet + ".csv")
entry = {"sheet": sheet, "category": cfg["category"], "grain": cfg["grain"],
"rows": 0, "imported": 0, "skipped": 0, "note": ""}
if not os.path.exists(path):
entry["note"] = "文件缺失"
report["sheets"].append(entry)
continue
with open(path, encoding="utf-8-sig") as fh:
rows = list(csv.reader(fh))
if not rows:
entry["note"] = "空表(无数据)"
report["sheets"].append(entry)
continue
key = (cfg["category"], cfg["subcat"])
if key not in cat_ids:
cur = conn.execute(
"INSERT INTO category(name, subcat, slug) VALUES (?,?,?)",
(cfg["category"], cfg["subcat"], None))
cat_ids[key] = cur.lastrowid
export["categories"].append(
{"id": cur.lastrowid, "name": cfg["category"], "subcat": cfg["subcat"]})
category_id = cat_ids[key]
hidx = find_header_row(rows)
if hidx is None:
entry["note"] = "未识别表头行"
report["sheets"].append(entry)
continue
col_map = build_column_map(rows[hidx])
# 应用 per-sheet 列覆盖(处理表头标注与实际不符的脏表)
for idx, field in cfg.get("col_override", {}).items():
col_map[idx] = field
data_rows = rows[hidx + 1:]
entry["rows"] = len(data_rows)
# 系列列向前填充(合并单元格)
series_col = next((i for i, f in col_map.items() if f == "series"), None)
if series_col is not None:
filled = cl.forward_fill([r[series_col] if series_col < len(r) else ""
for r in data_rows])
else:
filled = [""] * len(data_rows)
for r, series_value in zip(data_rows, filled):
rec = clean_record(r, col_map)
if cfg["grain"] == "unit":
car = cl.clean_cell(rec.get("car_number"))
if not car:
entry["skipped"] += 1
report["skipped"] += 1
continue
row = to_unit_row(rec, category_id, sheet)
_insert(conn, "unit", row)
export["units"].append(row)
report["units"] += 1
entry["imported"] += 1
else:
code = cl.clean_cell(rec.get("model_code")) or \
cl.clean_cell(rec.get("tour_name")) or \
cl.clean_cell(rec.get("full_name"))
if not code:
entry["skipped"] += 1
report["skipped"] += 1
continue
row = to_model_row(rec, category_id, sheet, series_value)
if not row["model_code"]:
row["model_code"] = code
_insert(conn, "model", row)
export["models"].append(row)
report["models"] += 1
entry["imported"] += 1
# 年代逻辑校验 -> 待复核
if (row["first_year"] and row["last_year"]
and row["first_year"] > row["last_year"]):
report["review"].append(
f"{sheet} / {code}: 首产年 {row['first_year']} > 停产年 {row['last_year']}")
report["sheets"].append(entry)
conn.commit()
conn.close()
with open(JSON_PATH, "w", encoding="utf-8") as f:
json.dump(export, f, ensure_ascii=False, indent=2)
_write_report(report)
return report
def _write_report(report):
lines = ["# ETL 导入报告\n",
f"- 车型(Model)**{report['models']}**",
f"- 个体(Unit)**{report['units']}**",
f"- 跳过(无主键)**{report['skipped']}**",
f"- 待人工复核:**{len(report['review'])}**\n",
"## 分表明细\n",
"| 分类表 | 分类 | 粒度 | 数据行 | 入库 | 跳过 | 备注 |",
"|---|---|---|---|---|---|---|"]
for s in report["sheets"]:
lines.append(f"| {s['sheet']} | {s['category']} | {s['grain']} | "
f"{s['rows']} | {s['imported']} | {s['skipped']} | {s['note']} |")
if report["review"]:
lines.append("\n## 待人工复核\n")
for r in report["review"]:
lines.append(f"- {r}")
with open(REPORT_PATH, "w", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
if __name__ == "__main__":
rep = import_all()
print(f"导入完成:Model={rep['models']} Unit={rep['units']} "
f"跳过={rep['skipped']} 待复核={len(rep['review'])}")
print(f"输出:\n {DB_PATH}\n {JSON_PATH}\n {REPORT_PATH}")
sys.exit(0)
+77
View File
@@ -0,0 +1,77 @@
-- 中国机车图鉴 · 数据底座 schema(SQLite,设计为可移植到 PostgreSQL
-- 对应任务 T-1.1。三层模型:Category -> Model -> Unit,加 Model 关系表。
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS category (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
subcat TEXT DEFAULT '',
slug TEXT,
UNIQUE(name, subcat)
);
CREATE TABLE IF NOT EXISTS model (
id INTEGER PRIMARY KEY,
category_id INTEGER NOT NULL REFERENCES category(id),
series TEXT DEFAULT '',
model_code TEXT NOT NULL,
full_name TEXT DEFAULT '',
aliases TEXT DEFAULT '',
manufacturer TEXT DEFAULT '',
country TEXT DEFAULT '中国',
country_type TEXT DEFAULT '国产', -- 国产/进口/引进仿制/中外合资/未知
first_year INTEGER,
last_year INTEGER,
status TEXT DEFAULT '未知',
usage TEXT DEFAULT '',
production_count TEXT DEFAULT '',
axle_arrangement TEXT DEFAULT '',
drive TEXT DEFAULT '',
efficiency TEXT DEFAULT '',
-- 数值 + 单位(拆分存储)
length_value REAL, length_unit TEXT,
width_value REAL, width_unit TEXT,
height_value REAL, height_unit TEXT,
wheelbase_value REAL, wheelbase_unit TEXT,
weight_value REAL, weight_unit TEXT,
axle_load_value REAL, axle_load_unit TEXT,
load_value REAL, load_unit TEXT,
tractive_start_value REAL, tractive_start_unit TEXT,
tractive_cont_value REAL, tractive_cont_unit TEXT,
power_kw_value REAL, power_kw_unit TEXT,
max_speed_value REAL, max_speed_unit TEXT,
capacity_value REAL, capacity_unit TEXT,
raw_json TEXT, -- 原始清洗后所有列,保真不丢字段
source_sheet TEXT
);
CREATE INDEX IF NOT EXISTS idx_model_category ON model(category_id);
CREATE INDEX IF NOT EXISTS idx_model_code ON model(model_code);
CREATE INDEX IF NOT EXISTS idx_model_first_year ON model(first_year);
CREATE TABLE IF NOT EXISTS unit (
id INTEGER PRIMARY KEY,
category_id INTEGER NOT NULL REFERENCES category(id),
model_id INTEGER REFERENCES model(id),
car_number TEXT,
model_name TEXT DEFAULT '',
function TEXT DEFAULT '',
depot TEXT DEFAULT '',
livery TEXT DEFAULT '',
status TEXT DEFAULT '未知',
location TEXT DEFAULT '',
note TEXT DEFAULT '',
raw_json TEXT,
source_sheet TEXT
);
CREATE INDEX IF NOT EXISTS idx_unit_category ON unit(category_id);
CREATE INDEX IF NOT EXISTS idx_unit_car_number ON unit(car_number);
-- 车型族谱关系(父型号/衍生/国外原型),本期预留,后续填充
CREATE TABLE IF NOT EXISTS model_relation (
id INTEGER PRIMARY KEY,
from_model_id INTEGER NOT NULL REFERENCES model(id),
to_model_id INTEGER NOT NULL REFERENCES model(id),
rel_type TEXT NOT NULL, -- parent / derivative / prototype
UNIQUE(from_model_id, to_model_id, rel_type)
);
View File
Binary file not shown.
Binary file not shown.
+109
View File
@@ -0,0 +1,109 @@
"""清洗规则单元测试 — 对应 T-1.2 UT。"""
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from app.etl import clean as cl
class TestCleanCell(unittest.TestCase):
def test_strip_newlines_and_spaces(self):
self.assertEqual(cl.clean_cell("\n\n\n"), "东风")
self.assertEqual(cl.clean_cell(" HXD1 型 "), "HXD1 型")
def test_none(self):
self.assertEqual(cl.clean_cell(None), "")
def test_is_empty(self):
for v in ["", "-", "——", "", "/", None, ""]:
self.assertTrue(cl.is_empty(v), v)
self.assertFalse(cl.is_empty("现役"))
class TestParseValueUnit(unittest.TestCase):
def test_plain_number(self):
v, u, _ = cl.parse_value_unit("126.0", "t")
self.assertEqual(v, 126.0)
self.assertEqual(u, "t")
def test_with_unit_in_text(self):
v, u, _ = cl.parse_value_unit("400km/h(试验)")
self.assertEqual(v, 400.0)
self.assertEqual(u, "km/h")
def test_speed_with_default_unit(self):
v, u, _ = cl.parse_value_unit("160km/h", "km/h")
self.assertEqual(v, 160.0)
self.assertEqual(u, "km/h")
def test_axle_load_with_paren(self):
v, u, _ = cl.parse_value_unit("23(25)", "t")
self.assertEqual(v, 23.0)
self.assertEqual(u, "t")
def test_composite_takes_first_number(self):
v, u, _ = cl.parse_value_unit("2×92(100)", "t")
self.assertEqual(v, 2.0) # 取第一个数值,原文保真在 raw_json
def test_empty_markers(self):
for raw in ["——", "-", "", "/"]:
v, u, txt = cl.parse_value_unit(raw, "t")
self.assertIsNone(v)
self.assertEqual(u, "")
def test_no_number(self):
v, u, txt = cl.parse_value_unit("交-直-交传动", "")
self.assertIsNone(v)
self.assertEqual(txt, "交-直-交传动")
class TestParseYear(unittest.TestCase):
def test_year_with_char(self):
self.assertEqual(cl.parse_year("1971 年"), 1971)
def test_year_datetime(self):
self.assertEqual(cl.parse_year("2007-12-22 00:00:00"), 2007)
def test_year_plain(self):
self.assertEqual(cl.parse_year("2006"), 2006)
def test_year_dashes(self):
self.assertIsNone(cl.parse_year("——"))
self.assertIsNone(cl.parse_year(""))
class TestNormalizeStatus(unittest.TestCase):
def test_mapping(self):
self.assertEqual(cl.normalize_status("半封存"), "半封存")
self.assertEqual(cl.normalize_status("封存"), "封存")
self.assertEqual(cl.normalize_status("已淘汰"), "退役")
self.assertEqual(cl.normalize_status("样车"), "试验")
self.assertEqual(cl.normalize_status(""), "未知")
class TestInferCountryType(unittest.TestCase):
def test_default_domestic(self):
self.assertEqual(cl.infer_country_type("大连机车车辆厂"), "国产")
def test_import(self):
self.assertEqual(cl.infer_country_type("苏联引进"), "引进仿制")
self.assertEqual(cl.infer_country_type("日本制造"), "进口")
def test_joint(self):
self.assertEqual(cl.infer_country_type("中外合资生产"), "中外合资")
class TestForwardFill(unittest.TestCase):
def test_fill(self):
self.assertEqual(
cl.forward_fill(["东风", "", "", "韶山", ""]),
["东风", "东风", "东风", "韶山", "韶山"])
def test_leading_empty(self):
self.assertEqual(cl.forward_fill(["", "A"]), ["", "A"])
if __name__ == "__main__":
unittest.main()
+44
View File
@@ -0,0 +1,44 @@
"""字段字典/映射单元测试 — 对应 T-1.1 UT。"""
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from app.etl import field_dict as fd
class TestHeaderMapping(unittest.TestCase):
def test_basic(self):
self.assertEqual(fd.map_header("型号"), "model_code")
self.assertEqual(fd.map_header("车号"), "car_number")
self.assertEqual(fd.map_header("系列"), "series")
def test_with_spaces_and_newlines(self):
self.assertEqual(fd.map_header("车体长度 /mm"), "length")
self.assertEqual(fd.map_header("牵引力 (起动)/kN"), "tractive_start")
self.assertEqual(fd.map_header("最高运营时速"), "max_speed")
def test_unknown(self):
self.assertIsNone(fd.map_header("不存在的列"))
def test_normalize(self):
self.assertEqual(fd.normalize_header("车体长度 /mm"), "车体长度/mm")
self.assertEqual(fd.normalize_header("\n"), "东风")
class TestConfig(unittest.TestCase):
def test_all_12_sheets_configured(self):
self.assertEqual(len(fd.CATEGORY_CONFIG), 12)
def test_grain_values(self):
for cfg in fd.CATEGORY_CONFIG.values():
self.assertIn(cfg["grain"], ("model", "unit"))
def test_numeric_fields_have_synonyms(self):
for field in fd.NUMERIC_UNIT_FIELDS:
self.assertIn(field, fd.FIELD_SYNONYMS, field)
if __name__ == "__main__":
unittest.main()
+64
View File
@@ -0,0 +1,64 @@
"""导入管线集成/幂等性测试 — 对应 T-1.2 UT/E2E。"""
import os
import sqlite3
import sys
import tempfile
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from app.etl import importer
class TestHeaderDetection(unittest.TestCase):
def test_find_header_row_skips_title(self):
rows = [["全国电力机车型号表"], [], ["型号", "首产时间", "停产时间", "生产商"]]
self.assertEqual(importer.find_header_row(rows), 2)
def test_build_column_map(self):
header = ["系列", "型号", "首产时间", "未知列X"]
cmap = importer.build_column_map(header)
self.assertEqual(cmap[0], "series")
self.assertEqual(cmap[1], "model_code")
self.assertEqual(cmap[2], "first_year")
self.assertTrue(cmap[3].startswith("raw::"))
class TestFullImport(unittest.TestCase):
"""跑真实 12 表导入,断言结果与幂等性。"""
def setUp(self):
self.tmp = tempfile.mkdtemp()
self.db = os.path.join(self.tmp, "t.db")
def test_import_and_idempotent(self):
if not os.path.isdir(importer.CSV_DIR):
self.skipTest("CSV 目录不存在")
r1 = importer.import_all(db_path=self.db)
self.assertGreater(r1["models"], 0)
self.assertGreater(r1["units"], 0)
conn = sqlite3.connect(self.db)
n1 = conn.execute("SELECT COUNT(*) FROM model").fetchone()[0]
conn.close()
# 再次导入:应重建而非翻倍(幂等)
importer.import_all(db_path=self.db)
conn = sqlite3.connect(self.db)
n2 = conn.execute("SELECT COUNT(*) FROM model").fetchone()[0]
conn.close()
self.assertEqual(n1, n2)
def test_numeric_split_persisted(self):
if not os.path.isdir(importer.CSV_DIR):
self.skipTest("CSV 目录不存在")
importer.import_all(db_path=self.db)
conn = sqlite3.connect(self.db)
row = conn.execute(
"SELECT max_speed_value, max_speed_unit FROM model "
"WHERE max_speed_value IS NOT NULL LIMIT 1").fetchone()
conn.close()
self.assertIsNotNone(row)
self.assertIsInstance(row[0], float)
if __name__ == "__main__":
unittest.main()