package handler

import (
	"encoding/json"
	"net/http"
	"strconv"
	"time"

	"github.com/enterprise-ai-platform/server/internal/middleware"
	"github.com/enterprise-ai-platform/server/internal/response"
	"github.com/go-chi/chi/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// AdminHandler serves the administration endpoints. Every handler first
// resolves the organization of the calling user and restricts its queries to
// that organization.
type AdminHandler struct {
	pool *pgxpool.Pool
}

// NewAdminHandler returns an AdminHandler that runs its queries on pool.
func NewAdminHandler(pool *pgxpool.Pool) *AdminHandler {
	return &AdminHandler{pool: pool}
}

// getUserOrgID looks up the organization ID of the currently logged-in user.
// It is the basis of multi-tenant data isolation. A user without an
// organization yields an empty string and a nil error.
func (h *AdminHandler) getUserOrgID(r *http.Request) (string, error) {
	userID := middleware.GetUserID(r.Context())
	var orgID string
	err := h.pool.QueryRow(r.Context(),
		`SELECT COALESCE(org_id::text, '') FROM users WHERE id = $1`, userID).Scan(&orgID)
	return orgID, err
}

// requireOrgID resolves the caller's organization ID. When the lookup fails
// or the user has no organization it writes the error response itself and
// returns false; the caller must then return without writing anything else.
func (h *AdminHandler) requireOrgID(w http.ResponseWriter, r *http.Request) (string, bool) {
	orgID, err := h.getUserOrgID(r)
	if err != nil || orgID == "" {
		response.InternalError(w, "无法确定当前机构")
		return "", false
	}
	return orgID, true
}

// adminPageParam returns the 1-based "page" query parameter. A missing,
// malformed, out-of-range or non-positive value falls back to page 1.
func adminPageParam(r *http.Request) int {
	page, err := strconv.Atoi(r.URL.Query().Get("page"))
	if err != nil || page < 1 {
		return 1
	}
	return page
}

// --- Overview Stats ---

// Overview responds with headline statistics for the caller's organization:
// active users and approved applications, today's distinct active users and
// usage-log count, and this month's token and cost totals. "Today" and "this
// month" start at midnight in the server's local time zone.
func (h *AdminHandler) Overview(w http.ResponseWriter, r *http.Request) {
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	now := time.Now()
	todayStart := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
	monthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, now.Location())

	var (
		totalUsers, totalApps, activeUsers, todayConversations int
		monthlyTokens                                          int64
		monthlyCost                                            float64
	)

	stats := []struct {
		sql  string
		args []any
		dest any
	}{
		{
			`SELECT COUNT(*) FROM users WHERE status = 'active' AND org_id = $1`,
			[]any{orgID}, &totalUsers,
		},
		{
			`SELECT COUNT(*) FROM applications WHERE status = 'approved' AND org_id = $1`,
			[]any{orgID}, &totalApps,
		},
		{
			`SELECT COUNT(DISTINCT l.user_id) FROM app_usage_logs l JOIN users u ON l.user_id = u.id WHERE l.created_at >= $1 AND u.org_id = $2`,
			[]any{todayStart, orgID}, &activeUsers,
		},
		{
			`SELECT COUNT(*) FROM app_usage_logs l JOIN users u ON l.user_id = u.id WHERE l.created_at >= $1 AND u.org_id = $2`,
			[]any{todayStart, orgID}, &todayConversations,
		},
		{
			`SELECT COALESCE(SUM(l.total_tokens), 0) FROM app_usage_logs l JOIN users u ON l.user_id = u.id WHERE l.created_at >= $1 AND u.org_id = $2`,
			[]any{monthStart, orgID}, &monthlyTokens,
		},
		{
			`SELECT COALESCE(SUM(l.estimated_cost), 0) FROM app_usage_logs l JOIN users u ON l.user_id = u.id WHERE l.created_at >= $1 AND u.org_id = $2`,
			[]any{monthStart, orgID}, &monthlyCost,
		},
	}

	ctx := r.Context()
	for _, s := range stats {
		// A failed query must not be reported as a zero statistic.
		if err := h.pool.QueryRow(ctx, s.sql, s.args...).Scan(s.dest); err != nil {
			response.InternalError(w, "查询概览数据失败")
			return
		}
	}

	response.JSON(w, http.StatusOK, map[string]any{
		"total_users":  totalUsers,
		"total_apps":   totalApps,
		"active_users": activeUsers,
		// The key says "total" but the value counts today's usage logs only;
		// the key is kept as-is because it is part of the response contract.
		"total_conversations": todayConversations,
		"monthly_tokens":      monthlyTokens,
		"monthly_cost":        monthlyCost,
	})
}

// --- User Management ---

// ListUsers responds with one page (20 per page, newest first) of the users
// in the caller's organization. Optional query parameters: "page", "q"
// (case-insensitive substring match on name or email), "role" and "status".
func (h *AdminHandler) ListUsers(w http.ResponseWriter, r *http.Request) {
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	const pageSize = 20
	q := r.URL.Query()
	page := adminPageParam(r)
	offset := (page - 1) * pageSize

	query := `SELECT id, name, email, avatar_url, role, status, employee_id, last_login_at, login_count, created_at FROM users WHERE org_id = $1`
	args := []any{orgID}
	// bind appends v to the argument list and returns its "$n" placeholder.
	bind := func(v any) string {
		args = append(args, v)
		return "$" + strconv.Itoa(len(args))
	}

	if search := q.Get("q"); search != "" {
		p := bind(search)
		query += ` AND (name ILIKE '%' || ` + p + ` || '%' OR email ILIKE '%' || ` + p + ` || '%')`
	}
	if role := q.Get("role"); role != "" {
		query += ` AND role = ` + bind(role)
	}
	if status := q.Get("status"); status != "" {
		query += ` AND status = ` + bind(status)
	}
	limitArg := bind(pageSize)
	offsetArg := bind(offset)
	query += ` ORDER BY created_at DESC LIMIT ` + limitArg + ` OFFSET ` + offsetArg

	rows, err := h.pool.Query(r.Context(), query, args...)
	if err != nil {
		response.InternalError(w, "查询用户失败")
		return
	}
	defer rows.Close()

	// Start from a non-nil slice so an empty result encodes as [] rather than null.
	users := make([]map[string]any, 0, pageSize)
	for rows.Next() {
		var (
			id, name, email, role, status string
			avatarURL, employeeID         *string
			lastLoginAt                   *time.Time
			loginCount                    int
			createdAt                     time.Time
		)
		if err := rows.Scan(&id, &name, &email, &avatarURL, &role, &status,
			&employeeID, &lastLoginAt, &loginCount, &createdAt); err != nil {
			response.InternalError(w, "查询用户失败")
			return
		}
		users = append(users, map[string]any{
			"id":            id,
			"name":          name,
			"email":         email,
			"avatar_url":    avatarURL,
			"role":          role,
			"status":        status,
			"employee_id":   employeeID,
			"last_login_at": lastLoginAt,
			"login_count":   loginCount,
			"created_at":    createdAt,
		})
	}
	// Next returning false can also mean the read failed part-way through.
	if err := rows.Err(); err != nil {
		response.InternalError(w, "查询用户失败")
		return
	}

	response.JSON(w, http.StatusOK, map[string]any{"items": users, "page": page})
}

// UpdateUserRole sets the role of the user named by the "id" URL parameter.
// Only users of the caller's organization can be changed, and only a
// super_admin may grant the super_admin role. It responds 404 when no user of
// the organization matches the ID.
func (h *AdminHandler) UpdateUserRole(w http.ResponseWriter, r *http.Request) {
	userID := chi.URLParam(r, "id")
	operatorRole := middleware.GetRole(r.Context())

	var req struct {
		Role string `json:"role"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		response.BadRequest(w, "无效的请求格式")
		return
	}

	switch req.Role {
	case "user", "creator", "admin", "super_admin":
	default:
		response.BadRequest(w, "无效的角色")
		return
	}
	if req.Role == "super_admin" && operatorRole != "super_admin" {
		response.Forbidden(w, "只有超级管理员才能设置超级管理员角色")
		return
	}
	// NOTE(review): a non-super-admin can still change the role of an existing
	// super_admin (i.e. demote them). Confirm whether that is intended.

	// Make sure only users of the caller's own organization can be modified.
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	tag, err := h.pool.Exec(r.Context(),
		`UPDATE users SET role = $2 WHERE id = $1 AND org_id = $3`, userID, req.Role, orgID)
	if err != nil {
		response.InternalError(w, "更新角色失败")
		return
	}
	// Zero rows means the ID is unknown or belongs to another organization;
	// do not report success for an update that did not happen.
	if tag.RowsAffected() == 0 {
		response.NotFound(w, "用户不存在")
		return
	}
	response.JSON(w, http.StatusOK, nil)
}

// UpdateUserStatus sets the status ("active" or "disabled") of the user named
// by the "id" URL parameter. Only users of the caller's organization can be
// changed. It responds 404 when no user of the organization matches the ID.
func (h *AdminHandler) UpdateUserStatus(w http.ResponseWriter, r *http.Request) {
	userID := chi.URLParam(r, "id")

	var req struct {
		Status string `json:"status"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		response.BadRequest(w, "无效的请求格式")
		return
	}
	if req.Status != "active" && req.Status != "disabled" {
		response.BadRequest(w, "无效的状态")
		return
	}

	// Make sure only users of the caller's own organization can be modified.
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	tag, err := h.pool.Exec(r.Context(),
		`UPDATE users SET status = $2 WHERE id = $1 AND org_id = $3`, userID, req.Status, orgID)
	if err != nil {
		response.InternalError(w, "更新状态失败")
		return
	}
	// Zero rows means the ID is unknown or belongs to another organization.
	if tag.RowsAffected() == 0 {
		response.NotFound(w, "用户不存在")
		return
	}
	response.JSON(w, http.StatusOK, nil)
}

// --- App Management ---

// ListAllApps responds with one page (20 per page, newest first) of the
// applications of the caller's organization, whatever their status. Optional
// query parameters: "page" and "status".
func (h *AdminHandler) ListAllApps(w http.ResponseWriter, r *http.Request) {
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	const pageSize = 20
	page := adminPageParam(r)
	offset := (page - 1) * pageSize

	query := `SELECT a.id, a.name, a.slug, a.description, a.icon_url, c.name as category_name, u.name as creator_name, a.dify_app_type, a.status, a.visibility, a.usage_count, a.created_at FROM applications a LEFT JOIN categories c ON a.category_id = c.id LEFT JOIN users u ON a.creator_id = u.id WHERE a.org_id = $1`
	args := []any{orgID}
	if status := r.URL.Query().Get("status"); status != "" {
		args = append(args, status)
		query += ` AND a.status = $` + strconv.Itoa(len(args))
	}
	args = append(args, pageSize, offset)
	query += ` ORDER BY a.created_at DESC LIMIT $` + strconv.Itoa(len(args)-1) +
		` OFFSET $` + strconv.Itoa(len(args))

	rows, err := h.pool.Query(r.Context(), query, args...)
	if err != nil {
		response.InternalError(w, "查询应用失败")
		return
	}
	defer rows.Close()

	// Start from a non-nil slice so an empty result encodes as [] rather than null.
	apps := make([]map[string]any, 0, pageSize)
	for rows.Next() {
		var (
			id, name, slug, status, visibility string
			desc, iconURL, catName, creator    *string
			appType                            *string
			usageCount                         int64
			createdAt                          time.Time
		)
		if err := rows.Scan(&id, &name, &slug, &desc, &iconURL, &catName, &creator,
			&appType, &status, &visibility, &usageCount, &createdAt); err != nil {
			response.InternalError(w, "查询应用失败")
			return
		}
		apps = append(apps, map[string]any{
			"id":            id,
			"name":          name,
			"slug":          slug,
			"description":   desc,
			"icon_url":      iconURL,
			"category_name": catName,
			"creator_name":  creator,
			"dify_app_type": appType,
			"status":        status,
			"visibility":    visibility,
			"usage_count":   usageCount,
			"created_at":    createdAt,
		})
	}
	if err := rows.Err(); err != nil {
		response.InternalError(w, "查询应用失败")
		return
	}

	response.JSON(w, http.StatusOK, map[string]any{"items": apps, "page": page})
}

// --- Audit Logs ---

// ListAuditLogs responds with one page (50 per page, newest first) of the
// audit-log entries whose acting user belongs to the caller's organization.
// Optional query parameter: "page".
func (h *AdminHandler) ListAuditLogs(w http.ResponseWriter, r *http.Request) {
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	page := adminPageParam(r)
	offset := (page - 1) * 50

	// The u.org_id filter drops entries without a matching user, so the
	// LEFT JOIN behaves like an inner join here.
	rows, err := h.pool.Query(r.Context(), `
		SELECT al.id, al.action, al.resource_type, al.resource_id, al.details, al.ip_address, al.created_at,
		       u.name as user_name
		FROM audit_logs al
		LEFT JOIN users u ON al.user_id = u.id
		WHERE u.org_id = $2
		ORDER BY al.created_at DESC
		LIMIT 50 OFFSET $1`, offset, orgID)
	if err != nil {
		response.InternalError(w, "查询审计日志失败")
		return
	}
	defer rows.Close()

	// Start from a non-nil slice so an empty result encodes as [] rather than null.
	logs := []map[string]any{}
	for rows.Next() {
		var (
			id, action, resType string
			resID               *string
			details             json.RawMessage
			ipAddr              *string
			createdAt           time.Time
			userName            *string
		)
		if err := rows.Scan(&id, &action, &resType, &resID, &details, &ipAddr,
			&createdAt, &userName); err != nil {
			response.InternalError(w, "查询审计日志失败")
			return
		}
		logs = append(logs, map[string]any{
			"id":            id,
			"action":        action,
			"resource_type": resType,
			"resource_id":   resID,
			"details":       details,
			"ip_address":    ipAddr,
			"created_at":    createdAt,
			"user_name":     userName,
		})
	}
	if err := rows.Err(); err != nil {
		response.InternalError(w, "查询审计日志失败")
		return
	}

	response.JSON(w, http.StatusOK, map[string]any{"items": logs, "page": page})
}

// --- Usage Analytics ---

// UsageAnalytics responds with per-day usage counts and token totals for the
// caller's organization over the last "days" days (query parameter, 1-90,
// default 7), plus the ten most used applications in the same window. The
// top-apps part is best effort: when it cannot be computed the response
// carries only "daily".
func (h *AdminHandler) UsageAnalytics(w http.ResponseWriter, r *http.Request) {
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	days := 7
	if d, err := strconv.Atoi(r.URL.Query().Get("days")); err == nil && d > 0 && d <= 90 {
		days = d
	}
	window := strconv.Itoa(days) + " days"

	rows, err := h.pool.Query(r.Context(), `
		SELECT DATE(l.created_at) as day, COUNT(*) as count, COALESCE(SUM(l.total_tokens), 0) as tokens
		FROM app_usage_logs l
		JOIN users u ON l.user_id = u.id
		WHERE l.created_at >= NOW() - $1::interval AND u.org_id = $2
		GROUP BY DATE(l.created_at)
		ORDER BY day`, window, orgID)
	if err != nil {
		response.InternalError(w, "查询使用统计失败")
		return
	}
	defer rows.Close()

	// Start from a non-nil slice so an empty result encodes as [] rather than null.
	dailyStats := make([]map[string]any, 0, days)
	for rows.Next() {
		var (
			day    time.Time
			count  int
			tokens int64
		)
		if err := rows.Scan(&day, &count, &tokens); err != nil {
			response.InternalError(w, "查询使用统计失败")
			return
		}
		dailyStats = append(dailyStats, map[string]any{
			"date":         day.Format("2006-01-02"),
			"count":        count,
			"total_tokens": tokens,
		})
	}
	if err := rows.Err(); err != nil {
		response.InternalError(w, "查询使用统计失败")
		return
	}

	topApps, err := h.topApps(r, window, orgID)
	if err != nil {
		// Degrade gracefully: the daily series is still useful on its own.
		response.JSON(w, http.StatusOK, map[string]any{"daily": dailyStats})
		return
	}

	response.JSON(w, http.StatusOK, map[string]any{
		"daily":    dailyStats,
		"top_apps": topApps,
	})
}

// topApps returns up to ten applications of organization orgID ordered by the
// number of usage-log entries within window (an interval string such as
// "7 days"). The returned slice is never nil when err is nil.
func (h *AdminHandler) topApps(r *http.Request, window, orgID string) ([]map[string]any, error) {
	rows, err := h.pool.Query(r.Context(), `
		SELECT a.name, COUNT(l.id) as usage_count
		FROM app_usage_logs l
		JOIN applications a ON l.app_id = a.id
		WHERE l.created_at >= NOW() - $1::interval AND a.org_id = $2
		GROUP BY a.name
		ORDER BY usage_count DESC
		LIMIT 10`, window, orgID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	apps := make([]map[string]any, 0, 10)
	for rows.Next() {
		var (
			name  string
			count int
		)
		if err := rows.Scan(&name, &count); err != nil {
			return nil, err
		}
		apps = append(apps, map[string]any{"name": name, "count": count})
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return apps, nil
}

// --- Review Management ---

// ListPendingReviews responds with up to 50 pending reviews (oldest
// submission first) for applications of the caller's organization. The
// response body is a bare JSON array.
func (h *AdminHandler) ListPendingReviews(w http.ResponseWriter, r *http.Request) {
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	rows, err := h.pool.Query(r.Context(), `
		SELECT r.id, r.app_id, r.version, r.submit_comment, r.submitted_at,
		       a.name as app_name, a.description as app_description, a.icon_url,
		       u.name as submitter_name
		FROM app_reviews r
		JOIN applications a ON r.app_id = a.id
		JOIN users u ON r.submitter_id = u.id
		WHERE r.status = 'pending' AND a.org_id = $1
		ORDER BY r.submitted_at ASC
		LIMIT 50`, orgID)
	if err != nil {
		response.InternalError(w, "查询审核列表失败")
		return
	}
	defer rows.Close()

	// Start from a non-nil slice so an empty result encodes as [] rather than null.
	reviews := []map[string]any{}
	for rows.Next() {
		var (
			id, appID, version string
			comment            *string
			submittedAt        time.Time
			appName, appDesc   *string
			appIcon            *string
			submitterName      string
		)
		if err := rows.Scan(&id, &appID, &version, &comment, &submittedAt,
			&appName, &appDesc, &appIcon, &submitterName); err != nil {
			response.InternalError(w, "查询审核列表失败")
			return
		}
		reviews = append(reviews, map[string]any{
			"id":              id,
			"app_id":          appID,
			"version":         version,
			"submit_comment":  comment,
			"submitted_at":    submittedAt,
			"app_name":        appName,
			"app_description": appDesc,
			"app_icon":        appIcon,
			"submitter_name":  submitterName,
		})
	}
	if err := rows.Err(); err != nil {
		response.InternalError(w, "查询审核列表失败")
		return
	}

	response.JSON(w, http.StatusOK, reviews)
}

// ApproveReview approves the pending review named by the "id" URL parameter
// and marks its application as approved and published. The JSON body may
// carry an optional "comment".
func (h *AdminHandler) ApproveReview(w http.ResponseWriter, r *http.Request) {
	var req struct {
		Comment string `json:"comment"`
	}
	// The comment is optional, so an empty or malformed body simply means
	// "no comment" rather than a bad request.
	_ = json.NewDecoder(r.Body).Decode(&req)

	h.decideReview(w, r, req.Comment,
		`UPDATE app_reviews SET status = 'approved', reviewer_id = $2, review_comment = $3, reviewed_at = NOW() WHERE id = $1`,
		`UPDATE applications SET status = 'approved', published_at = NOW() WHERE id = $1`,
		"审核通过失败")
}

// RejectReview rejects the pending review named by the "id" URL parameter and
// marks its application as rejected. The JSON body must carry a non-empty
// "comment" giving the reason.
func (h *AdminHandler) RejectReview(w http.ResponseWriter, r *http.Request) {
	var req struct {
		Comment string `json:"comment"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Comment == "" {
		response.BadRequest(w, "驳回必须填写原因")
		return
	}

	h.decideReview(w, r, req.Comment,
		`UPDATE app_reviews SET status = 'rejected', reviewer_id = $2, review_comment = $3, reviewed_at = NOW() WHERE id = $1`,
		`UPDATE applications SET status = 'rejected' WHERE id = $1`,
		"驳回失败")
}

// decideReview records a decision on the pending review named by the "id" URL
// parameter and updates the reviewed application, all in one transaction.
//
// reviewSQL is executed with (reviewID, reviewerID, comment) and appSQL with
// (appID). failMsg is the error message used when either statement or the
// commit fails. The review must be pending and belong to an application of
// the caller's organization; otherwise the response is 404.
func (h *AdminHandler) decideReview(w http.ResponseWriter, r *http.Request, comment, reviewSQL, appSQL, failMsg string) {
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	ctx := r.Context()
	reviewID := chi.URLParam(r, "id")
	reviewerID := middleware.GetUserID(ctx)

	tx, err := h.pool.Begin(ctx)
	if err != nil {
		response.InternalError(w, "事务开始失败")
		return
	}
	// Rolling back after a successful Commit is harmless, so this covers
	// every early return below.
	defer tx.Rollback(ctx)

	// Lock the review row inside the transaction so two concurrent decisions
	// cannot both see it as pending, and scope the lookup to the caller's
	// organization so reviews of other tenants are invisible.
	var appID string
	err = tx.QueryRow(ctx, `
		SELECT r.app_id
		FROM app_reviews r
		JOIN applications a ON a.id = r.app_id
		WHERE r.id = $1 AND r.status = 'pending' AND a.org_id = $2
		FOR UPDATE OF r`, reviewID, orgID).Scan(&appID)
	if err != nil {
		response.NotFound(w, "审核记录不存在或已处理")
		return
	}

	if _, err := tx.Exec(ctx, reviewSQL, reviewID, reviewerID, comment); err != nil {
		response.InternalError(w, failMsg)
		return
	}
	if _, err := tx.Exec(ctx, appSQL, appID); err != nil {
		response.InternalError(w, failMsg)
		return
	}
	if err := tx.Commit(ctx); err != nil {
		response.InternalError(w, failMsg)
		return
	}
	response.JSON(w, http.StatusOK, nil)
}

// DelistApp archives the approved application named by the "id" URL
// parameter. The application must belong to the caller's organization (404
// otherwise) and currently be approved (400 otherwise).
func (h *AdminHandler) DelistApp(w http.ResponseWriter, r *http.Request) {
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	ctx := r.Context()
	appID := chi.URLParam(r, "id")

	var status string
	err := h.pool.QueryRow(ctx,
		`SELECT status FROM applications WHERE id = $1 AND org_id = $2`, appID, orgID).Scan(&status)
	if err != nil {
		response.NotFound(w, "应用不存在")
		return
	}
	if status != "approved" {
		response.BadRequest(w, "只有已上架的应用可以撤架")
		return
	}

	// Repeat the organization and status conditions in the UPDATE so a
	// concurrent status change between the check and the write cannot slip
	// through.
	tag, err := h.pool.Exec(ctx,
		`UPDATE applications SET status = 'archived' WHERE id = $1 AND org_id = $2 AND status = 'approved'`,
		appID, orgID)
	if err != nil {
		response.InternalError(w, "撤架失败")
		return
	}
	if tag.RowsAffected() == 0 {
		response.BadRequest(w, "只有已上架的应用可以撤架")
		return
	}
	response.JSON(w, http.StatusOK, map[string]string{"message": "已撤架"})
}

// RelistApp re-publishes the archived application named by the "id" URL
// parameter and refreshes its published_at timestamp. The application must
// belong to the caller's organization (404 otherwise) and currently be
// archived (400 otherwise).
func (h *AdminHandler) RelistApp(w http.ResponseWriter, r *http.Request) {
	orgID, ok := h.requireOrgID(w, r)
	if !ok {
		return
	}

	ctx := r.Context()
	appID := chi.URLParam(r, "id")

	var status string
	err := h.pool.QueryRow(ctx,
		`SELECT status FROM applications WHERE id = $1 AND org_id = $2`, appID, orgID).Scan(&status)
	if err != nil {
		response.NotFound(w, "应用不存在")
		return
	}
	if status != "archived" {
		response.BadRequest(w, "只有已归档的应用可以重新上架")
		return
	}

	// Repeat the organization and status conditions in the UPDATE so a
	// concurrent status change between the check and the write cannot slip
	// through.
	tag, err := h.pool.Exec(ctx,
		`UPDATE applications SET status = 'approved', published_at = NOW() WHERE id = $1 AND org_id = $2 AND status = 'archived'`,
		appID, orgID)
	if err != nil {
		response.InternalError(w, "重新上架失败")
		return
	}
	if tag.RowsAffected() == 0 {
		response.BadRequest(w, "只有已归档的应用可以重新上架")
		return
	}
	response.JSON(w, http.StatusOK, map[string]string{"message": "已重新上架"})
}