Delete tests directory

This commit is contained in:
Genz
2026-03-30 11:05:12 +08:00
committed by GitHub
parent 03acb4484d
commit 671a40eece
8 changed files with 0 additions and 794 deletions

View File

@@ -1,60 +0,0 @@
from pathlib import Path
import sys
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from src.api import routes
from src.core.auth import verify_api_key_flexible
class FakeGenerationHandler:
def __init__(self):
self.calls = []
self.file_cache = None
self.non_stream_chunks = []
self.stream_chunks = []
async def handle_generation(self, model, prompt, images=None, stream=False):
self.calls.append(
{
"model": model,
"prompt": prompt,
"images": images,
"stream": stream,
}
)
chunks = self.stream_chunks if stream else self.non_stream_chunks
for chunk in chunks:
yield chunk
@pytest.fixture
def fake_handler():
return FakeGenerationHandler()
@pytest.fixture
def fastapi_app(fake_handler):
app = FastAPI()
app.include_router(routes.router)
async def fake_auth():
return "test-api-key"
app.dependency_overrides[verify_api_key_flexible] = fake_auth
routes.set_generation_handler(fake_handler)
yield app
app.dependency_overrides.clear()
routes.set_generation_handler(None)
@pytest.fixture
def client(fastapi_app):
with TestClient(fastapi_app) as test_client:
yield test_client

View File

@@ -1,150 +0,0 @@
import asyncio
import json
from src.api import admin as admin_module
from src.api import routes
from src.core.auth import AuthManager, verify_api_key_flexible
def build_openai_completion(content: str) -> str:
return json.dumps(
{
"id": "chatcmpl-test",
"object": "chat.completion",
"created": 1,
"model": "flow2api",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": content,
},
"finish_reason": "stop",
}
],
}
)
def test_openai_route_resolves_alias_and_returns_non_stream_result(client, fake_handler):
fake_handler.non_stream_chunks = [build_openai_completion("![Generated Image](https://example.com/out.png)")]
response = client.post(
"/v1/chat/completions",
json={
"model": "gemini-3.0-pro-image",
"messages": [{"role": "user", "content": "draw a sunset"}],
"generationConfig": {
"imageConfig": {
"aspectRatio": "16:9",
"imageSize": "2K",
}
},
},
)
assert response.status_code == 200
assert fake_handler.calls[0]["model"] == "gemini-3.0-pro-image-landscape-2k"
assert response.json()["choices"][0]["message"]["content"].startswith("![Generated Image]")
def test_openai_route_returns_handler_error_status(client, fake_handler):
fake_handler.non_stream_chunks = [
json.dumps(
{
"error": {
"message": "没有可用的Token进行图片生成",
"status_code": 503,
}
}
)
]
response = client.post(
"/v1/chat/completions",
json={
"model": "gemini-3.0-pro-image",
"messages": [{"role": "user", "content": "draw a tree"}],
},
)
assert response.status_code == 503
assert response.json()["error"]["message"] == "没有可用的Token进行图片生成"
def test_flexible_auth_accepts_x_goog_api_key(monkeypatch):
monkeypatch.setattr(AuthManager, "verify_api_key", staticmethod(lambda api_key: api_key == "secret"))
assert asyncio.run(
verify_api_key_flexible(
credentials=None,
x_goog_api_key="secret",
key=None,
)
) == "secret"
def test_admin_remote_browser_helper_uses_httpx(monkeypatch):
calls = []
class FakeResponse:
status_code = 200
text = '{"success": true, "token": "abc"}'
def json(self):
return {"success": True, "token": "abc"}
class FakeAsyncClient:
def __init__(self, **kwargs):
calls.append({"client_kwargs": kwargs})
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def request(self, method, url, **kwargs):
calls.append({
"method": method,
"url": url,
"kwargs": kwargs,
})
return FakeResponse()
monkeypatch.setattr(admin_module.httpx, "AsyncClient", FakeAsyncClient)
status_code, payload, response_text = asyncio.run(
admin_module._sync_json_http_request(
method="POST",
url="https://example.com/api/v1/custom-score",
headers={"Authorization": "Bearer token"},
payload={"website_url": "https://example.com"},
timeout=15,
)
)
assert status_code == 200
assert payload == {"success": True, "token": "abc"}
assert response_text == '{"success": true, "token": "abc"}'
assert calls == [
{
"client_kwargs": {
"follow_redirects": True,
},
},
{
"method": "POST",
"url": "https://example.com/api/v1/custom-score",
"kwargs": {
"headers": {
"Authorization": "Bearer token",
"Accept": "application/json",
"Content-Type": "application/json; charset=utf-8",
},
"timeout": 15,
"json": {"website_url": "https://example.com"},
},
}
]

View File

@@ -1,31 +0,0 @@
import asyncio
import os
import time
from src.services.file_cache import FileCache
def test_cleanup_keeps_files_when_timeout_is_zero(tmp_path):
cache = FileCache(cache_dir=str(tmp_path), default_timeout=0)
cached_file = tmp_path / "expired.jpg"
cached_file.write_bytes(b"cached")
expired_at = time.time() - 3600
os.utime(cached_file, (expired_at, expired_at))
asyncio.run(cache._cleanup_expired_files())
assert cached_file.exists()
def test_cleanup_removes_files_when_timeout_is_positive(tmp_path):
cache = FileCache(cache_dir=str(tmp_path), default_timeout=1)
cached_file = tmp_path / "expired.jpg"
cached_file.write_bytes(b"cached")
expired_at = time.time() - 3600
os.utime(cached_file, (expired_at, expired_at))
asyncio.run(cache._cleanup_expired_files())
assert not cached_file.exists()

View File

@@ -1,56 +0,0 @@
import asyncio
import pytest
from src.services import flow_client as flow_client_module
from src.services.flow_client import FlowClient
def test_create_project_retries_timeout_then_succeeds(monkeypatch):
client = FlowClient(proxy_manager=None)
attempts = []
sleep_calls = []
async def fake_make_request(**kwargs):
attempts.append(kwargs["timeout"])
if len(attempts) < 3:
raise Exception("Flow API request failed: curl: (28) Connection timed out after 5013 milliseconds")
return {
"result": {
"data": {
"json": {
"result": {
"projectId": "project-123",
}
}
}
}
}
async def fake_sleep(seconds):
sleep_calls.append(seconds)
monkeypatch.setattr(client, "_make_request", fake_make_request)
monkeypatch.setattr(flow_client_module.asyncio, "sleep", fake_sleep)
project_id = asyncio.run(client.create_project("st-token", "Retry Test"))
assert project_id == "project-123"
assert attempts == [15, 15, 15]
assert sleep_calls == [1, 1]
def test_create_project_invalid_response_fails_fast(monkeypatch):
client = FlowClient(proxy_manager=None)
attempts = []
async def fake_make_request(**kwargs):
attempts.append(kwargs["timeout"])
return {"result": {"data": {"json": {"result": {}}}}}
monkeypatch.setattr(client, "_make_request", fake_make_request)
with pytest.raises(Exception, match="missing projectId"):
asyncio.run(client.create_project("st-token", "Invalid Response"))
assert attempts == [15]

View File

@@ -1,114 +0,0 @@
import asyncio
from src.services import flow_client as flow_client_module
from src.services.flow_client import FlowClient
def test_control_plane_timeout_is_capped():
client = FlowClient(None)
client.timeout = 120
assert client._get_control_plane_timeout() == 10
client.timeout = 8
assert client._get_control_plane_timeout() == 8
client.timeout = 3
assert client._get_control_plane_timeout() == 5
def test_control_plane_calls_use_short_timeouts(monkeypatch):
client = FlowClient(None)
client.timeout = 120
calls = []
async def fake_make_request(**kwargs):
calls.append({
"url": kwargs["url"],
"timeout": kwargs.get("timeout"),
})
url = kwargs["url"]
if url.endswith("/auth/session"):
return {"access_token": "at", "user": {"email": "tester@example.com"}}
if url.endswith("/trpc/project.createProject"):
return {"result": {"data": {"json": {"result": {"projectId": "project-123"}}}}}
if url.endswith("/credits"):
return {"credits": 1000, "userPaygateTier": "PAYGATE_TIER_ONE"}
return {}
monkeypatch.setattr(client, "_make_request", fake_make_request)
async def run():
await client.st_to_at("st")
await client.create_project("st", "demo")
await client.delete_project("st", "project-123")
await client.get_credits("at")
asyncio.run(run())
assert [call["timeout"] for call in calls] == [10, 15, 10, 10]
def test_remote_browser_http_helper_uses_httpx(monkeypatch):
calls = []
class FakeResponse:
status_code = 200
text = '{"ok": true}'
def json(self):
return {"ok": True}
class FakeAsyncClient:
def __init__(self, **kwargs):
calls.append({"client_kwargs": kwargs})
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def request(self, method, url, **kwargs):
calls.append({
"method": method,
"url": url,
"kwargs": kwargs,
})
return FakeResponse()
monkeypatch.setattr(flow_client_module.httpx, "AsyncClient", FakeAsyncClient)
status_code, payload, response_text = asyncio.run(
FlowClient._sync_json_http_request(
method="POST",
url="https://example.com/api/v1/solve",
headers={"Authorization": "Bearer token"},
payload={"project_id": "project-123"},
timeout=12,
)
)
assert status_code == 200
assert payload == {"ok": True}
assert response_text == '{"ok": true}'
assert calls == [
{
"client_kwargs": {
"follow_redirects": True,
},
},
{
"method": "POST",
"url": "https://example.com/api/v1/solve",
"kwargs": {
"headers": {
"Authorization": "Bearer token",
"Accept": "application/json",
"Content-Type": "application/json; charset=utf-8",
},
"timeout": 12,
"json": {"project_id": "project-123"},
},
}
]

View File

@@ -1,174 +0,0 @@
import base64
import json
from src.api import routes
def build_openai_completion(content: str) -> str:
return json.dumps(
{
"id": "chatcmpl-test",
"object": "chat.completion",
"created": 1,
"model": "flow2api",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": content,
},
"finish_reason": "stop",
}
],
}
)
def test_generate_content_returns_gemini_response(client, fake_handler, monkeypatch):
fake_handler.non_stream_chunks = [
build_openai_completion("![Generated Image](https://example.com/generated.png)")
]
async def fake_retrieve_image_data(url: str):
return b"\x89PNG\r\n\x1a\nfake"
monkeypatch.setattr(routes, "retrieve_image_data", fake_retrieve_image_data)
response = client.post(
"/v1beta/models/gemini-3.0-pro-image:generateContent",
json={
"contents": [
{
"role": "user",
"parts": [{"text": "draw a mountain"}],
}
],
"generationConfig": {
"imageConfig": {
"aspectRatio": "16:9",
"imageSize": "2K",
}
},
},
)
assert response.status_code == 200
assert fake_handler.calls[0]["model"] == "gemini-3.0-pro-image-landscape-2k"
body = response.json()
assert body["modelVersion"] == "gemini-3.0-pro-image"
part = body["candidates"][0]["content"]["parts"][0]["inlineData"]
assert part["mimeType"] == "image/png"
assert base64.b64decode(part["data"]).startswith(b"\x89PNG")
def test_stream_generate_content_returns_sse_chunks(client, fake_handler, monkeypatch):
fake_handler.stream_chunks = [
'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1,"model":"flow2api","choices":[{"index":0,"delta":{"reasoning_content":"starting generation"},"finish_reason":null}]}\n\n',
'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","created":1,"model":"flow2api","choices":[{"index":0,"delta":{"content":"![Generated Image](https://example.com/final.png)"},"finish_reason":"stop"}]}\n\n',
]
async def fake_retrieve_image_data(url: str):
return b"\x89PNG\r\n\x1a\nstream"
monkeypatch.setattr(routes, "retrieve_image_data", fake_retrieve_image_data)
response = client.post(
"/v1beta/models/gemini-3.0-pro-image:streamGenerateContent?alt=sse",
json={
"contents": [
{
"role": "user",
"parts": [{"text": "draw a city"}],
}
]
},
)
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/event-stream")
data_lines = [
line.removeprefix("data: ")
for line in response.text.splitlines()
if line.startswith("data: ")
]
assert len(data_lines) == 2
first_chunk = json.loads(data_lines[0])
assert first_chunk["modelVersion"] == "gemini-3.0-pro-image"
assert first_chunk["candidates"][0]["content"]["parts"][0]["text"] == "starting generation"
second_chunk = json.loads(data_lines[1])
assert second_chunk["modelVersion"] == "gemini-3.0-pro-image"
image_part = second_chunk["candidates"][0]["content"]["parts"][0]["inlineData"]
assert image_part["mimeType"] == "image/png"
assert second_chunk["candidates"][0]["finishReason"] == "STOP"
def test_models_generate_content_supports_system_instruction_and_file_data(client, fake_handler):
fake_handler.non_stream_chunks = [
build_openai_completion("![Generated Image](https://example.com/generated-square.png)")
]
reference_image = base64.b64encode(b"\x89PNG\r\n\x1a\nref").decode()
response = client.post(
"/models/gemini-3.1-flash-image:generateContent",
json={
"systemInstruction": {
"parts": [{"text": "answer in English"}],
},
"contents": [
{
"role": "user",
"parts": [
{"text": "draw a cat"},
{
"fileData": {
"fileUri": f"data:image/png;base64,{reference_image}",
"mimeType": "image/png",
}
},
],
}
],
"generationConfig": {
"imageConfig": {
"aspectRatio": "1:1",
"imageSize": "1K",
}
},
},
)
assert response.status_code == 200
assert fake_handler.calls[0]["model"] == "gemini-3.1-flash-image-square"
assert response.json()["modelVersion"] == "gemini-3.1-flash-image"
assert fake_handler.calls[0]["prompt"] == "answer in English\n\ndraw a cat"
assert len(fake_handler.calls[0]["images"]) == 1
def test_models_root_lists_gemini_models_for_newapi_compatibility(client):
response = client.get("/models")
assert response.status_code == 200
body = response.json()
names = {item["name"] for item in body["models"]}
assert "models/gemini-3.0-pro-image" in names
assert "models/gemini-3.1-flash-image" in names
assert any(
item["name"] == "models/gemini-3.1-flash-image"
and item["supportedGenerationMethods"] == ["generateContent", "streamGenerateContent"]
for item in body["models"]
)
def test_v1beta_models_root_lists_gemini_models(client):
response = client.get("/v1beta/models")
assert response.status_code == 200
body = response.json()
assert any(item["name"] == "models/gemini-3.0-pro-image" for item in body["models"])

View File

@@ -1,111 +0,0 @@
import asyncio
from types import SimpleNamespace
from src.services.generation_handler import GenerationHandler
class FakeFlowClient:
async def upload_image(self, at, image_bytes, aspect_ratio, project_id=None):
return "media-uploaded"
async def generate_image(
self,
at,
project_id,
prompt,
model_name,
aspect_ratio,
image_inputs=None,
token_id=None,
token_image_concurrency=None,
progress_callback=None,
):
if progress_callback is not None:
await progress_callback("solving_image_captcha", 38)
await progress_callback("submitting_image", 48)
return (
{
"media": [
{
"name": "media-generated",
"image": {
"generatedImage": {
"fifeUrl": "https://example.com/generated.png"
}
},
}
]
},
"session-1",
{"generation_attempts": [{"launch_queue_ms": 0, "launch_stagger_ms": 0}]},
)
class FakeDB:
def __init__(self):
self.status_updates = []
async def update_request_log(self, log_id, **kwargs):
self.status_updates.append(
{
"log_id": log_id,
"status_text": kwargs.get("status_text"),
"progress": kwargs.get("progress"),
}
)
async def _collect(async_gen):
items = []
async for item in async_gen:
items.append(item)
return items
def test_image_generation_progress_switches_from_upload_to_captcha():
db = FakeDB()
handler = GenerationHandler(
flow_client=FakeFlowClient(),
token_manager=None,
load_balancer=None,
db=db,
concurrency_manager=None,
proxy_manager=None,
)
token = SimpleNamespace(
id=1,
at="at-token",
image_concurrency=-1,
user_paygate_tier="PAYGATE_TIER_NOT_PAID",
)
generation_result = handler._create_generation_result()
request_log_state = {"id": 123}
asyncio.run(
_collect(
handler._handle_image_generation(
token=token,
project_id="project-1",
model_config={
"model_name": "NARWHAL",
"aspect_ratio": "IMAGE_ASPECT_RATIO_SQUARE",
},
prompt="draw a cat",
images=[b"fake-image"],
stream=False,
perf_trace={},
generation_result=generation_result,
request_log_state=request_log_state,
pending_token_state={"active": False},
)
)
)
status_texts = [item["status_text"] for item in db.status_updates]
assert status_texts[:4] == [
"uploading_images",
"solving_image_captcha",
"submitting_image",
"image_generated",
]

View File

@@ -1,98 +0,0 @@
import pytest
from src.core.model_resolver import resolve_model_name
from src.core.models import ChatCompletionRequest, ChatMessage
from src.services.generation_handler import MODEL_CONFIG
def build_request(model: str, **kwargs) -> ChatCompletionRequest:
payload = {
"model": model,
"messages": [ChatMessage(role="user", content="draw a cat")],
}
payload.update(kwargs)
return ChatCompletionRequest(**payload)
def test_image_alias_resolves_with_official_generation_config():
request = build_request(
"gemini-3.0-pro-image",
generationConfig={
"imageConfig": {
"aspectRatio": "16:9",
"imageSize": "2K",
}
},
)
assert (
resolve_model_name(request.model, request, MODEL_CONFIG)
== "gemini-3.0-pro-image-landscape-2k"
)
def test_image_alias_treats_1k_as_default_size():
request = build_request(
"gemini-3.1-flash-image",
generationConfig={
"imageConfig": {
"aspectRatio": "1:1",
"imageSize": "1K",
}
},
)
assert (
resolve_model_name(request.model, request, MODEL_CONFIG)
== "gemini-3.1-flash-image-square"
)
def test_generation_config_can_come_from_extra_body():
request = build_request(
"gemini-3.1-flash-image",
extra_body={
"generationConfig": {
"imageConfig": {
"aspectRatio": "4:3",
"imageSize": "4K",
}
}
},
)
assert (
resolve_model_name(request.model, request, MODEL_CONFIG)
== "gemini-3.1-flash-image-four-three-4k"
)
@pytest.mark.parametrize(
("alias", "expected"),
[
("veo_3_1_t2v_fast_ultra", "veo_3_1_t2v_fast_portrait_ultra"),
(
"veo_3_1_t2v_fast_ultra_relaxed",
"veo_3_1_t2v_fast_portrait_ultra_relaxed",
),
("veo_3_1_i2v_s_fast_fl", "veo_3_1_i2v_s_fast_portrait_fl"),
("veo_3_1_i2v_s_fast_ultra_fl", "veo_3_1_i2v_s_fast_portrait_ultra_fl"),
(
"veo_3_1_i2v_s_fast_ultra_relaxed",
"veo_3_1_i2v_s_fast_portrait_ultra_relaxed",
),
("veo_3_1_r2v_fast", "veo_3_1_r2v_fast_portrait"),
("veo_3_1_r2v_fast_ultra", "veo_3_1_r2v_fast_portrait_ultra"),
(
"veo_3_1_r2v_fast_ultra_relaxed",
"veo_3_1_r2v_fast_portrait_ultra_relaxed",
),
],
)
def test_conflicting_video_aliases_resolve_to_portrait(alias, expected):
request = build_request(
alias,
generationConfig={"imageConfig": {"aspectRatio": "9:16"}},
)
assert resolve_model_name(request.model, request, MODEL_CONFIG) == expected