mirror of
https://github.com/rustfs/rustfs.git
synced 2026-07-02 07:14:44 +08:00
400 lines
15 KiB
Python
400 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""Machine-readable Iceberg engine compatibility helpers for RustFS S3 Tables."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from collections import OrderedDict
|
|
from io import StringIO
|
|
from typing import Any
|
|
|
|
# Per-vendor templates for Spark Iceberg REST catalog settings, keyed by the
# profile name accepted by ``--profile``.
#
# Every value is a ``str.format`` template; ``vendor_profile_value`` renders it
# against the mapping built by ``vendor_profile_context``.  A profile that omits
# "s3_endpoint" or "s3_path_style_access" makes ``spark_vendor_catalog_config``
# remove the matching Spark keys instead of setting them.
VENDOR_SPARK_PROFILES: dict[str, dict[str, str]] = {
    "rustfs": dict(
        catalog_uri="{endpoint}/iceberg",
        warehouse="{warehouse}",
        rest_signing_name="s3",
        s3_endpoint="{endpoint}",
        s3_path_style_access="true",
    ),
    "rustfs-compat": dict(
        catalog_uri="{endpoint}/_iceberg",
        warehouse="{warehouse}",
        rest_signing_name="s3tables",
        s3_endpoint="{endpoint}",
        s3_path_style_access="true",
    ),
    "aws-s3tables": dict(
        catalog_uri="https://s3tables.{region}.amazonaws.com/iceberg",
        warehouse="arn:aws:s3tables:{region}:{account_id}:bucket/{table_bucket}",
        rest_signing_name="s3tables",
    ),
    "minio-aistor": dict(
        catalog_uri="{endpoint}/_iceberg",
        warehouse="{warehouse}",
        rest_signing_name="s3tables",
        s3_endpoint="{endpoint}",
        s3_path_style_access="true",
    ),
    "cloudflare-r2-data-catalog": dict(
        catalog_uri="{catalog_uri}",
        warehouse="{warehouse_name}",
        rest_signing_name="s3",
    ),
    "oss-tables": dict(
        catalog_uri="{endpoint}/iceberg",
        warehouse="{warehouse}",
        rest_signing_name="s3",
    ),
}
|
|
|
|
|
|
def scenario(name: str, status: str, evidence: str) -> dict[str, str]:
    """Build one compatibility-scenario record.

    Args:
        name: Scenario identifier, e.g. ``"create-table"``.
        status: Verification level recorded for the scenario.
        evidence: Short description of what backs the status.

    Returns:
        A new dict with the keys ``name``, ``status`` and ``evidence``.
    """
    record = dict(name=name, status=status, evidence=evidence)
    return record
|
|
|
|
|
|
def engine_compatibility_matrix() -> list[dict[str, Any]]:
    """Return the engine compatibility matrix as JSON-serialisable data.

    Returns:
        One dict per client, in a fixed order, each with the keys ``client``,
        ``status``, ``entrypoint`` and ``scenarios``; ``scenarios`` is a list
        of records produced by ``scenario``.  A fresh structure is built on
        every call.
    """
    readme = "scripts/table-catalog/README.md"

    def engine(client: str, status: str, entrypoint: str, *scenarios: dict[str, str]) -> dict[str, Any]:
        # Assemble one matrix row from its header fields and scenario records.
        return {
            "client": client,
            "status": status,
            "entrypoint": entrypoint,
            "scenarios": list(scenarios),
        }

    pyiceberg = engine(
        "PyIceberg",
        "automated-smoke",
        "scripts/table-catalog/pyiceberg_smoke.py",
        scenario("create-namespace", "automated", "PyIceberg catalog.create_namespace_if_not_exists/create_namespace"),
        scenario("create-table", "automated", "PyIceberg catalog.create_table"),
        scenario("append", "automated", "PyIceberg table.append with PyArrow rows"),
        scenario("reload-table", "automated", "PyIceberg catalog.load_table after append"),
        scenario("scan", "automated", "PyIceberg table.scan().to_arrow"),
        scenario("drop-table", "automated-with-cleanup", "PyIceberg catalog.drop_table when --cleanup or --replace is set"),
        scenario("commit-conflict", "direct-rest-probe-required", "catalog commit conflict remains a follow-up live probe"),
    )
    spark = engine(
        "Spark Iceberg REST catalog",
        "generated-smoke-harness",
        "scripts/table-catalog/engine_compatibility.py --print-spark-sql",
        scenario("create-namespace", "generated-spark-sql", "CREATE NAMESPACE IF NOT EXISTS"),
        scenario("create-table", "generated-spark-sql", "CREATE TABLE USING iceberg"),
        scenario("append", "generated-spark-sql", "INSERT INTO"),
        scenario("reload-table", "generated-spark-sql", "REFRESH TABLE and SELECT COUNT"),
        scenario("drop-table", "generated-spark-sql", "DROP TABLE and optional DROP NAMESPACE"),
        scenario("commit-conflict", "manual-validation-required", "requires a two-writer Spark or REST conflict harness"),
    )
    trino = engine(
        "Trino Iceberg REST catalog",
        "documented-read-path",
        readme,
        scenario("catalog-load", "manual-validation-required", "REST catalog configuration reference"),
        scenario("read-table", "manual-validation-required", "SELECT from a table created by PyIceberg or Spark"),
        scenario("write-table", "not-claimed", "Trino write compatibility is not claimed by this harness"),
    )
    duckdb = engine(
        "DuckDB Iceberg",
        "documented-read-path",
        readme,
        scenario("catalog-load", "manual-validation-required", "REST catalog extension/configuration reference"),
        scenario("read-table", "manual-validation-required", "read-path verification only"),
        scenario("write-table", "not-claimed", "DuckDB write/commit compatibility is not claimed"),
    )
    starrocks = engine(
        "StarRocks Iceberg REST catalog",
        "documented-read-path",
        readme,
        scenario("catalog-load", "manual-validation-required", "REST catalog configuration reference"),
        scenario("read-table", "manual-validation-required", "external catalog read-path verification only"),
        scenario("write-table", "not-claimed", "StarRocks write/commit compatibility is not claimed"),
    )
    snowflake = engine(
        "Snowflake Open Catalog / Iceberg integrations",
        "reference-only",
        readme,
        scenario("catalog-load", "not-claimed", "reference only until a repeatable external integration harness exists"),
    )
    databend = engine(
        "Databend",
        "s3-data-plane-reference",
        readme,
        scenario("s3-data-plane-read", "manual-validation-required", "S3 stage/data-plane reference only"),
        scenario("iceberg-rest-catalog", "not-claimed", "Databend REST catalog integration is not claimed"),
    )
    return [pyiceberg, spark, trino, duckdb, starrocks, snowflake, databend]
|
|
|
|
|
|
def normalized_endpoint(endpoint: str) -> str:
    """Return *endpoint* with every trailing ``/`` removed.

    Callers append paths that begin with ``/``, so dropping the trailing
    slashes here avoids doubled separators in the joined URL.
    """
    without_trailing_slash = endpoint.rstrip("/")
    return without_trailing_slash
|
|
|
|
|
|
def normalized_rest_path(rest_path: str) -> str:
    """Normalise a REST catalog path to ``/segment[/segment...]`` form.

    Surrounding whitespace and trailing slashes are removed and a single
    leading ``/`` is added when missing, so the result can be appended
    directly to an endpoint that has no trailing slash.

    Args:
        rest_path: Path of the REST catalog relative to the endpoint, with or
            without a leading or trailing slash.

    Returns:
        The normalised path; it always starts with ``/`` and never ends
        with ``/``.

    Raises:
        ValueError: If the path is empty once whitespace and trailing slashes
            are removed (for example ``""``, ``"  "`` or ``"/"``).
    """
    # Strip trailing slashes *before* the emptiness check: a slash-only input
    # such as "/" would otherwise pass the check and then collapse to "",
    # returning exactly the empty path this function is meant to reject.
    trimmed = rest_path.strip().rstrip("/")
    if not trimmed:
        raise ValueError("REST catalog path cannot be empty")
    if not trimmed.startswith("/"):
        trimmed = f"/{trimmed}"
    return trimmed
|
|
|
|
|
|
def vendor_profile_context(
    *,
    endpoint: str,
    warehouse: str,
    region: str,
    account_id: str,
    table_bucket: str,
    catalog_uri: str | None,
    warehouse_name: str | None,
) -> dict[str, str]:
    """Build the substitution mapping used to render vendor profile templates.

    Args:
        endpoint: Service endpoint; trailing slashes are removed.
        warehouse: Warehouse identifier, passed through unchanged.
        region: Region, passed through unchanged.
        account_id: Account id, passed through unchanged.
        table_bucket: Table bucket name, passed through unchanged.
        catalog_uri: Explicit catalog URI; when falsy, ``<endpoint>/iceberg``
            is used.  Trailing slashes are removed either way.
        warehouse_name: Explicit warehouse name; when falsy, ``warehouse`` is
            used.

    Returns:
        A dict with the keys ``account_id``, ``catalog_uri``, ``endpoint``,
        ``region``, ``table_bucket``, ``warehouse`` and ``warehouse_name``.
    """
    base = normalized_endpoint(endpoint)
    resolved_catalog_uri = catalog_uri if catalog_uri else f"{base}/iceberg"
    resolved_warehouse_name = warehouse_name if warehouse_name else warehouse
    return dict(
        account_id=account_id,
        catalog_uri=resolved_catalog_uri.rstrip("/"),
        endpoint=base,
        region=region,
        table_bucket=table_bucket,
        warehouse=warehouse,
        warehouse_name=resolved_warehouse_name,
    )
|
|
|
|
|
|
def vendor_profile_value(profile: str, key: str, context: dict[str, str]) -> str:
    """Render one field of a vendor profile against *context*.

    Args:
        profile: Name of an entry in ``VENDOR_SPARK_PROFILES``.
        key: Field name inside that profile.
        context: Placeholder values for ``str.format``.

    Returns:
        The formatted template with trailing ``/`` characters removed.

    Raises:
        ValueError: If the profile or the field does not exist.
    """
    try:
        template = VENDOR_SPARK_PROFILES[profile][key]
    except KeyError as err:
        raise ValueError(f"unknown vendor profile field: {profile}.{key}") from err
    else:
        rendered = template.format(**context)
        return rendered.rstrip("/")
|
|
|
|
|
|
def spark_catalog_config(
    *,
    endpoint: str,
    warehouse: str,
    access_key: str,
    secret_key: str,
    region: str,
    catalog_name: str,
    rest_path: str,
    rest_signing_name: str,
) -> OrderedDict[str, str]:
    """Build the Spark properties for an Iceberg REST catalog.

    All keys live under ``spark.sql.catalog.<catalog_name>``.  The catalog URI
    is the normalised endpoint followed by the normalised REST path, and the
    same normalised endpoint is used as the S3 endpoint.

    Returns:
        An ``OrderedDict`` of property name to value, in a fixed order.

    Raises:
        ValueError: Propagated from ``normalized_rest_path`` for an invalid
            ``rest_path``.
    """
    base = normalized_endpoint(endpoint)
    path = normalized_rest_path(rest_path)
    prefix = f"spark.sql.catalog.{catalog_name}"

    settings: OrderedDict[str, str] = OrderedDict()
    # The bare prefix key selects the catalog implementation class.
    settings[prefix] = "org.apache.iceberg.spark.SparkCatalog"
    suffixed = (
        ("type", "rest"),
        ("uri", f"{base}{path}"),
        ("warehouse", warehouse),
        ("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
        ("s3.endpoint", base),
        ("s3.path-style-access", "true"),
        ("rest.sigv4-enabled", "true"),
        ("rest.signing-name", rest_signing_name),
        ("rest.signing-region", region),
        ("s3.access-key-id", access_key),
        ("s3.secret-access-key", secret_key),
    )
    for suffix, value in suffixed:
        settings[f"{prefix}.{suffix}"] = value
    return settings
|
|
|
|
|
|
def spark_vendor_catalog_config(
    *,
    profile: str,
    endpoint: str,
    warehouse: str,
    access_key: str,
    secret_key: str,
    region: str,
    catalog_name: str,
    account_id: str,
    table_bucket: str,
    catalog_uri: str | None,
    warehouse_name: str | None,
    rest_path: str | None = None,
    rest_signing_name: str | None = None,
) -> OrderedDict[str, str]:
    """Build Spark catalog properties adjusted for a vendor profile.

    Starts from ``spark_catalog_config`` and then applies the profile:

    * the catalog URI comes from the profile template, unless ``rest_path`` is
      given, in which case it is ``<endpoint><rest_path>``;
    * the warehouse comes from the profile template;
    * the signing name is ``rest_signing_name`` when truthy, otherwise the
      profile default;
    * profiles without ``s3_endpoint`` lose the S3 endpoint and both S3
      credential keys; profiles without ``s3_path_style_access`` lose the
      path-style key.

    Raises:
        ValueError: If ``profile`` is not in ``VENDOR_SPARK_PROFILES``, or if
            ``rest_path`` is rejected by ``normalized_rest_path``.
    """
    context = vendor_profile_context(
        endpoint=endpoint,
        warehouse=warehouse,
        region=region,
        account_id=account_id,
        table_bucket=table_bucket,
        catalog_uri=catalog_uri,
        warehouse_name=warehouse_name,
    )

    profile_defaults = VENDOR_SPARK_PROFILES.get(profile)
    if profile_defaults is None:
        raise ValueError(f"unknown vendor profile: {profile}")

    # An explicit REST path takes precedence over the profile's URI template.
    effective_uri = vendor_profile_value(profile, "catalog_uri", context)
    if rest_path is not None:
        effective_uri = normalized_endpoint(endpoint) + normalized_rest_path(rest_path)

    signing_name = rest_signing_name or profile_defaults["rest_signing_name"]
    config = spark_catalog_config(
        endpoint=endpoint,
        warehouse=vendor_profile_value(profile, "warehouse", context),
        access_key=access_key,
        secret_key=secret_key,
        region=region,
        catalog_name=catalog_name,
        # Placeholder only: the URI built from it is overwritten just below.
        rest_path="/iceberg",
        rest_signing_name=signing_name,
    )

    prefix = f"spark.sql.catalog.{catalog_name}"
    config[f"{prefix}.uri"] = effective_uri

    if "s3_endpoint" not in profile_defaults:
        for suffix in ("s3.endpoint", "s3.access-key-id", "s3.secret-access-key"):
            config.pop(f"{prefix}.{suffix}", None)
    else:
        config[f"{prefix}.s3.endpoint"] = vendor_profile_value(profile, "s3_endpoint", context)

    path_style_key = f"{prefix}.s3.path-style-access"
    if "s3_path_style_access" not in profile_defaults:
        config.pop(path_style_key, None)
    else:
        config[path_style_key] = profile_defaults["s3_path_style_access"]

    return config
|
|
|
|
|
|
def quote_spark_identifier(identifier: str) -> str:
    """Wrap *identifier* in backticks for use in Spark SQL.

    Identifiers are rejected rather than escaped when they could break out of
    the quoting.

    Raises:
        ValueError: If the identifier is empty or contains a backtick, a line
            feed or a carriage return.
    """
    forbidden = ("`", "\n", "\r")
    if not identifier or any(char in identifier for char in forbidden):
        raise ValueError("Spark identifier must be non-empty and must not contain backticks or newlines")
    return "`" + identifier + "`"
|
|
|
|
|
|
def spark_table_identifier(catalog_name: str, namespace: str, table: str) -> str:
    """Return ``<catalog>.`<namespace>`.`<table>``` for Spark SQL.

    The namespace and table are backtick-quoted through
    ``quote_spark_identifier``; the catalog name is inserted as given.

    Raises:
        ValueError: Propagated from ``quote_spark_identifier`` for an invalid
            namespace or table.
    """
    quoted_namespace = quote_spark_identifier(namespace)
    quoted_table = quote_spark_identifier(table)
    return ".".join((catalog_name, quoted_namespace, quoted_table))
|
|
|
|
|
|
def spark_sql_smoke(
    *,
    catalog_name: str,
    namespace: str,
    table: str,
    cleanup: bool = False,
) -> str:
    """Generate the Spark SQL smoke script for one table.

    The script creates the namespace, drops and recreates the table, inserts
    two rows, refreshes the table and counts the rows.  With ``cleanup`` it
    also drops the table and the namespace at the end.

    Returns:
        The statements, one per line, each terminated by ``;`` and a newline.

    Raises:
        ValueError: Propagated from ``quote_spark_identifier`` for an invalid
            namespace or table.
    """
    ns_ref = f"{catalog_name}.{quote_spark_identifier(namespace)}"
    table_ref = spark_table_identifier(catalog_name, namespace, table)

    drop_table = f"DROP TABLE IF EXISTS {table_ref};"
    script = [
        f"CREATE NAMESPACE IF NOT EXISTS {ns_ref};",
        # Start from a clean slate so the script can be rerun.
        drop_table,
        f"CREATE TABLE {table_ref} (id BIGINT, payload STRING) USING iceberg;",
        f"INSERT INTO {table_ref} VALUES (1, 'alpha'), (2, 'beta');",
        f"REFRESH TABLE {table_ref};",
        f"SELECT COUNT(*) AS row_count FROM {table_ref};",
    ]
    if cleanup:
        script.append(drop_table)
        script.append(f"DROP NAMESPACE IF EXISTS {ns_ref};")

    return "".join(f"{statement}\n" for statement in script)
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the compatibility helper.

    Args:
        argv: Argument list to parse; ``None`` is passed straight to
            ``ArgumentParser.parse_args``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description="Print RustFS S3 Tables Iceberg engine compatibility helpers.")

    # (flag, add_argument keyword arguments), in registration order.
    store_true = {"action": "store_true"}
    options = (
        ("--endpoint", {"default": "http://127.0.0.1:9000"}),
        ("--access-key", {"default": "rustfsadmin"}),
        ("--secret-key", {"default": "rustfsadmin"}),
        ("--region", {"default": "us-east-1"}),
        ("--warehouse", {"default": "rustfs-s3table-smoke"}),
        ("--profile", {"choices": sorted(VENDOR_SPARK_PROFILES), "default": "rustfs"}),
        ("--account-id", {"default": "000000000000"}),
        ("--table-bucket", {"default": "rustfs-s3table-smoke"}),
        ("--catalog-uri", {}),
        ("--warehouse-name", {}),
        ("--namespace", {"default": "smoke"}),
        ("--table", {"default": "events"}),
        ("--catalog-name", {"default": "rustfs"}),
        ("--rest-path", {}),
        ("--rest-signing-name", {}),
        ("--cleanup", store_true),
        ("--print-engine-matrix", store_true),
        ("--print-spark-config", store_true),
        ("--print-spark-sql", store_true),
    )
    for flag, kwargs in options:
        parser.add_argument(flag, **kwargs)

    return parser.parse_args(argv)
|
|
|
|
|
|
def print_json(document: Any, output: StringIO | None = None) -> None:
    """Serialise *document* as indented, key-sorted JSON and emit it.

    Args:
        document: Any value accepted by ``json.dumps``.
        output: Buffer to write to; when ``None`` the text is printed instead.
            A trailing newline follows the JSON in both cases.
    """
    rendered = json.dumps(document, indent=2, sort_keys=True)
    if output is not None:
        output.write(f"{rendered}\n")
        return
    print(rendered)
|
|
|
|
|
|
def cli_json(argv: list[str]) -> str:
    """Run the CLI with *argv* and return its output as a string.

    The arguments are parsed with ``parse_args`` and executed with ``run``
    against an in-memory buffer instead of standard output.
    """
    buffer = StringIO()
    parsed = parse_args(argv)
    run(parsed, buffer)
    return buffer.getvalue()
|
|
|
|
|
|
def run(args: argparse.Namespace, output: StringIO | None = None) -> None:
    """Emit the documents selected by the parsed CLI flags.

    Output order is fixed: engine matrix, then Spark config, then Spark SQL.
    When none of the three ``--print-*`` flags is set, the engine matrix is
    emitted.

    Args:
        args: Namespace produced by ``parse_args``.
        output: Buffer to write to; when ``None`` everything goes to standard
            output via ``print``.
    """
    nothing_requested = not (args.print_engine_matrix or args.print_spark_config or args.print_spark_sql)

    # The matrix doubles as the default output when no flag was given.
    if args.print_engine_matrix or nothing_requested:
        print_json({"engine_compatibility": engine_compatibility_matrix()}, output)

    if args.print_spark_config:
        # Each listed attribute of ``args`` is forwarded under the same
        # keyword name.
        forwarded = (
            "profile",
            "endpoint",
            "warehouse",
            "access_key",
            "secret_key",
            "region",
            "catalog_name",
            "account_id",
            "table_bucket",
            "catalog_uri",
            "warehouse_name",
            "rest_path",
            "rest_signing_name",
        )
        spark_kwargs = {name: getattr(args, name) for name in forwarded}
        print_json({"spark_config": spark_vendor_catalog_config(**spark_kwargs)}, output)

    if args.print_spark_sql:
        sql = spark_sql_smoke(
            catalog_name=args.catalog_name,
            namespace=args.namespace,
            table=args.table,
            cleanup=args.cleanup,
        )
        # The script already ends with a newline, so none is added here.
        if output is not None:
            output.write(sql)
        else:
            print(sql, end="")
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: parse the arguments and print the result."""
    cli_args = parse_args()
    run(cli_args)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|