如何制作一个AI Gateway演示示例?

摘要:现在ai盛行,各家api gateway 纷纷实现了ai gateway 的功能,作为强大的卖点 2025 年个人用c#造了个api gateway的轮子VKProxy,现在各项功能或扩展能力齐备,性能也不差,个人时间精力也有限,ai ga
现在ai盛行,各家api gateway 纷纷实现了ai gateway 的功能,作为强大的卖点 2025 年个人用c#造了个api gateway的轮子VKProxy,现在各项功能或扩展能力齐备,性能也不差,个人时间精力也有限,ai gateway 就搞个简单的 demo 做为此轮子篇章的结尾吧 那么AI Gateway有什么作用? AI Gateway(AI 网关)是专门为 AI/LLM 型服务设计的网关层,负责把应用和底层模型提供者(或自建模型)之间的请求做统一管理、路由、识别、控制与观测。它把模型接入、权限、合规、性能优化和成本控制等横切关注点抽象出来,供上层应用以统一、安全和可控的方式调用 AI 功能。 主要作用(要点) 统一接入:屏蔽不同模型/供应商(如 OpenAI、Anthropic、自建模型等)差异,提供统一 API。 路由与模型选择:根据策略(任务类型、成本、延迟、质量)把请求路由到合适模型或并联/串联多个模型。 认证与授权:集中鉴权、API key 管理、细粒度权限控制与多租户隔离。 请求治理:输入校验、敏感信息脱敏、内容审查(安全/合规)、速率限制、配额管理。 成本与流量控制:按模型/客户限额、自动降级到更低成本模型、token 计费监控与预算告警。 性能优化:请求批量/合并、并发控制、缓存常见响应、流式转发、超时与重试策略。 可观测性与审计:统一日志、指标、分布式追踪、行为审计、响应质量监控、训练数据反馈采集。 隐私与数据治理:数据脱敏、数据留存策略、是否回传训练(opt-out)控制、满足合规要求。 流水线与编排:支持 prompt 管理、预处理/后处理、链式调用(chaining)、融合多模型输出(ensembling)。 插件/适配器:接入检索增强生成(RAG)、知识库、数据库、向量搜索等组件。 典型架构组件 前端 API 层:统一 REST / gRPC / WebSocket 接口,处理认证与速率限制。 路由/策略引擎:根据规则选择模型、拆分/合并请求、决定缓存策略。 模型适配器层:与不同模型提供者的 SDK/HTTP 适配器(支持不同协议与速率限制)。 处理流水线:输入校验、脱敏、prompt 模板、后处理、过滤。 缓存与队列:降低重复调用、支持批处理与异步任务。 观测与审计:日志、指标、请求追踪、合规审计存档。 管理面板:配额、策略配置、模型目录、监控告警、权限管理。 常见使用场景 企业多模型策略:按任务切换最优模型(例如摘要使用小模型、法律审查使用高质量模型)。 多租户 SaaS:不同客户隔离、配额与计费。 合规/安全敏感场景:控制数据流向、审计、内容过滤。 成本优化:高峰期自动降级或批处理以减少 token 消耗。 平滑迁移模型供应商:替换底层模型/供应商而不改应用代码。 构建复杂应用流水线:RAG、后处理规则、多模型投票/融合等。 请求在 AI Gateway 中的简化流程(示例) 客户端调用网关 API,携带 token/用户 id。 网关验证权限、检查配额/速率。 输入做脱敏/校验,选择并填充 prompt 模板。 路由引擎决定使用哪个模型(或并行多模型)。 调用模型适配器(支持流式或同步),可能先做缓存查找。 收到模型响应后做后处理(过滤、格式化)、记录日志与成本信息。 返回给客户端并存储审计记录或反馈数据供后续监控/微调。 选择或设计 AI Gateway 时的注意事项 延迟要求:网关本身需尽量低延迟,注意串联多个后处理步骤会增加延时。 可扩展性:设计水平可扩展的路由与限流,防止单点瓶颈。 多协议/流式支持:是否需要支持流式响应、WebSocket、SSE(Server-Sent Events)。 可配置性与策略表达:路由、降级策略、流量分配规则是否够灵活且可实时调整。 安全与合规:数据加密、日志脱敏、数据驻留与擦除策略。 可观测性:记录 token 使用、错误率、延迟、模型质量指标(如生成可信度)。 失败/回退策略:模型不可用时的备用方案、重试策略与幂等性处理。 成本透明度:按模型/请求细粒度计费、告警与报表支持。 何时直接调用模型 vs 使用 AI Gateway 直接调用模型适合:小规模、单一模型的试验或原型开发(更简单、快速)。 使用 AI Gateway 适合:生产环境、多模型、多租户、需要合规/审计、成本或路由策略的场景。 AI Gateway Demo 实现 这里用 VKProxy 作为实现基础, 不过为了演示简单,不做流式响应处理, 所以大家也可以非常简单用 asp.net + client 或 yarp 或任意语言任意框架实现 首先定义 api 作为ai gateway , 我们可以直接统一各个 ai 服务的配置,这样用户无需重复配置各项ai 服务 为了演示简单,这里就演示最简单 统一适配各项ai服务,提供统一api的方式,定义的配置就可以像如下 { "Logging": { "LogLevel": { "Default": "Information" } }, "ServerOptions": { "AddServerHeader": false }, "ReverseProxy": { "ConnectionTimeout": "00:00:01.000", "Listen": { // 服务监听地址 "http": { "Protocols": [ "Http1" ], "Address": [ "127.0.0.1:5001", "[::1]:5001" ] } }, "Routes": { "ai": { // 路由配置,为了简单 我们只做 非流式响应的 chat "Match": { "Hosts": [ "*" ], "Paths": [ "/v1/chat/completions" ] }, "ClusterId": "openai", "Timeout": "00:10:00", "Metadata": { "AiMapping": "{\"openai\":{\"DefaultModel\": \"llama-3.3-70b-versatile\",\"Driver\":\"openai\"},\"deepseek\":{}}" // 这里用最简单的配置 } } }, "Clusters": { // 可复用的ai 服务配置 "deepseek": { "LoadBalancingPolicy": "RoundRobin", "Destinations": [ { "Address": "https://api.deepseek.com" } ] }, "openai": { "LoadBalancingPolicy": "RoundRobin", "Destinations": [ { "Address": "https://api.groq.com/groq" } ] } } } } api 请求格式 现在各家 ai 请求都很统一相似,这里就演示简单的固定结构,这样大家理解也简单,不过实现最好以比较动态的方式实现,这样适配各家差异也比较容易 public class AiRequest { public string? ApiKey { get; set; } // ai 服务key, 这里放请求其实是为了避免个人不小心上传key到 GitHub,至于大家是否需要可自己考虑 public bool? Stream { get; set; } // 是否流式响应,当然这里不会演示,不然就太多了 public string? Provider { get; set; } // 哪家服务 public string? Model { get; set; } // 哪个模型 public List<AiMessage>? Messages { get; set; } // 你想让ai干什么 } public class AiMessage { public string? Role { get; set; } public string? Content { get; set; } } 实现 IAIProvider public interface IAIProvider { Task Request(HttpContext context, AiRequest req, AiMapping ai); } public class OpenAiProvider : IAIProvider { public virtual async Task Request(HttpContext context, AiRequest req, AiMapping ai) { if (req.Model == null) { req.Model = ai.DefaultModel; } context.Request.Headers.Authorization = $"Bearer {req.ApiKey ?? ai.ApiKey}"; // 设置授权,测试的是这家是这样格式,所以为了演示简单,写死了 req.Provider = null; req.ApiKey = null; // 通过json 忽略 多余字段,有些ai 服务商校验比较严格,多一个它没有的字段都不行 var r = new MemoryStream(Encoding.UTF8.GetBytes(System.Text.Json.JsonSerializer.Serialize(req, AiGatewayMiddleware.jsonOptions))); context.Request.Body = r; context.Request.ContentLength = r.Length; // 替换body 和 ContentLength // 产线最好严格处理 Accept / Accept-Encoding 等等影响格式的内容,避免压缩等等各种情况带来的解析问题,这里就不做了 } } 实现 Middleware public class AiGatewayMiddleware : IMiddleware { private readonly ILogger<AiGatewayMiddleware> logger; private readonly IConfigSource<IProxyConfig> configSource; private readonly IServiceProvider provider; private readonly ILoadBalancingPolicyFactory loadBalancing; public static readonly JsonSerializerOptions jsonOptions = new JsonSerializerOptions() { DefaultIgnoreCondition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull, PropertyNameCaseInsensitive = true, PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower }; public AiGatewayMiddleware(ILogger<AiGatewayMiddleware> logger, IConfigSource<IProxyConfig> configSource, IServiceProvider provider, ILoadBalancingPolicyFactory loadBalancing) { this.logger = logger; this.configSource = configSource; this.provider = provider; this.loadBalancing = loadBalancing; } public async Task InvokeAsync(HttpContext context, RequestDelegate next) { var feature = context.Features.Get<IReverseProxyFeature>(); if (feature is not null) { var route = feature.Route; if (route is not null && route.Metadata is not null && route.Metadata.TryGetValue("AiMapping", out var b)) // 这里 VKProxy 已经解析过配置,匹配上对应路由了,只需判断是否开启了ai 功能, 如果你用 asp.net core , 则需要自己处理,当然也可以直接用 controller + httpclient 直接写,虽然少了些优化点,不过ai 场景这点性能不太重要了大多 { var req = await context.Request.ReadFromJsonAsync<AiRequest>(context.RequestAborted); // 读取请求 if (req is null) { context.Response.StatusCode = StatusCodes.Status400BadRequest; return; } var mapping = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, AiMapping>>(b); // 读取配置,这里demo 为了简单就不写什么缓存优化了 IAIProvider driver; if (mapping.TryGetValue(req.Provider ?? "openai", out var ai)) // 获取ai 调用实现 { driver = provider.GetKeyedService<IAIProvider>(ai.Driver); } else driver = null; if (driver is null) { context.Response.StatusCode = StatusCodes.Status400BadRequest; await context.Response.WriteAsJsonAsync(new { error = $"Unsupported AI provider: {req.Provider}" }, context.RequestAborted); return; } if (configSource.CurrentSnapshot.Clusters.TryGetValue(ai.Driver, out var cluster)) // 获取 服务地址 { feature.SelectedDestination = loadBalancing.PickDestination(feature, cluster); // 获取有效地址 if (feature.SelectedDestination is null) { context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await context.Response.WriteAsJsonAsync(new { error = $"{req.Provider} service unavailable" }, context.RequestAborted); return; } } await SendAi(context, req, ai, driver); // 处理请求 // 由于VKProxy 再后续已经实现非常高效的代理处理,这里就简单实现复制内容,这样respone 会第一时间就已经送回的 client端,即使后续我们token 记录出问题也不影响用户, 不过要修改就要麻烦点了,所以不在乎性能的话,client 直接call 是最简单的 var origin = context.Response.Body; using var buf = new ResponseCachingStream(origin, 64 * 1024 * 1024 * 10, 81920, () => ValueTask.CompletedTask); // demo这里直接用内存实现的,产线如果也想这么做,推荐 小的用内存,大的用磁盘,或者直接镜像复制发送给其他程序做分析统计 等等 context.Response.Body = buf; await next(context); // 代理处理 // 结果解析,并记录 token 使用, 这里demo 就简单记录 log 了, 真实场景就可以通过log 解析或者 直接存入统计db 等,这样可以达到ai gateway最重要的功能之一: token 统计,找出账单超支的罪魁祸首 var resp = buf.GetCachedResponseBody(); using var u = new MemoryStream(resp.Segments.SelectMany(i => i).ToArray()); var aiResp = JsonSerializer.Deserialize<AiResponse>(u, jsonOptions); if (aiResp?.Usage != null) { logger.LogWarning("AI TotalTokens: {Usage}", aiResp.Usage.TotalTokens); } context.Response.Body = origin; return; } } await next(context); } private async Task SendAi(HttpContext context, AiRequest req, AiMapping ai, IAIProvider driver) { await driver.Request(context, req, ai); } } 除开 SSE , 基本的实现非常简单,其实这里代理实现VKProxy 已经为了性能引入一些复杂度, 如果不在乎性能,比如用httpclient call,你们的写法还可以更简单,懒得自己写,还可以叫ai 代劳 全部代码也放在了 VKProxy AiGatewayDemo