kilocode2api

继上一篇文章后续

import json
import time
import uuid
import threading
import base64
from typing import Any, AsyncGenerator, Dict, Union, List, Optional

import httpx
import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field

# Configuration
CONVERSATION_CACHE_MAX_SIZE = 100
DEFAULT_REQUEST_TIMEOUT = 30.0

# Global variables
VALID_CLIENT_KEYS: set = set()
JETBRAINS_JWTS: list = []
current_jwt_index: int = 0
jwt_rotation_lock = threading.Lock()
models_data: Dict[str, Any] = {}
http_client: Optional[httpx.AsyncClient] = None


# Pydantic Models
class ChatMessage(BaseModel):
    role: str
    content: str


# 定义内容项的结构
class ContentItem(BaseModel):
    type: str
    text: str
    cache_control: Optional[Dict[str, str]] = None

# 定义聊天消息的结构
class ChatMessage(BaseModel):
    role: str
    content: Union[str, List[ContentItem]]

# 定义流选项的结构
class StreamOptions(BaseModel):
    include_usage: bool = False

# 定义推理选项的结构
class ReasoningOptions(BaseModel):
    max_tokens: int

# 修改后的 ChatCompletionRequest 类
class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[ChatMessage]
    stream: bool = False
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    top_p: Optional[float] = None
    stream_options: Optional[StreamOptions] = None
    transforms: Optional[List[str]] = None
    reasoning: Optional[ReasoningOptions] = None


class ModelInfo(BaseModel):
    id: str
    object: str = "model"
    created: int
    owned_by: str


class ModelList(BaseModel):
    object: str = "list"
    data: List[ModelInfo]


class ChatCompletionChoice(BaseModel):
    message: ChatMessage
    index: int = 0
    finish_reason: str = "stop"


class ChatCompletionResponse(BaseModel):
    id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
    object: str = "chat.completion"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str
    choices: List[ChatCompletionChoice]
    usage: Dict[str, int] = Field(
        default_factory=lambda: {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})


class StreamChoice(BaseModel):
    delta: Dict[str, Any] = Field(default_factory=dict)
    index: int = 0
    finish_reason: Optional[str] = None


class StreamResponse(BaseModel):
    id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
    object: str = "chat.completion.chunk"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str
    choices: List[StreamChoice]


# FastAPI App
app = FastAPI(title="JetBrains AI OpenAI Compatible API")
security = HTTPBearer(auto_error=False)


# Helper functions
def decode_jwt_payload(jwt_token: str) -> Dict[str, Any]:
    """解码JWT载荷(不验证签名)"""
    try:
        # JWT格式: header.payload.signature
        parts = jwt_token.split('.')
        if len(parts) != 3:
            return {}
        
        # 解码payload部分
        payload = parts[1]
        # 添加填充以确保base64解码正确
        payload += '=' * (4 - len(payload) % 4)
        decoded_bytes = base64.urlsafe_b64decode(payload)
        return json.loads(decoded_bytes.decode('utf-8'))
    except Exception as e:
        print(f"解码JWT时出错: {e}")
        return {}

def is_jwt_expired(jwt_token: str) -> bool:
    """检查JWT是否过期"""
    payload = decode_jwt_payload(jwt_token)
    if 'exp' not in payload:
        return True
    
    exp_timestamp = payload['exp']
    current_timestamp = int(time.time())
    return current_timestamp >= exp_timestamp

def load_models():
    """加载模型配置"""
    try:
        with open("models.json", "r", encoding="utf-8") as f:
            model_ids = json.load(f)

        processed_models = []
        if isinstance(model_ids, list):
            for model_id in model_ids:
                if isinstance(model_id, str):
                    processed_models.append({
                        "id": model_id,
                        "object": "model",
                        "created": int(time.time()),
                        "owned_by": "jetbrains-ai"
                    })

        return {"data": processed_models}
    except Exception as e:
        print(f"加载 models.json 时出错: {e}")
        return {"data": []}


def load_client_api_keys():
    """加载客户端 API 密钥"""
    global VALID_CLIENT_KEYS
    try:
        with open("client_api_keys.json", "r", encoding="utf-8") as f:
            keys = json.load(f)
            if not isinstance(keys, list):
                print("警告: client_api_keys.json 应包含密钥列表")
                VALID_CLIENT_KEYS = set()
                return
            VALID_CLIENT_KEYS = set(keys)
            if not VALID_CLIENT_KEYS:
                print("警告: client_api_keys.json 为空")
            else:
                print(f"成功加载 {len(VALID_CLIENT_KEYS)} 个客户端 API 密钥")
    except FileNotFoundError:
        print("错误: 未找到 client_api_keys.json")
        VALID_CLIENT_KEYS = set()
    except Exception as e:
        print(f"加载 client_api_keys.json 时出错: {e}")
        VALID_CLIENT_KEYS = set()


def load_jetbrains_jwts():
    """加载 JetBrains AI 认证 JWT"""
    global JETBRAINS_JWTS
    try:
        with open("kilocode.json", "r", encoding="utf-8") as f:
            # 假设 kilocode.json 包含一个对象列表,每个对象都有 'jwt' 键
            jwt_data = json.load(f)
            if isinstance(jwt_data, list):
                valid_jwts = []
                for i, item in enumerate(jwt_data):
                    if "jwt" in item:
                        jwt_token = item["jwt"]
                        payload = decode_jwt_payload(jwt_token)
                        
                        if payload:
                            exp_timestamp = payload.get('exp', 0)
                            exp_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(exp_timestamp))
                            current_time = int(time.time())
                            
                            print(f"JWT #{i+1}:")
                            print(f"  过期时间: {exp_time}")
                            print(f"  用户ID: {payload.get('kiloUserId', 'unknown')}")
                            
                            if is_jwt_expired(jwt_token):
                                print(f"  状态: ❌ 已过期")
                            else:
                                print(f"  状态: ✅ 有效")
                                valid_jwts.append(jwt_token)
                        else:
                            print(f"JWT #{i+1}: ❌ 无法解码")
                
                JETBRAINS_JWTS = valid_jwts

        if not JETBRAINS_JWTS:
            print("❌ 警告: 没有有效的JWT令牌!所有JWT可能都已过期或无效。")
        else:
            print(f"✅ 成功加载 {len(JETBRAINS_JWTS)} 个有效的 JetBrains AI JWT")

    except FileNotFoundError:
        print("错误: 未找到 kilocode.json 文件")
        JETBRAINS_JWTS = []
    except Exception as e:
        print(f"加载 kilocode.json 时出错: {e}")
        JETBRAINS_JWTS = []


def get_model_item(model_id: str) -> Optional[Dict]:
    """根据模型ID获取模型配置"""
    for model in models_data.get("data", []):
        if model.get("id") == model_id:
            return model
    return None


async def authenticate_client(auth: Optional[HTTPAuthorizationCredentials] = Depends(security)):
    """客户端认证"""
    if not VALID_CLIENT_KEYS:
        raise HTTPException(status_code=503, detail="服务不可用: 未配置客户端 API 密钥")

    if not auth or not auth.credentials:
        raise HTTPException(
            status_code=401,
            detail="需要在 Authorization header 中提供 API 密钥",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if auth.credentials not in VALID_CLIENT_KEYS:
        raise HTTPException(status_code=403, detail="无效的客户端 API 密钥")


def get_next_jetbrains_jwt() -> str:
    """轮询获取下一个 JetBrains JWT"""
    global current_jwt_index

    if not JETBRAINS_JWTS:
        raise HTTPException(
            status_code=503, 
            detail="服务不可用: 未配置有效的JetBrains JWT令牌。请检查kilocode.json文件,确保JWT未过期。"
        )

    with jwt_rotation_lock:
        if not JETBRAINS_JWTS:
            raise HTTPException(
                status_code=503, 
                detail="服务不可用: JetBrains JWT 不可用"
            )
        
        token_to_use = JETBRAINS_JWTS[current_jwt_index]
        
        # 检查即将使用的JWT是否已过期
        if is_jwt_expired(token_to_use):
            print(f"警告: 当前JWT #{current_jwt_index + 1} 已过期,但仍在使用中")
        
        current_jwt_index = (current_jwt_index + 1) % len(JETBRAINS_JWTS)
    
    return token_to_use


# FastAPI 生命周期事件
@app.on_event("startup")
async def startup():
    global models_data, http_client
    models_data = load_models()
    load_client_api_keys()
    load_jetbrains_jwts()
    http_client = httpx.AsyncClient(timeout=None)
    print("JetBrains AI OpenAI Compatible API 服务器已启动")


@app.on_event("shutdown")
async def shutdown():
    global http_client
    if http_client:
        await http_client.aclose()


# API 端点
@app.get("/v1/models", response_model=ModelList)
async def list_models(_: None = Depends(authenticate_client)):
    """列出可用模型"""
    model_list = []
    for model in models_data.get("data", []):
        model_list.append(ModelInfo(
            id=model.get("id", ""),
            created=model.get("created", int(time.time())),
            owned_by=model.get("owned_by", "jetbrains-ai")
        ))
    return ModelList(data=model_list)


async def openai_stream_adapter(
        api_stream_generator: AsyncGenerator[str, None],
        model_name: str
) -> AsyncGenerator[str, None]:
    """将 API 的流转换为 OpenAI 格式的 SSE"""
    stream_id = f"chatcmpl-{uuid.uuid4().hex}"
    first_chunk_sent = False

    try:
        async for line in api_stream_generator:
            if not line or line.strip() == "":
                continue

            # 跳过处理标识行
            if line.startswith(': OPENROUTER PROCESSING'):
                continue

            # 处理结束标志
            if line.strip() == "data: [DONE]":
                yield "data: [DONE]\n\n"
                break

            if line.startswith('data: '):
                try:
                    json_str = line[6:].strip()
                    if not json_str or json_str == "[DONE]":
                        continue

                    data = json.loads(json_str)

                    # 检查是否有choices数组
                    if "choices" not in data or not data["choices"]:
                        continue

                    choice = data["choices"][0]
                    delta = choice.get("delta", {})

                    # 处理内容
                    content = ""
                    if "content" in delta and delta["content"]:
                        content = delta["content"]
                    elif "reasoning" in delta and delta["reasoning"]:
                        # 如果有推理内容,也可以选择包含或跳过
                        # content = delta["reasoning"]  # 如果要显示推理过程
                        continue  # 跳过推理内容,只显示最终回答

                    # 构建响应
                    if content or choice.get("finish_reason"):
                        delta_payload = {}

                        if not first_chunk_sent and content:
                            delta_payload = {"role": "assistant", "content": content}
                            first_chunk_sent = True
                        elif content:
                            delta_payload = {"content": content}

                        # 处理结束标志
                        finish_reason = choice.get("finish_reason")

                        stream_resp = StreamResponse(
                            id=stream_id,
                            model=model_name,
                            choices=[StreamChoice(
                                delta=delta_payload,
                                index=0,
                                finish_reason=finish_reason
                            )]
                        )
                        yield f"data: {stream_resp.model_dump_json()}\n\n"

                        # 如果已结束,跳出循环
                        if finish_reason:
                            break

                except json.JSONDecodeError as je:
                    print(f"警告: 无法解析的 JSON 行: {line}, 错误: {je}")
                    continue
                except Exception as e:
                    print(f"处理数据行时出错: {line}, 错误: {e}")
                    continue

        # 确保发送结束标志
        yield "data: [DONE]\n\n"

    except Exception as e:
        print(f"流式适配器错误: {e}")
        error_resp = StreamResponse(
            id=stream_id,
            model=model_name,
            choices=[StreamChoice(
                delta={"role": "assistant", "content": f"内部错误: {str(e)}"},
                index=0,
                finish_reason="stop"
            )]
        )
        yield f"data: {error_resp.json()}\n\n"
        yield "data: [DONE]\n\n"


async def aggregate_stream_for_non_stream_response(
        openai_sse_stream: AsyncGenerator[str, None],
        model_name: str
) -> ChatCompletionResponse:
    """聚合流式响应为完整响应"""
    content_parts = []

    async for sse_line in openai_sse_stream:
        if sse_line.startswith("data: ") and sse_line.strip() != "data: [DONE]":
            try:
                data = json.loads(sse_line[6:].strip())
                if data.get("choices") and len(data["choices"]) > 0:
                    delta = data["choices"][0].get("delta", {})
                    if "content" in delta:
                        content_parts.append(delta["content"])
            except:
                pass

    full_content = "".join(content_parts)

    return ChatCompletionResponse(
        model=model_name,
        choices=[ChatCompletionChoice(
            message=ChatMessage(role="assistant", content=full_content),
            finish_reason="stop"
        )]
    )


@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatCompletionRequest,
    _: None = Depends(authenticate_client)
):
    """创建聊天完成"""
    model_config = get_model_item(request.model)
    if not model_config:
        raise HTTPException(status_code=404, detail=f"模型 {request.model} 未找到")

    auth_token = get_next_jetbrains_jwt()

    # 将 OpenAI 格式的消息转换为 KiloCode 格式
    processed_messages = []
    for msg in request.messages:
        # 处理复杂消息格式 (包含 type/text 结构)
        if isinstance(msg.content, list):
            processed_content = []
            for content_item in msg.content:
                if isinstance(content_item, dict) and "type" in content_item and "text" in content_item:
                    processed_content.append(content_item)
            processed_messages.append({
                "role": msg.role,
                "content": processed_content
            })
        # 处理简单的字符串消息,转换为指定格式
        else:
            processed_messages.append({
                "role": msg.role,
                "content": [
                    {
                        "type": "text",
                        "text": msg.content
                    }
                ]
            })
    # 创建 API 请求的 payload
    payload = {
        "model": request.model,
        "max_tokens": request.max_tokens or 16384,
        "temperature": request.temperature or 1,
        "messages": processed_messages,
        "stream": request.stream
    }

    headers = {
        "User-Agent": "_n/JS 5.5.1",
        "X-Title": "Kilo Code",
        "X-KiloCode-Version": "4.49.2",
        "Accept": "application/json",
        "Content-Type": "application/json",
        "Accept-Charset": "UTF-8",
        "Cache-Control": "no-cache",
        "HTTP-Referer": 'https://kilocode.ai',
        "authorization": f"Bearer {auth_token}",
    }

    async def api_stream_generator():
        """一个包装 httpx 请求的异步生成器"""
        try:
            print(f"正在请求 kilocode.ai API...")
            print(f"使用的JWT令牌前20个字符: {auth_token[:20]}...")
            print(f"请求模型: {request.model}")
            print(f"请求头: {headers}")
            
            async with http_client.stream("POST", "https://kilocode.ai/api/openrouter/chat/completions",
                                        json=payload, headers=headers, timeout=300) as response:
                print(f"收到响应状态码: {response.status_code}")
                
                if response.status_code == 403:
                    response_text = await response.aread()
                    print(f"403错误响应内容: {response_text.decode()}")
                    raise HTTPException(
                        status_code=403, 
                        detail=f"KiloCode API认证失败 (403): JWT令牌可能无效或已过期。响应: {response_text.decode()[:200]}"
                    )
                elif response.status_code == 401:
                    response_text = await response.aread()
                    print(f"401错误响应内容: {response_text.decode()}")
                    raise HTTPException(
                        status_code=401, 
                        detail=f"KiloCode API认证失败 (401): {response_text.decode()[:200]}"
                    )
                
                response.raise_for_status()
                async for line in response.aiter_lines():
                    yield line
        except HTTPException:
            # 重新抛出已经处理过的HTTP异常
            raise
        except httpx.HTTPStatusError as e:
            print(f"HTTP状态错误: {e.response.status_code}")
            print(f"错误响应: {e.response.text}")
            raise HTTPException(
                status_code=502, 
                detail=f"KiloCode API请求失败 (状态码 {e.response.status_code}): {e.response.text[:200]}"
            )
        except Exception as e:
            print(f"KiloCode API 请求错误: {e}")
            raise HTTPException(status_code=502, detail=f"KiloCode API 请求失败: {str(e)}")

    # 创建 OpenAI 格式的流
    openai_sse_stream = openai_stream_adapter(
        api_stream_generator(),
        request.model
    )

    # 返回流式或非流式响应
    if request.stream:
        return StreamingResponse(
            openai_sse_stream,
            media_type="text/event-stream"
        )
    else:
        return await aggregate_stream_for_non_stream_response(
            openai_sse_stream,
            request.model
        )



# 主程序入口
if __name__ == "__main__":
    import os

    # 创建示例配置文件(如果不存在)
    if not os.path.exists("client_api_keys.json"):
        with open("client_api_keys.json", "w", encoding="utf-8") as f:
            json.dump(["sk-your-custom-key-here"], f, indent=2)
        print("已创建示例 client_api_keys.json 文件")

    if not os.path.exists("kilocode.json"):
        with open("kilocode.json", "w", encoding="utf-8") as f:
            json.dump([{"jwt": "your-jwt-here"}], f, indent=2)
        print("已创建示例 kilocode.json 文件")

    if not os.path.exists("models.json"):
        with open("models.json", "w", encoding="utf-8") as f:
            json.dump(["anthropic-claude-3.5-sonnet"], f, indent=2)
        print("已创建示例 models.json 文件")

    print("正在启动 JetBrains AI OpenAI Compatible API 服务器...")
    print("端点:")
    print("  GET  /v1/models")
    print("  POST /v1/chat/completions")
    print("\n在 Authorization header 中使用客户端 API 密钥 (Bearer sk-xxx)")

    uvicorn.run(app, host="0.0.0.0", port=8001)

apikey 获取连接 页面底部:Kilo Code - Open source AI agent VS Code extension

[
	"google/gemini-2.5-pro-preview",
	"anthropic/claude-sonnet-4",
	"anthropic/claude-3.7-sonnet",
	"anthropic/claude-3.7-sonnet:thinking",
	"openai/gpt-4.1",
	"google/gemini-2.5-flash-lite-preview-06-17",
	"google/gemini-2.5-flash",
	"google/gemini-2.5-pro",
	"openai/o3-pro",
	"x-ai/grok-3-mini",
	"x-ai/grok-3",
	"deepseek/deepseek-r1-0528",
	"anthropic/claude-opus-4",
	"google/gemini-2.5-flash-preview-05-20",
	"google/gemini-2.5-flash-preview-05-20:thinking",
	"google/gemini-2.5-pro-preview-05-06",
	"google/gemini-2.5-flash-preview:thinking",
	"openai/o4-mini-high",
	"openai/o3",
	"openai/o4-mini",
	"openai/gpt-4.1-mini",
	"openai/gpt-4.1-nano",
	"x-ai/grok-3-mini-beta",
	"x-ai/grok-3-beta",
	"deepseek/deepseek-chat-v3-0324",
	"openai/o1-pro",
	"openai/gpt-4.5-preview",
	"google/gemini-2.0-flash-lite-001",
	"anthropic/claude-3.7-sonnet:beta",
	"openai/o3-mini-high",
	"openai/o3-mini",
	"deepseek/deepseek-r1:free",
	"deepseek/deepseek-r1",
	"deepseek/deepseek-chat:free",
	"deepseek/deepseek-chat",
	"openai/o1",
	"openai/gpt-4o-2024-11-20",
	"x-ai/grok-vision-beta",
	"anthropic/claude-3.5-haiku-20241022",
	"anthropic/claude-3.5-haiku",
	"anthropic/claude-3.5-sonnet",
	"x-ai/grok-beta",
	"openai/o1-mini",
	"openai/o1-preview-2024-09-12",
	"openai/o1-mini-2024-09-12",
	"openai/o1-preview",
	"openai/chatgpt-4o-latest",
	"openai/gpt-4o-mini-2024-07-18",
	"openai/gpt-4o-mini",
	"anthropic/claude-3.5-sonnet-20240620",
	"openai/gpt-4o",
	"openai/gpt-4o:extended",
	"openai/gpt-4o-2024-05-13",
	"openai/gpt-4-turbo",
	"google/gemini-pro-1.5",
	"anthropic/claude-3-haiku",
	"anthropic/claude-3-sonnet",
	"anthropic/claude-3-opus",
	"openai/gpt-3.5-turbo-0613",
	"openai/gpt-4-turbo-preview",
	"openai/gpt-4-1106-preview",
	"openai/gpt-3.5-turbo-instruct",
	"openai/gpt-4",
	"openai/gpt-4-0314"
]

models.json

2 个赞

感谢技术大佬

不错不过,主流的几乎都接入了

:xhj003:

支持大佬,666