diff --git a/Cargo.lock b/Cargo.lock index 2d4e3adf8..0a8cfcf48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4956,6 +4956,21 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "kafka-protocol" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66292444a1cd4d430d450d472c30cba839d0724229aba2d79affffcf901516e2" +dependencies = [ + "anyhow", + "bytes", + "crc", + "crc32c", + "indexmap 2.14.0", + "paste", + "uuid", +] + [[package]] name = "konst" version = "0.2.20" @@ -6435,9 +6450,9 @@ dependencies = [ [[package]] name = "pbkdf2" -version = "0.13.0-rc.10" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f24f3eb2f4471b1730d59e4b730b747939960a8c7eb0c33c5a9076f2d3dddea" +checksum = "112d82ceb8c5bf524d9af484d4e4970c9fd5a0cc15ba14ad93dccd28873b0629" dependencies = [ "digest 0.11.2", "hmac 0.13.0", @@ -8052,7 +8067,7 @@ dependencies = [ "cfg-if", "chacha20poly1305", "jsonwebtoken", - "pbkdf2 0.13.0-rc.10", + "pbkdf2 0.13.0", "rand 0.10.1", "serde_json", "sha2 0.11.0", @@ -8267,6 +8282,54 @@ dependencies = [ "tracing", ] +[[package]] +name = "rustfs-kafka" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eee0644a99743fb2f51db7fbae1a6ca2d85f064daf7595784eaa52834a68c96" +dependencies = [ + "base64 0.22.1", + "bytes", + "fnv", + "hmac 0.13.0", + "indexmap 2.14.0", + "kafka-protocol", + "metrics", + "pbkdf2 0.13.0", + "rand 0.10.1", + "rustls", + "rustls-native-certs", + "sha2 0.11.0", + "socket2", + "thiserror 2.0.18", + "tracing", + "twox-hash", + "webpki-roots 1.0.7", +] + +[[package]] +name = "rustfs-kafka-async" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd1997c3116cb94ede80d9a0b828f46dd27386cc93825fce141a92eb3aa9630" +dependencies = [ + "base64 0.22.1", + "bytes", + "hmac 0.13.0", + "kafka-protocol", + "metrics", + "pbkdf2 0.13.0", + "rand 0.10.1", + "rustfs-kafka", + "rustls", + "rustls-native-certs", + "sha2 0.11.0", + "tokio", + "tokio-rustls", + "tracing", + "webpki-roots 1.0.7", +] + [[package]] name = "rustfs-keystone" version = "0.0.5" @@ -8684,6 +8747,7 @@ dependencies = [ "rumqttc-next", "rustfs-config", "rustfs-ecstore", + "rustfs-kafka-async", "rustfs-s3-common", "rustfs-utils", "rustls", @@ -8856,9 +8920,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.38" +version = "0.23.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f9466fb2c14ea04357e91413efb882e2a6d4a406e625449bc0a5d360d53a21" +checksum = "7c2c118cb077cca2822033836dfb1b975355dfb784b5e8da48f7b6c5db74e60e" dependencies = [ "aws-lc-rs", "log", diff --git a/Cargo.toml b/Cargo.toml index 524d6b48a..3c5a7d127 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,6 +132,7 @@ http = "1.4.0" http-body = "1.0.1" http-body-util = "0.1.3" reqwest = { version = "0.13.2", default-features = false, features = ["rustls", "charset", "http2", "system-proxy", "stream", "json", "blocking", "query", "form"] } +rustfs-kafka-async = { version = "1.2.0" } socket2 = { version = "0.6.3", features = ["all"] } tokio = { version = "1.52.1", features = ["fs", "rt-multi-thread"] } tokio-rustls = { version = "0.26.4", default-features = false, features = ["logging", "tls12", "aws-lc-rs"] } @@ -169,7 +170,7 @@ jsonwebtoken = { version = "10.3.0", features = ["aws_lc_rs"] } openidconnect = { version = "4.0", default-features = false } pbkdf2 = "0.13.0-rc.10" rsa = { version = "0.10.0-rc.17" } -rustls = { version = "0.23.38", default-features = false, features = ["aws-lc-rs", "logging", "tls12", "prefer-post-quantum", "std"] } +rustls = { version = "0.23.39", default-features = false, features = ["aws-lc-rs", "logging", "tls12", "prefer-post-quantum", "std"] } rustls-pki-types = "1.14.0" sha1 = "0.11.0" sha2 = "0.11.0" diff --git a/crates/audit/src/factory.rs b/crates/audit/src/factory.rs index 8fbb3c570..c27026268 100644 --- a/crates/audit/src/factory.rs +++ b/crates/audit/src/factory.rs @@ -15,13 +15,13 @@ use crate::AuditEntry; use async_trait::async_trait; use rustfs_config::AUDIT_DEFAULT_DIR; -use rustfs_config::audit::{AUDIT_MQTT_KEYS, AUDIT_NATS_KEYS, AUDIT_PULSAR_KEYS, AUDIT_WEBHOOK_KEYS}; +use rustfs_config::audit::{AUDIT_KAFKA_KEYS, AUDIT_MQTT_KEYS, AUDIT_NATS_KEYS, AUDIT_PULSAR_KEYS, AUDIT_WEBHOOK_KEYS}; use rustfs_ecstore::config::KVS; use rustfs_targets::{ Target, config::{ - build_mqtt_args, build_nats_args, build_pulsar_args, build_webhook_args, validate_mqtt_config, validate_nats_config, - validate_pulsar_config, validate_webhook_config, + build_kafka_args, build_mqtt_args, build_nats_args, build_pulsar_args, build_webhook_args, validate_kafka_config, + validate_mqtt_config, validate_nats_config, validate_pulsar_config, validate_webhook_config, }, error::TargetError, target::TargetType, @@ -119,3 +119,22 @@ impl TargetFactory for PulsarTargetFactory { AUDIT_PULSAR_KEYS.iter().map(|s| s.to_string()).collect() } } + +pub struct KafkaTargetFactory; + +#[async_trait] +impl TargetFactory for KafkaTargetFactory { + async fn create_target(&self, id: String, config: &KVS) -> Result + Send + Sync>, TargetError> { + let args = build_kafka_args(config, AUDIT_DEFAULT_DIR, TargetType::AuditLog)?; + let target = rustfs_targets::target::kafka::KafkaTarget::new(id, args)?; + Ok(Box::new(target)) + } + + fn validate_config(&self, _id: &str, config: &KVS) -> Result<(), TargetError> { + validate_kafka_config(config, AUDIT_DEFAULT_DIR) + } + + fn get_valid_fields(&self) -> HashSet { + AUDIT_KAFKA_KEYS.iter().map(|s| s.to_string()).collect() + } +} diff --git a/crates/audit/src/registry.rs b/crates/audit/src/registry.rs index a9822c930..d7c477fa7 100644 --- a/crates/audit/src/registry.rs +++ b/crates/audit/src/registry.rs @@ -14,7 +14,9 @@ use crate::{ AuditEntry, AuditError, AuditResult, - factory::{MQTTTargetFactory, NATSTargetFactory, PulsarTargetFactory, TargetFactory, WebhookTargetFactory}, + factory::{ + KafkaTargetFactory, MQTTTargetFactory, NATSTargetFactory, PulsarTargetFactory, TargetFactory, WebhookTargetFactory, + }, }; use futures::StreamExt; use futures::stream::FuturesUnordered; @@ -53,6 +55,7 @@ impl AuditRegistry { registry.register(ChannelTargetType::Mqtt.as_str(), Box::new(MQTTTargetFactory)); registry.register(ChannelTargetType::Nats.as_str(), Box::new(NATSTargetFactory)); registry.register(ChannelTargetType::Pulsar.as_str(), Box::new(PulsarTargetFactory)); + registry.register(ChannelTargetType::Kafka.as_str(), Box::new(KafkaTargetFactory)); registry } diff --git a/crates/config/src/audit/kafka.rs b/crates/config/src/audit/kafka.rs new file mode 100644 index 000000000..b67d566a2 --- /dev/null +++ b/crates/config/src/audit/kafka.rs @@ -0,0 +1,53 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Kafka Environment Variables +pub const ENV_AUDIT_KAFKA_ENABLE: &str = "RUSTFS_AUDIT_KAFKA_ENABLE"; +pub const ENV_AUDIT_KAFKA_BROKERS: &str = "RUSTFS_AUDIT_KAFKA_BROKERS"; +pub const ENV_AUDIT_KAFKA_TOPIC: &str = "RUSTFS_AUDIT_KAFKA_TOPIC"; +pub const ENV_AUDIT_KAFKA_ACKS: &str = "RUSTFS_AUDIT_KAFKA_ACKS"; +pub const ENV_AUDIT_KAFKA_TLS_ENABLE: &str = "RUSTFS_AUDIT_KAFKA_TLS_ENABLE"; +pub const ENV_AUDIT_KAFKA_TLS_CA: &str = "RUSTFS_AUDIT_KAFKA_TLS_CA"; +pub const ENV_AUDIT_KAFKA_TLS_CLIENT_CERT: &str = "RUSTFS_AUDIT_KAFKA_TLS_CLIENT_CERT"; +pub const ENV_AUDIT_KAFKA_TLS_CLIENT_KEY: &str = "RUSTFS_AUDIT_KAFKA_TLS_CLIENT_KEY"; +pub const ENV_AUDIT_KAFKA_QUEUE_DIR: &str = "RUSTFS_AUDIT_KAFKA_QUEUE_DIR"; +pub const ENV_AUDIT_KAFKA_QUEUE_LIMIT: &str = "RUSTFS_AUDIT_KAFKA_QUEUE_LIMIT"; + +pub const ENV_AUDIT_KAFKA_KEYS: &[&str; 10] = &[ + ENV_AUDIT_KAFKA_ENABLE, + ENV_AUDIT_KAFKA_BROKERS, + ENV_AUDIT_KAFKA_TOPIC, + ENV_AUDIT_KAFKA_ACKS, + ENV_AUDIT_KAFKA_TLS_ENABLE, + ENV_AUDIT_KAFKA_TLS_CA, + ENV_AUDIT_KAFKA_TLS_CLIENT_CERT, + ENV_AUDIT_KAFKA_TLS_CLIENT_KEY, + ENV_AUDIT_KAFKA_QUEUE_DIR, + ENV_AUDIT_KAFKA_QUEUE_LIMIT, +]; + +/// A list of all valid configuration keys for a Kafka audit target. +pub const AUDIT_KAFKA_KEYS: &[&str] = &[ + crate::ENABLE_KEY, + crate::KAFKA_BROKERS, + crate::KAFKA_TOPIC, + crate::KAFKA_ACKS, + crate::KAFKA_TLS_ENABLE, + crate::KAFKA_TLS_CA, + crate::KAFKA_TLS_CLIENT_CERT, + crate::KAFKA_TLS_CLIENT_KEY, + crate::KAFKA_QUEUE_DIR, + crate::KAFKA_QUEUE_LIMIT, + crate::COMMENT_KEY, +]; diff --git a/crates/config/src/audit/mod.rs b/crates/config/src/audit/mod.rs index a752e6c0c..9390ec854 100644 --- a/crates/config/src/audit/mod.rs +++ b/crates/config/src/audit/mod.rs @@ -16,11 +16,13 @@ //! This module defines the configuration for audit systems, including //! webhook and MQTT audit-related settings. +mod kafka; mod mqtt; mod nats; mod pulsar; mod webhook; +pub use kafka::*; pub use mqtt::*; pub use nats::*; pub use pulsar::*; @@ -33,6 +35,7 @@ pub const AUDIT_PREFIX: &str = "audit"; pub const AUDIT_ROUTE_PREFIX: &str = const_str::concat!(AUDIT_PREFIX, DEFAULT_DELIMITER); pub const AUDIT_WEBHOOK_SUB_SYS: &str = "audit_webhook"; +pub const AUDIT_KAFKA_SUB_SYS: &str = "audit_kafka"; pub const AUDIT_MQTT_SUB_SYS: &str = "audit_mqtt"; pub const AUDIT_NATS_SUB_SYS: &str = "audit_nats"; pub const AUDIT_PULSAR_SUB_SYS: &str = "audit_pulsar"; @@ -40,6 +43,7 @@ pub const AUDIT_PULSAR_SUB_SYS: &str = "audit_pulsar"; pub const AUDIT_STORE_EXTENSION: &str = ".audit"; #[allow(dead_code)] pub const AUDIT_SUB_SYSTEMS: &[&str] = &[ + AUDIT_KAFKA_SUB_SYS, AUDIT_MQTT_SUB_SYS, AUDIT_NATS_SUB_SYS, AUDIT_PULSAR_SUB_SYS, diff --git a/crates/config/src/constants/targets.rs b/crates/config/src/constants/targets.rs index 0daedc358..f33a14a08 100644 --- a/crates/config/src/constants/targets.rs +++ b/crates/config/src/constants/targets.rs @@ -40,6 +40,15 @@ pub const MQTT_TLS_CLIENT_CERT: &str = "tls_client_cert"; pub const MQTT_TLS_CLIENT_KEY: &str = "tls_client_key"; pub const MQTT_TLS_TRUST_LEAF_AS_CA: &str = "tls_trust_leaf_as_ca"; pub const MQTT_WS_PATH_ALLOWLIST: &str = "ws_path_allowlist"; +pub const KAFKA_BROKERS: &str = "brokers"; +pub const KAFKA_TOPIC: &str = "topic"; +pub const KAFKA_ACKS: &str = "acks"; +pub const KAFKA_QUEUE_DIR: &str = "queue_dir"; +pub const KAFKA_QUEUE_LIMIT: &str = "queue_limit"; +pub const KAFKA_TLS_ENABLE: &str = "tls_enable"; +pub const KAFKA_TLS_CA: &str = "tls_ca"; +pub const KAFKA_TLS_CLIENT_CERT: &str = "tls_client_cert"; +pub const KAFKA_TLS_CLIENT_KEY: &str = "tls_client_key"; pub const NATS_ADDRESS: &str = "address"; pub const NATS_SUBJECT: &str = "subject"; diff --git a/crates/config/src/notify/kafka.rs b/crates/config/src/notify/kafka.rs new file mode 100644 index 000000000..e8112096e --- /dev/null +++ b/crates/config/src/notify/kafka.rs @@ -0,0 +1,53 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// A list of all valid configuration keys for a Kafka target. +pub const NOTIFY_KAFKA_KEYS: &[&str] = &[ + crate::ENABLE_KEY, + crate::KAFKA_BROKERS, + crate::KAFKA_TOPIC, + crate::KAFKA_ACKS, + crate::KAFKA_TLS_ENABLE, + crate::KAFKA_TLS_CA, + crate::KAFKA_TLS_CLIENT_CERT, + crate::KAFKA_TLS_CLIENT_KEY, + crate::KAFKA_QUEUE_DIR, + crate::KAFKA_QUEUE_LIMIT, + crate::COMMENT_KEY, +]; + +// Kafka Environment Variables +pub const ENV_NOTIFY_KAFKA_ENABLE: &str = "RUSTFS_NOTIFY_KAFKA_ENABLE"; +pub const ENV_NOTIFY_KAFKA_BROKERS: &str = "RUSTFS_NOTIFY_KAFKA_BROKERS"; +pub const ENV_NOTIFY_KAFKA_TOPIC: &str = "RUSTFS_NOTIFY_KAFKA_TOPIC"; +pub const ENV_NOTIFY_KAFKA_ACKS: &str = "RUSTFS_NOTIFY_KAFKA_ACKS"; +pub const ENV_NOTIFY_KAFKA_TLS_ENABLE: &str = "RUSTFS_NOTIFY_KAFKA_TLS_ENABLE"; +pub const ENV_NOTIFY_KAFKA_TLS_CA: &str = "RUSTFS_NOTIFY_KAFKA_TLS_CA"; +pub const ENV_NOTIFY_KAFKA_TLS_CLIENT_CERT: &str = "RUSTFS_NOTIFY_KAFKA_TLS_CLIENT_CERT"; +pub const ENV_NOTIFY_KAFKA_TLS_CLIENT_KEY: &str = "RUSTFS_NOTIFY_KAFKA_TLS_CLIENT_KEY"; +pub const ENV_NOTIFY_KAFKA_QUEUE_DIR: &str = "RUSTFS_NOTIFY_KAFKA_QUEUE_DIR"; +pub const ENV_NOTIFY_KAFKA_QUEUE_LIMIT: &str = "RUSTFS_NOTIFY_KAFKA_QUEUE_LIMIT"; + +pub const ENV_NOTIFY_KAFKA_KEYS: &[&str; 10] = &[ + ENV_NOTIFY_KAFKA_ENABLE, + ENV_NOTIFY_KAFKA_BROKERS, + ENV_NOTIFY_KAFKA_TOPIC, + ENV_NOTIFY_KAFKA_ACKS, + ENV_NOTIFY_KAFKA_TLS_ENABLE, + ENV_NOTIFY_KAFKA_TLS_CA, + ENV_NOTIFY_KAFKA_TLS_CLIENT_CERT, + ENV_NOTIFY_KAFKA_TLS_CLIENT_KEY, + ENV_NOTIFY_KAFKA_QUEUE_DIR, + ENV_NOTIFY_KAFKA_QUEUE_LIMIT, +]; diff --git a/crates/config/src/notify/mod.rs b/crates/config/src/notify/mod.rs index 26a3f1778..7a96dfebb 100644 --- a/crates/config/src/notify/mod.rs +++ b/crates/config/src/notify/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod arn; +mod kafka; mod mqtt; mod nats; mod pulsar; @@ -20,6 +21,7 @@ mod store; mod webhook; pub use arn::*; +pub use kafka::*; pub use mqtt::*; pub use nats::*; pub use pulsar::*; @@ -69,13 +71,13 @@ pub const DEFAULT_NOTIFY_SEND_CONCURRENCY: usize = 64; #[allow(dead_code)] pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[ + NOTIFY_KAFKA_SUB_SYS, NOTIFY_MQTT_SUB_SYS, NOTIFY_NATS_SUB_SYS, NOTIFY_PULSAR_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, ]; -#[allow(dead_code)] pub const NOTIFY_KAFKA_SUB_SYS: &str = "notify_kafka"; pub const NOTIFY_MQTT_SUB_SYS: &str = "notify_mqtt"; #[allow(dead_code)] diff --git a/crates/ecstore/src/config/audit.rs b/crates/ecstore/src/config/audit.rs index de1e86e8d..c55e90700 100644 --- a/crates/ecstore/src/config/audit.rs +++ b/crates/ecstore/src/config/audit.rs @@ -14,15 +14,16 @@ use crate::config::{KV, KVS}; use rustfs_config::{ - COMMENT_KEY, DEFAULT_LIMIT, ENABLE_KEY, EVENT_DEFAULT_DIR, EnableState, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, - MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TLS_CA, MQTT_TLS_CLIENT_CERT, MQTT_TLS_CLIENT_KEY, - MQTT_TLS_POLICY, MQTT_TLS_TRUST_LEAF_AS_CA, MQTT_TOPIC, MQTT_USERNAME, MQTT_WS_PATH_ALLOWLIST, NATS_ADDRESS, - NATS_CREDENTIALS_FILE, NATS_PASSWORD, NATS_QUEUE_DIR, NATS_QUEUE_LIMIT, NATS_SUBJECT, NATS_TLS_CA, NATS_TLS_CLIENT_CERT, - NATS_TLS_CLIENT_KEY, NATS_TLS_REQUIRED, NATS_TOKEN, NATS_USERNAME, PULSAR_AUTH_TOKEN, PULSAR_BROKER, PULSAR_PASSWORD, - PULSAR_QUEUE_DIR, PULSAR_QUEUE_LIMIT, PULSAR_TLS_ALLOW_INSECURE, PULSAR_TLS_CA, PULSAR_TLS_HOSTNAME_VERIFICATION, - PULSAR_TOPIC, PULSAR_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_BATCH_SIZE, WEBHOOK_CLIENT_CA, WEBHOOK_CLIENT_CERT, - WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_HTTP_TIMEOUT, WEBHOOK_MAX_RETRY, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, - WEBHOOK_RETRY_INTERVAL, WEBHOOK_SKIP_TLS_VERIFY, + COMMENT_KEY, DEFAULT_LIMIT, ENABLE_KEY, EVENT_DEFAULT_DIR, EnableState, KAFKA_ACKS, KAFKA_BROKERS, KAFKA_QUEUE_DIR, + KAFKA_QUEUE_LIMIT, KAFKA_TLS_CA, KAFKA_TLS_CLIENT_CERT, KAFKA_TLS_CLIENT_KEY, KAFKA_TLS_ENABLE, KAFKA_TOPIC, MQTT_BROKER, + MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TLS_CA, + MQTT_TLS_CLIENT_CERT, MQTT_TLS_CLIENT_KEY, MQTT_TLS_POLICY, MQTT_TLS_TRUST_LEAF_AS_CA, MQTT_TOPIC, MQTT_USERNAME, + MQTT_WS_PATH_ALLOWLIST, NATS_ADDRESS, NATS_CREDENTIALS_FILE, NATS_PASSWORD, NATS_QUEUE_DIR, NATS_QUEUE_LIMIT, NATS_SUBJECT, + NATS_TLS_CA, NATS_TLS_CLIENT_CERT, NATS_TLS_CLIENT_KEY, NATS_TLS_REQUIRED, NATS_TOKEN, NATS_USERNAME, PULSAR_AUTH_TOKEN, + PULSAR_BROKER, PULSAR_PASSWORD, PULSAR_QUEUE_DIR, PULSAR_QUEUE_LIMIT, PULSAR_TLS_ALLOW_INSECURE, PULSAR_TLS_CA, + PULSAR_TLS_HOSTNAME_VERIFICATION, PULSAR_TOPIC, PULSAR_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_BATCH_SIZE, WEBHOOK_CLIENT_CA, + WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_HTTP_TIMEOUT, WEBHOOK_MAX_RETRY, WEBHOOK_QUEUE_DIR, + WEBHOOK_QUEUE_LIMIT, WEBHOOK_RETRY_INTERVAL, WEBHOOK_SKIP_TLS_VERIFY, }; use std::sync::LazyLock; @@ -337,3 +338,63 @@ pub static DEFAULT_AUDIT_PULSAR_KVS: LazyLock = LazyLock::new(|| { }, ]) }); + +pub static DEFAULT_AUDIT_KAFKA_KVS: LazyLock = LazyLock::new(|| { + KVS(vec![ + KV { + key: ENABLE_KEY.to_owned(), + value: EnableState::Off.to_string(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_BROKERS.to_owned(), + value: "".to_owned(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_TOPIC.to_owned(), + value: "".to_owned(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_ACKS.to_owned(), + value: "1".to_owned(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_TLS_ENABLE.to_owned(), + value: EnableState::Off.to_string(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_TLS_CA.to_owned(), + value: "".to_owned(), + hidden_if_empty: true, + }, + KV { + key: KAFKA_TLS_CLIENT_CERT.to_owned(), + value: "".to_owned(), + hidden_if_empty: true, + }, + KV { + key: KAFKA_TLS_CLIENT_KEY.to_owned(), + value: "".to_owned(), + hidden_if_empty: true, + }, + KV { + key: KAFKA_QUEUE_DIR.to_owned(), + value: EVENT_DEFAULT_DIR.to_owned(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_QUEUE_LIMIT.to_owned(), + value: DEFAULT_LIMIT.to_string(), + hidden_if_empty: false, + }, + KV { + key: COMMENT_KEY.to_owned(), + value: "".to_owned(), + hidden_if_empty: false, + }, + ]) +}); diff --git a/crates/ecstore/src/config/com.rs b/crates/ecstore/src/config/com.rs index cc3c0de25..456a73051 100644 --- a/crates/ecstore/src/config/com.rs +++ b/crates/ecstore/src/config/com.rs @@ -19,12 +19,12 @@ use crate::global::is_first_cluster_node_local; use crate::store_api::{ObjectInfo, ObjectOptions, PutObjReader, StorageAPI}; use http::HeaderMap; use rustfs_config::audit::{ - AUDIT_MQTT_KEYS, AUDIT_MQTT_SUB_SYS, AUDIT_NATS_KEYS, AUDIT_NATS_SUB_SYS, AUDIT_PULSAR_KEYS, AUDIT_PULSAR_SUB_SYS, - AUDIT_WEBHOOK_KEYS, AUDIT_WEBHOOK_SUB_SYS, + AUDIT_KAFKA_KEYS, AUDIT_KAFKA_SUB_SYS, AUDIT_MQTT_KEYS, AUDIT_MQTT_SUB_SYS, AUDIT_NATS_KEYS, AUDIT_NATS_SUB_SYS, + AUDIT_PULSAR_KEYS, AUDIT_PULSAR_SUB_SYS, AUDIT_WEBHOOK_KEYS, AUDIT_WEBHOOK_SUB_SYS, }; use rustfs_config::notify::{ - NOTIFY_MQTT_KEYS, NOTIFY_MQTT_SUB_SYS, NOTIFY_NATS_KEYS, NOTIFY_NATS_SUB_SYS, NOTIFY_PULSAR_KEYS, NOTIFY_PULSAR_SUB_SYS, - NOTIFY_WEBHOOK_KEYS, NOTIFY_WEBHOOK_SUB_SYS, + NOTIFY_KAFKA_KEYS, NOTIFY_KAFKA_SUB_SYS, NOTIFY_MQTT_KEYS, NOTIFY_MQTT_SUB_SYS, NOTIFY_NATS_KEYS, NOTIFY_NATS_SUB_SYS, + NOTIFY_PULSAR_KEYS, NOTIFY_PULSAR_SUB_SYS, NOTIFY_WEBHOOK_KEYS, NOTIFY_WEBHOOK_SUB_SYS, }; use rustfs_config::oidc::{IDENTITY_OPENID_KEYS, IDENTITY_OPENID_SUB_SYS, OIDC_REDIRECT_URI_DYNAMIC}; use rustfs_config::{COMMENT_KEY, DEFAULT_DELIMITER, ENABLE_KEY, EnableState, RUSTFS_REGION}; @@ -58,7 +58,7 @@ struct TargetConfigDescriptor { valid_keys: &'static [&'static str], } -fn notify_target_descriptors() -> [TargetConfigDescriptor; 4] { +fn notify_target_descriptors() -> [TargetConfigDescriptor; 5] { [ TargetConfigDescriptor { external_key: "webhook", @@ -66,6 +66,12 @@ fn notify_target_descriptors() -> [TargetConfigDescriptor; 4] { default_kvs: ¬ify::DEFAULT_NOTIFY_WEBHOOK_KVS, valid_keys: NOTIFY_WEBHOOK_KEYS, }, + TargetConfigDescriptor { + external_key: "kafka", + subsystem_key: NOTIFY_KAFKA_SUB_SYS, + default_kvs: ¬ify::DEFAULT_NOTIFY_KAFKA_KVS, + valid_keys: NOTIFY_KAFKA_KEYS, + }, TargetConfigDescriptor { external_key: "mqtt", subsystem_key: NOTIFY_MQTT_SUB_SYS, @@ -87,7 +93,7 @@ fn notify_target_descriptors() -> [TargetConfigDescriptor; 4] { ] } -fn audit_target_descriptors() -> [TargetConfigDescriptor; 4] { +fn audit_target_descriptors() -> [TargetConfigDescriptor; 5] { [ TargetConfigDescriptor { external_key: "webhook", @@ -95,6 +101,12 @@ fn audit_target_descriptors() -> [TargetConfigDescriptor; 4] { default_kvs: &audit::DEFAULT_AUDIT_WEBHOOK_KVS, valid_keys: AUDIT_WEBHOOK_KEYS, }, + TargetConfigDescriptor { + external_key: "kafka", + subsystem_key: AUDIT_KAFKA_SUB_SYS, + default_kvs: &audit::DEFAULT_AUDIT_KAFKA_KVS, + valid_keys: AUDIT_KAFKA_KEYS, + }, TargetConfigDescriptor { external_key: "mqtt", subsystem_key: AUDIT_MQTT_SUB_SYS, @@ -335,7 +347,7 @@ fn apply_external_oidc_map(cfg: &mut Config, root: &Map) -> bool applied } -fn parse_notify_scalar_value(key: &str, value: &Value) -> Option { +fn parse_target_scalar_value(key: &str, value: &Value) -> Option { match value { Value::String(v) => Some(v.trim().to_string()), Value::Bool(v) if key == ENABLE_KEY || key == rustfs_config::WEBHOOK_SKIP_TLS_VERIFY => Some(if *v { @@ -350,7 +362,7 @@ fn parse_notify_scalar_value(key: &str, value: &Value) -> Option { } } -fn decode_notify_instance_object(instance: &Map, valid_keys: &[&str]) -> KVS { +fn decode_target_instance_object(instance: &Map, valid_keys: &[&str]) -> KVS { let mut kvs = KVS::new(); for (key, value) in instance { @@ -358,7 +370,7 @@ fn decode_notify_instance_object(instance: &Map, valid_keys: &[&s continue; } - if let Some(parsed) = parse_notify_scalar_value(key, value) { + if let Some(parsed) = parse_target_scalar_value(key, value) { kvs.insert(key.clone(), parsed); } } @@ -366,21 +378,21 @@ fn decode_notify_instance_object(instance: &Map, valid_keys: &[&s kvs } -fn decode_notify_instance_value(value: &Value, valid_keys: &[&str]) -> Option { +fn decode_target_instance_value(value: &Value, valid_keys: &[&str]) -> Option { match value { - Value::Object(instance) => Some(decode_notify_instance_object(instance, valid_keys)), + Value::Object(instance) => Some(decode_target_instance_object(instance, valid_keys)), Value::Array(_) => serde_json::from_value::(value.clone()).ok(), _ => None, } } -fn is_notify_instance_shorthand(section: &Map, valid_keys: &[&str]) -> bool { +fn is_target_instance_shorthand(section: &Map, valid_keys: &[&str]) -> bool { section .iter() - .any(|(key, value)| valid_keys.contains(&key.as_str()) && parse_notify_scalar_value(key, value).is_some()) + .any(|(key, value)| valid_keys.contains(&key.as_str()) && parse_target_scalar_value(key, value).is_some()) } -fn apply_external_notify_section( +fn apply_external_target_section( cfg: &mut Config, notify_obj: &Map, external_key: &str, @@ -399,8 +411,8 @@ fn apply_external_notify_section( let subsystem = cfg.0.entry(subsystem_key.to_string()).or_default(); let mut applied = false; - if is_notify_instance_shorthand(section_obj, valid_keys) { - let kvs = decode_notify_instance_object(section_obj, valid_keys); + if is_target_instance_shorthand(section_obj, valid_keys) { + let kvs = decode_target_instance_object(section_obj, valid_keys); if !kvs.is_empty() { let mut merged = default_kvs.clone(); merged.extend(kvs); @@ -411,7 +423,7 @@ fn apply_external_notify_section( } for (raw_instance, value) in section_obj { - let Some(mut kvs) = decode_notify_instance_value(value, valid_keys) else { + let Some(mut kvs) = decode_target_instance_value(value, valid_keys) else { continue; }; if kvs.is_empty() { @@ -444,7 +456,7 @@ fn apply_external_target_descriptors( ) -> bool { let mut applied = false; for descriptor in descriptors { - applied |= apply_external_notify_section( + applied |= apply_external_target_section( cfg, section_obj, descriptor.external_key, @@ -663,11 +675,12 @@ fn build_semantic_oidc_object(cfg: &Config) -> Map { oidc_obj } -fn is_notify_bool_key(key: &str) -> bool { +fn is_target_bool_key(key: &str) -> bool { matches!( key, ENABLE_KEY | rustfs_config::WEBHOOK_SKIP_TLS_VERIFY + | rustfs_config::KAFKA_TLS_ENABLE | rustfs_config::MQTT_TLS_TRUST_LEAF_AS_CA | rustfs_config::NATS_TLS_REQUIRED | rustfs_config::PULSAR_TLS_ALLOW_INSECURE @@ -675,8 +688,8 @@ fn is_notify_bool_key(key: &str) -> bool { ) } -fn encode_notify_scalar_value(key: &str, value: &str) -> Value { - if is_notify_bool_key(key) { +fn encode_target_scalar_value(key: &str, value: &str) -> Value { + if is_target_bool_key(key) { if let Ok(state) = value.parse::() { return Value::Bool(state.is_enabled()); } @@ -697,7 +710,7 @@ fn is_hidden_if_empty(default_kvs: &KVS, key: &str) -> bool { .unwrap_or(false) } -fn build_notify_instance_diff_object(kvs: &KVS, baseline: &KVS, valid_keys: &[&str], default_kvs: &KVS) -> Map { +fn build_target_instance_diff_object(kvs: &KVS, baseline: &KVS, valid_keys: &[&str], default_kvs: &KVS) -> Map { let mut instance = Map::new(); for key in valid_keys { @@ -720,13 +733,13 @@ fn build_notify_instance_diff_object(kvs: &KVS, baseline: &KVS, valid_keys: &[&s continue; } - instance.insert((*key).to_string(), encode_notify_scalar_value(key, &effective_value)); + instance.insert((*key).to_string(), encode_target_scalar_value(key, &effective_value)); } instance } -fn merged_notify_default_kvs(subsystem: &HashMap, default_kvs: &KVS) -> KVS { +fn merged_target_default_kvs(subsystem: &HashMap, default_kvs: &KVS) -> KVS { let mut merged = default_kvs.clone(); if let Some(kvs) = subsystem.get(DEFAULT_DELIMITER) { merged.extend(kvs.clone()); @@ -734,7 +747,7 @@ fn merged_notify_default_kvs(subsystem: &HashMap, default_kvs: &KVS merged } -fn build_notify_subsystem_object( +fn build_target_subsystem_object( cfg: &Config, subsystem_key: &str, default_kvs: &KVS, @@ -744,11 +757,11 @@ fn build_notify_subsystem_object( return Map::new(); }; - let effective_default = merged_notify_default_kvs(subsystem, default_kvs); + let effective_default = merged_target_default_kvs(subsystem, default_kvs); let mut subsystem_obj = Map::new(); if let Some(default_instance) = subsystem.get(DEFAULT_DELIMITER) { - let default_obj = build_notify_instance_diff_object(default_instance, default_kvs, valid_keys, default_kvs); + let default_obj = build_target_instance_diff_object(default_instance, default_kvs, valid_keys, default_kvs); if !default_obj.is_empty() { subsystem_obj.insert("default".to_string(), Value::Object(default_obj)); } @@ -761,7 +774,7 @@ fn build_notify_subsystem_object( instances.sort_by_key(|(lhs, _)| *lhs); for (instance_key, kvs) in instances { - let instance_obj = build_notify_instance_diff_object(kvs, &effective_default, valid_keys, default_kvs); + let instance_obj = build_target_instance_diff_object(kvs, &effective_default, valid_keys, default_kvs); if !instance_obj.is_empty() { subsystem_obj.insert(instance_key.clone(), Value::Object(instance_obj)); } @@ -774,7 +787,7 @@ fn build_target_object(cfg: &Config, descriptors: &[TargetConfigDescriptor]) -> let mut target_obj = Map::new(); for descriptor in descriptors { let subsystem_obj = - build_notify_subsystem_object(cfg, descriptor.subsystem_key, descriptor.default_kvs, descriptor.valid_keys); + build_target_subsystem_object(cfg, descriptor.subsystem_key, descriptor.default_kvs, descriptor.valid_keys); if !subsystem_obj.is_empty() { target_obj.insert(descriptor.external_key.to_string(), Value::Object(subsystem_obj)); } @@ -1118,8 +1131,8 @@ mod tests { ObjectOptions, ObjectToDelete, PartInfo, PutObjReader, StorageAPI, WalkOptions, }; use http::HeaderMap; - use rustfs_config::audit::{AUDIT_MQTT_SUB_SYS, AUDIT_WEBHOOK_SUB_SYS}; - use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS}; + use rustfs_config::audit::{AUDIT_KAFKA_SUB_SYS, AUDIT_MQTT_SUB_SYS, AUDIT_WEBHOOK_SUB_SYS}; + use rustfs_config::notify::{NOTIFY_KAFKA_SUB_SYS, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS}; use rustfs_config::oidc::IDENTITY_OPENID_SUB_SYS; use rustfs_config::{DEFAULT_DELIMITER, ENABLE_KEY, EnableState}; use rustfs_filemeta::FileInfo; @@ -1753,6 +1766,15 @@ mod tests { "topic":"events", "queue_dir":"" } + }, + "kafka":{ + "streaming":{ + "enable":true, + "brokers":"127.0.0.1:9092,127.0.0.1:9093", + "topic":"events-kafka", + "acks":"all", + "tls_enable":true + } } } }"#; @@ -1781,6 +1803,14 @@ mod tests { .expect("mqtt target should be decoded"); assert_eq!(mqtt.get(rustfs_config::MQTT_BROKER), "tcp://127.0.0.1:1883"); assert_eq!(mqtt.get(rustfs_config::MQTT_QUEUE_DIR), ""); + + let kafka = cfg + .get_value(NOTIFY_KAFKA_SUB_SYS, "streaming") + .expect("kafka target should be decoded"); + assert_eq!(kafka.get(rustfs_config::KAFKA_BROKERS), "127.0.0.1:9092,127.0.0.1:9093"); + assert_eq!(kafka.get(rustfs_config::KAFKA_TOPIC), "events-kafka"); + assert_eq!(kafka.get(rustfs_config::KAFKA_ACKS), "all"); + assert_eq!(kafka.get(rustfs_config::KAFKA_TLS_ENABLE), "true"); } #[test] @@ -1850,6 +1880,14 @@ mod tests { "broker":"tcp://127.0.0.1:1883", "topic":"audit-events" } + }, + "kafka":{ + "auditlog":{ + "enable":true, + "brokers":"127.0.0.1:9092", + "topic":"audit-events-kafka", + "acks":"1" + } } } }"#; @@ -1873,6 +1911,12 @@ mod tests { .get_value(AUDIT_MQTT_SUB_SYS, "analytics") .expect("audit mqtt target should be decoded"); assert_eq!(mqtt.get(rustfs_config::MQTT_BROKER), "tcp://127.0.0.1:1883"); + + let kafka = cfg + .get_value(AUDIT_KAFKA_SUB_SYS, "auditlog") + .expect("audit kafka target should be decoded"); + assert_eq!(kafka.get(rustfs_config::KAFKA_BROKERS), "127.0.0.1:9092"); + assert_eq!(kafka.get(rustfs_config::KAFKA_TOPIC), "audit-events-kafka"); } #[test] @@ -1993,6 +2037,38 @@ mod tests { ); cfg.0.insert(NOTIFY_MQTT_SUB_SYS.to_string(), mqtt_section); + let mut kafka_default = notify::DEFAULT_NOTIFY_KAFKA_KVS.clone(); + kafka_default.insert(ENABLE_KEY.to_string(), EnableState::On.to_string()); + kafka_default.insert(rustfs_config::KAFKA_TOPIC.to_string(), "events-kafka".to_string()); + let mut kafka_section = std::collections::HashMap::new(); + kafka_section.insert(DEFAULT_DELIMITER.to_string(), kafka_default); + kafka_section.insert( + "streaming".to_string(), + crate::config::KVS(vec![ + crate::config::KV { + key: ENABLE_KEY.to_string(), + value: EnableState::On.to_string(), + hidden_if_empty: false, + }, + crate::config::KV { + key: rustfs_config::KAFKA_BROKERS.to_string(), + value: "127.0.0.1:9092,127.0.0.1:9093".to_string(), + hidden_if_empty: false, + }, + crate::config::KV { + key: rustfs_config::KAFKA_ACKS.to_string(), + value: "all".to_string(), + hidden_if_empty: false, + }, + crate::config::KV { + key: rustfs_config::KAFKA_TLS_ENABLE.to_string(), + value: EnableState::On.to_string(), + hidden_if_empty: false, + }, + ]), + ); + cfg.0.insert(NOTIFY_KAFKA_SUB_SYS.to_string(), kafka_section); + let out = encode_server_config_blob(&cfg, None).expect("encode should succeed"); let v: Value = serde_json::from_slice(&out).expect("output should be json"); let notify = v @@ -2028,6 +2104,19 @@ mod tests { .expect("mqtt target should be encoded"); assert_eq!(mqtt.get(rustfs_config::MQTT_BROKER).and_then(Value::as_str), Some("tcp://127.0.0.1:1883")); assert_eq!(mqtt.get(rustfs_config::MQTT_QUEUE_DIR).and_then(Value::as_str), Some("")); + + let kafka = notify + .get("kafka") + .and_then(Value::as_object) + .and_then(|targets| targets.get("streaming")) + .and_then(Value::as_object) + .expect("kafka target should be encoded"); + assert_eq!( + kafka.get(rustfs_config::KAFKA_BROKERS).and_then(Value::as_str), + Some("127.0.0.1:9092,127.0.0.1:9093") + ); + assert_eq!(kafka.get(rustfs_config::KAFKA_ACKS).and_then(Value::as_str), Some("all")); + assert_eq!(kafka.get(rustfs_config::KAFKA_TLS_ENABLE).and_then(Value::as_bool), Some(true)); } #[test] @@ -2079,6 +2168,28 @@ mod tests { ); cfg.0.insert(AUDIT_MQTT_SUB_SYS.to_string(), mqtt_section); + let mut kafka_default = audit::DEFAULT_AUDIT_KAFKA_KVS.clone(); + kafka_default.insert(ENABLE_KEY.to_string(), EnableState::On.to_string()); + kafka_default.insert(rustfs_config::KAFKA_TOPIC.to_string(), "audit-events-kafka".to_string()); + let mut kafka_section = std::collections::HashMap::new(); + kafka_section.insert(DEFAULT_DELIMITER.to_string(), kafka_default); + kafka_section.insert( + "auditlog".to_string(), + crate::config::KVS(vec![ + crate::config::KV { + key: ENABLE_KEY.to_string(), + value: EnableState::On.to_string(), + hidden_if_empty: false, + }, + crate::config::KV { + key: rustfs_config::KAFKA_BROKERS.to_string(), + value: "127.0.0.1:9092".to_string(), + hidden_if_empty: false, + }, + ]), + ); + cfg.0.insert(AUDIT_KAFKA_SUB_SYS.to_string(), kafka_section); + let out = encode_server_config_blob(&cfg, None).expect("encode should succeed"); let v: Value = serde_json::from_slice(&out).expect("output should be json"); let logger = v @@ -2105,6 +2216,14 @@ mod tests { .expect("audit mqtt default should be encoded"); assert_eq!(mqtt_default.get(ENABLE_KEY).and_then(Value::as_bool), Some(true)); assert_eq!(mqtt_default.get(rustfs_config::MQTT_TOPIC).and_then(Value::as_str), Some("audit-events")); + + let kafka = logger + .get("kafka") + .and_then(Value::as_object) + .and_then(|targets| targets.get("auditlog")) + .and_then(Value::as_object) + .expect("audit kafka target should be encoded"); + assert_eq!(kafka.get(rustfs_config::KAFKA_BROKERS).and_then(Value::as_str), Some("127.0.0.1:9092")); } #[test] diff --git a/crates/ecstore/src/config/mod.rs b/crates/ecstore/src/config/mod.rs index f505a2670..09975bb8b 100644 --- a/crates/ecstore/src/config/mod.rs +++ b/crates/ecstore/src/config/mod.rs @@ -25,8 +25,12 @@ use crate::store::ECStore; use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate}; use rustfs_config::COMMENT_KEY; use rustfs_config::DEFAULT_DELIMITER; -use rustfs_config::audit::{AUDIT_MQTT_SUB_SYS, AUDIT_NATS_SUB_SYS, AUDIT_PULSAR_SUB_SYS, AUDIT_WEBHOOK_SUB_SYS}; -use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_NATS_SUB_SYS, NOTIFY_PULSAR_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS}; +use rustfs_config::audit::{ + AUDIT_KAFKA_SUB_SYS, AUDIT_MQTT_SUB_SYS, AUDIT_NATS_SUB_SYS, AUDIT_PULSAR_SUB_SYS, AUDIT_WEBHOOK_SUB_SYS, +}; +use rustfs_config::notify::{ + NOTIFY_KAFKA_SUB_SYS, NOTIFY_MQTT_SUB_SYS, NOTIFY_NATS_SUB_SYS, NOTIFY_PULSAR_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, +}; use rustfs_config::oidc::IDENTITY_OPENID_SUB_SYS; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -245,6 +249,8 @@ pub fn init() { kvs.insert(AUDIT_NATS_SUB_SYS.to_owned(), audit::DEFAULT_AUDIT_NATS_KVS.clone()); kvs.insert(NOTIFY_PULSAR_SUB_SYS.to_owned(), notify::DEFAULT_NOTIFY_PULSAR_KVS.clone()); kvs.insert(AUDIT_PULSAR_SUB_SYS.to_owned(), audit::DEFAULT_AUDIT_PULSAR_KVS.clone()); + kvs.insert(NOTIFY_KAFKA_SUB_SYS.to_owned(), notify::DEFAULT_NOTIFY_KAFKA_KVS.clone()); + kvs.insert(AUDIT_KAFKA_SUB_SYS.to_owned(), audit::DEFAULT_AUDIT_KAFKA_KVS.clone()); kvs.insert(IDENTITY_OPENID_SUB_SYS.to_owned(), oidc::DEFAULT_IDENTITY_OPENID_KVS.clone()); // Register all default configurations diff --git a/crates/ecstore/src/config/notify.rs b/crates/ecstore/src/config/notify.rs index 7904ef077..d6fbdbe0b 100644 --- a/crates/ecstore/src/config/notify.rs +++ b/crates/ecstore/src/config/notify.rs @@ -14,14 +14,15 @@ use crate::config::{KV, KVS}; use rustfs_config::{ - COMMENT_KEY, DEFAULT_LIMIT, ENABLE_KEY, EVENT_DEFAULT_DIR, EnableState, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, - MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TLS_CA, MQTT_TLS_CLIENT_CERT, MQTT_TLS_CLIENT_KEY, - MQTT_TLS_POLICY, MQTT_TLS_TRUST_LEAF_AS_CA, MQTT_TOPIC, MQTT_USERNAME, MQTT_WS_PATH_ALLOWLIST, NATS_ADDRESS, - NATS_CREDENTIALS_FILE, NATS_PASSWORD, NATS_QUEUE_DIR, NATS_QUEUE_LIMIT, NATS_SUBJECT, NATS_TLS_CA, NATS_TLS_CLIENT_CERT, - NATS_TLS_CLIENT_KEY, NATS_TLS_REQUIRED, NATS_TOKEN, NATS_USERNAME, PULSAR_AUTH_TOKEN, PULSAR_BROKER, PULSAR_PASSWORD, - PULSAR_QUEUE_DIR, PULSAR_QUEUE_LIMIT, PULSAR_TLS_ALLOW_INSECURE, PULSAR_TLS_CA, PULSAR_TLS_HOSTNAME_VERIFICATION, - PULSAR_TOPIC, PULSAR_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CA, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, - WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, WEBHOOK_SKIP_TLS_VERIFY, + COMMENT_KEY, DEFAULT_LIMIT, ENABLE_KEY, EVENT_DEFAULT_DIR, EnableState, KAFKA_ACKS, KAFKA_BROKERS, KAFKA_QUEUE_DIR, + KAFKA_QUEUE_LIMIT, KAFKA_TLS_CA, KAFKA_TLS_CLIENT_CERT, KAFKA_TLS_CLIENT_KEY, KAFKA_TLS_ENABLE, KAFKA_TOPIC, MQTT_BROKER, + MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TLS_CA, + MQTT_TLS_CLIENT_CERT, MQTT_TLS_CLIENT_KEY, MQTT_TLS_POLICY, MQTT_TLS_TRUST_LEAF_AS_CA, MQTT_TOPIC, MQTT_USERNAME, + MQTT_WS_PATH_ALLOWLIST, NATS_ADDRESS, NATS_CREDENTIALS_FILE, NATS_PASSWORD, NATS_QUEUE_DIR, NATS_QUEUE_LIMIT, NATS_SUBJECT, + NATS_TLS_CA, NATS_TLS_CLIENT_CERT, NATS_TLS_CLIENT_KEY, NATS_TLS_REQUIRED, NATS_TOKEN, NATS_USERNAME, PULSAR_AUTH_TOKEN, + PULSAR_BROKER, PULSAR_PASSWORD, PULSAR_QUEUE_DIR, PULSAR_QUEUE_LIMIT, PULSAR_TLS_ALLOW_INSECURE, PULSAR_TLS_CA, + PULSAR_TLS_HOSTNAME_VERIFICATION, PULSAR_TOPIC, PULSAR_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CA, WEBHOOK_CLIENT_CERT, + WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, WEBHOOK_SKIP_TLS_VERIFY, }; use std::sync::LazyLock; @@ -314,3 +315,63 @@ pub static DEFAULT_NOTIFY_PULSAR_KVS: LazyLock = LazyLock::new(|| { }, ]) }); + +pub static DEFAULT_NOTIFY_KAFKA_KVS: LazyLock = LazyLock::new(|| { + KVS(vec![ + KV { + key: ENABLE_KEY.to_owned(), + value: EnableState::Off.to_string(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_BROKERS.to_owned(), + value: "".to_owned(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_TOPIC.to_owned(), + value: "".to_owned(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_ACKS.to_owned(), + value: "1".to_owned(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_TLS_ENABLE.to_owned(), + value: EnableState::Off.to_string(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_TLS_CA.to_owned(), + value: "".to_owned(), + hidden_if_empty: true, + }, + KV { + key: KAFKA_TLS_CLIENT_CERT.to_owned(), + value: "".to_owned(), + hidden_if_empty: true, + }, + KV { + key: KAFKA_TLS_CLIENT_KEY.to_owned(), + value: "".to_owned(), + hidden_if_empty: true, + }, + KV { + key: KAFKA_QUEUE_DIR.to_owned(), + value: EVENT_DEFAULT_DIR.to_owned(), + hidden_if_empty: false, + }, + KV { + key: KAFKA_QUEUE_LIMIT.to_owned(), + value: DEFAULT_LIMIT.to_string(), + hidden_if_empty: false, + }, + KV { + key: COMMENT_KEY.to_owned(), + value: "".to_owned(), + hidden_if_empty: false, + }, + ]) +}); diff --git a/crates/notify/src/factory.rs b/crates/notify/src/factory.rs index 1688b7834..f9cdc9967 100644 --- a/crates/notify/src/factory.rs +++ b/crates/notify/src/factory.rs @@ -15,13 +15,13 @@ use crate::Event; use async_trait::async_trait; use rustfs_config::EVENT_DEFAULT_DIR; -use rustfs_config::notify::{NOTIFY_MQTT_KEYS, NOTIFY_NATS_KEYS, NOTIFY_PULSAR_KEYS, NOTIFY_WEBHOOK_KEYS}; +use rustfs_config::notify::{NOTIFY_KAFKA_KEYS, NOTIFY_MQTT_KEYS, NOTIFY_NATS_KEYS, NOTIFY_PULSAR_KEYS, NOTIFY_WEBHOOK_KEYS}; use rustfs_ecstore::config::KVS; use rustfs_targets::{ Target, config::{ - build_mqtt_args, build_nats_args, build_pulsar_args, build_webhook_args, validate_mqtt_config, validate_nats_config, - validate_pulsar_config, validate_webhook_config, + build_kafka_args, build_mqtt_args, build_nats_args, build_pulsar_args, build_webhook_args, validate_kafka_config, + validate_mqtt_config, validate_nats_config, validate_pulsar_config, validate_webhook_config, }, error::TargetError, target::TargetType, @@ -119,3 +119,22 @@ impl TargetFactory for PulsarTargetFactory { NOTIFY_PULSAR_KEYS.iter().map(|s| s.to_string()).collect() } } + +pub struct KafkaTargetFactory; + +#[async_trait] +impl TargetFactory for KafkaTargetFactory { + async fn create_target(&self, id: String, config: &KVS) -> Result + Send + Sync>, TargetError> { + let args = build_kafka_args(config, EVENT_DEFAULT_DIR, TargetType::NotifyEvent)?; + let target = rustfs_targets::target::kafka::KafkaTarget::new(id, args)?; + Ok(Box::new(target)) + } + + fn validate_config(&self, _id: &str, config: &KVS) -> Result<(), TargetError> { + validate_kafka_config(config, EVENT_DEFAULT_DIR) + } + + fn get_valid_fields(&self) -> HashSet { + NOTIFY_KAFKA_KEYS.iter().map(|s| s.to_string()).collect() + } +} diff --git a/crates/notify/src/integration.rs b/crates/notify/src/integration.rs index dd2ceea3a..5a3d9ebbe 100644 --- a/crates/notify/src/integration.rs +++ b/crates/notify/src/integration.rs @@ -24,8 +24,8 @@ use crate::{ }; use hashbrown::HashMap; use rustfs_config::notify::{ - DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY, ENV_NOTIFY_TARGET_STREAM_CONCURRENCY, NOTIFY_MQTT_SUB_SYS, NOTIFY_NATS_SUB_SYS, - NOTIFY_PULSAR_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, + DEFAULT_NOTIFY_TARGET_STREAM_CONCURRENCY, ENV_NOTIFY_TARGET_STREAM_CONCURRENCY, NOTIFY_KAFKA_SUB_SYS, NOTIFY_MQTT_SUB_SYS, + NOTIFY_NATS_SUB_SYS, NOTIFY_PULSAR_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, }; use rustfs_ecstore::config::{Config, KVS}; use rustfs_s3_common::EventName; @@ -45,6 +45,7 @@ const MAX_RECENT_LIVE_EVENTS: usize = 1024; fn subsystem_target_type(target_type: &str) -> &str { match target_type { NOTIFY_WEBHOOK_SUB_SYS => "webhook", + NOTIFY_KAFKA_SUB_SYS => "kafka", NOTIFY_MQTT_SUB_SYS => "mqtt", NOTIFY_NATS_SUB_SYS => "nats", NOTIFY_PULSAR_SUB_SYS => "pulsar", @@ -765,6 +766,13 @@ mod tests { assert_eq!(target_id.name, "mqtt"); } + #[test] + fn runtime_target_id_for_subsystem_maps_notify_kafka_to_runtime_type() { + let target_id = runtime_target_id_for_subsystem(NOTIFY_KAFKA_SUB_SYS, "EventBus"); + assert_eq!(target_id.id, "eventbus"); + assert_eq!(target_id.name, "kafka"); + } + #[test] fn runtime_target_id_for_subsystem_maps_notify_nats_to_runtime_type() { let target_id = runtime_target_id_for_subsystem(NOTIFY_NATS_SUB_SYS, "Bus"); diff --git a/crates/notify/src/registry.rs b/crates/notify/src/registry.rs index ec60b1861..edea07b89 100644 --- a/crates/notify/src/registry.rs +++ b/crates/notify/src/registry.rs @@ -13,7 +13,9 @@ // limitations under the License. use crate::Event; -use crate::factory::{MQTTTargetFactory, NATSTargetFactory, PulsarTargetFactory, TargetFactory, WebhookTargetFactory}; +use crate::factory::{ + KafkaTargetFactory, MQTTTargetFactory, NATSTargetFactory, PulsarTargetFactory, TargetFactory, WebhookTargetFactory, +}; use futures::stream::{FuturesUnordered, StreamExt}; use hashbrown::HashMap; use rustfs_config::notify::NOTIFY_ROUTE_PREFIX; @@ -45,6 +47,7 @@ impl TargetRegistry { registry.register(ChannelTargetType::Mqtt.as_str(), Box::new(MQTTTargetFactory)); registry.register(ChannelTargetType::Nats.as_str(), Box::new(NATSTargetFactory)); registry.register(ChannelTargetType::Pulsar.as_str(), Box::new(PulsarTargetFactory)); + registry.register(ChannelTargetType::Kafka.as_str(), Box::new(KafkaTargetFactory)); registry } diff --git a/crates/targets/Cargo.toml b/crates/targets/Cargo.toml index 40c87cc50..31b6aed6a 100644 --- a/crates/targets/Cargo.toml +++ b/crates/targets/Cargo.toml @@ -33,6 +33,7 @@ url = { workspace = true } urlencoding = { workspace = true } uuid = { workspace = true, features = ["v4", "serde"] } sysinfo = { workspace = true, features = ["multithread"] } +rustfs-kafka-async = { workspace = true } [dev-dependencies] criterion = { workspace = true } diff --git a/crates/targets/src/check.rs b/crates/targets/src/check.rs index 19dd10210..4e61e1fa3 100644 --- a/crates/targets/src/check.rs +++ b/crates/targets/src/check.rs @@ -128,3 +128,50 @@ pub async fn check_pulsar_broker_available(args: &crate::target::pulsar::PulsarA Err(_) => Err(crate::TargetError::Timeout("Pulsar connection timed out".to_string())), } } + +pub async fn check_kafka_broker_available(args: &crate::target::kafka::KafkaArgs) -> Result<(), crate::TargetError> { + use rustfs_kafka_async::error::{ConnectionError, Error as KafkaError}; + use rustfs_kafka_async::{AsyncProducer, AsyncProducerConfig, RequiredAcks, SecurityConfig}; + use std::time::Duration; + + let map_kafka_error = |err: KafkaError, context: &str| match err { + KafkaError::Connection(ConnectionError::NoHostReachable) => crate::TargetError::NotConnected, + KafkaError::Connection(ConnectionError::Timeout(_)) => crate::TargetError::Timeout(format!("{context}: {err}")), + KafkaError::Connection(_) => crate::TargetError::Network(format!("{context}: {err}")), + KafkaError::Config(_) => crate::TargetError::Configuration(format!("{context}: {err}")), + _ => crate::TargetError::Request(format!("{context}: {err}")), + }; + + let acks = match args.acks { + 0 => RequiredAcks::None, + 1 => RequiredAcks::One, + _ => RequiredAcks::All, + }; + + let mut config = AsyncProducerConfig::new() + .with_ack_timeout(Duration::from_secs(5)) + .with_required_acks(acks); + + if args.tls_enable { + let mut security = SecurityConfig::new(); + if !args.tls_ca.is_empty() { + security = security.with_ca_cert(args.tls_ca.clone()); + } + if !args.tls_client_cert.is_empty() && !args.tls_client_key.is_empty() { + security = security.with_client_cert(args.tls_client_cert.clone(), args.tls_client_key.clone()); + } + config = config.with_security(security); + } + + match tokio::time::timeout(Duration::from_secs(5), async { + let _ = AsyncProducer::from_hosts_with_config(args.brokers.clone(), config) + .await + .map_err(|err| map_kafka_error(err, "Kafka broker check failed to create producer"))?; + Ok(()) + }) + .await + { + Ok(result) => result, + Err(_) => Err(crate::TargetError::Timeout("Kafka connection timed out".to_string())), + } +} diff --git a/crates/targets/src/config/mod.rs b/crates/targets/src/config/mod.rs index 81204027e..ca27ef3e8 100644 --- a/crates/targets/src/config/mod.rs +++ b/crates/targets/src/config/mod.rs @@ -21,6 +21,6 @@ pub use loader::{ collect_target_configs_from_env, }; pub use target_args::{ - build_mqtt_args, build_nats_args, build_pulsar_args, build_webhook_args, validate_mqtt_config, validate_nats_config, - validate_pulsar_config, validate_webhook_config, + build_kafka_args, build_mqtt_args, build_nats_args, build_pulsar_args, build_webhook_args, validate_kafka_config, + validate_mqtt_config, validate_nats_config, validate_pulsar_config, validate_webhook_config, }; diff --git a/crates/targets/src/config/target_args.rs b/crates/targets/src/config/target_args.rs index f077911aa..b6cee82e2 100644 --- a/crates/targets/src/config/target_args.rs +++ b/crates/targets/src/config/target_args.rs @@ -16,6 +16,7 @@ use super::common::{parse_target_bool, parse_url, validate_nats_server_config, v use crate::error::TargetError; use crate::target::{ TargetType, + kafka::KafkaArgs, mqtt::{MQTTArgs, MQTTTlsConfig, validate_mqtt_broker_url}, nats::{NATSArgs, validate_nats_address}, pulsar::{PulsarArgs, validate_pulsar_broker}, @@ -23,19 +24,38 @@ use crate::target::{ }; use rumqttc::QoS; use rustfs_config::{ - DEFAULT_LIMIT, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, - MQTT_RECONNECT_INTERVAL, MQTT_TLS_CA, MQTT_TLS_CLIENT_CERT, MQTT_TLS_CLIENT_KEY, MQTT_TLS_POLICY, MQTT_TLS_TRUST_LEAF_AS_CA, - MQTT_TOPIC, MQTT_USERNAME, MQTT_WS_PATH_ALLOWLIST, NATS_ADDRESS, NATS_CREDENTIALS_FILE, NATS_PASSWORD, NATS_QUEUE_DIR, - NATS_QUEUE_LIMIT, NATS_SUBJECT, NATS_TLS_CA, NATS_TLS_CLIENT_CERT, NATS_TLS_CLIENT_KEY, NATS_TLS_REQUIRED, NATS_TOKEN, - NATS_USERNAME, PULSAR_AUTH_TOKEN, PULSAR_BROKER, PULSAR_PASSWORD, PULSAR_QUEUE_DIR, PULSAR_QUEUE_LIMIT, - PULSAR_TLS_ALLOW_INSECURE, PULSAR_TLS_CA, PULSAR_TLS_HOSTNAME_VERIFICATION, PULSAR_TOPIC, PULSAR_USERNAME, - RUSTFS_WEBHOOK_SKIP_TLS_VERIFY_DEFAULT, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CA, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, - WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, WEBHOOK_SKIP_TLS_VERIFY, + DEFAULT_LIMIT, KAFKA_ACKS, KAFKA_BROKERS, KAFKA_QUEUE_DIR, KAFKA_QUEUE_LIMIT, KAFKA_TLS_CA, KAFKA_TLS_CLIENT_CERT, + KAFKA_TLS_CLIENT_KEY, KAFKA_TLS_ENABLE, KAFKA_TOPIC, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, + MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TLS_CA, MQTT_TLS_CLIENT_CERT, MQTT_TLS_CLIENT_KEY, + MQTT_TLS_POLICY, MQTT_TLS_TRUST_LEAF_AS_CA, MQTT_TOPIC, MQTT_USERNAME, MQTT_WS_PATH_ALLOWLIST, NATS_ADDRESS, + NATS_CREDENTIALS_FILE, NATS_PASSWORD, NATS_QUEUE_DIR, NATS_QUEUE_LIMIT, NATS_SUBJECT, NATS_TLS_CA, NATS_TLS_CLIENT_CERT, + NATS_TLS_CLIENT_KEY, NATS_TLS_REQUIRED, NATS_TOKEN, NATS_USERNAME, PULSAR_AUTH_TOKEN, PULSAR_BROKER, PULSAR_PASSWORD, + PULSAR_QUEUE_DIR, PULSAR_QUEUE_LIMIT, PULSAR_TLS_ALLOW_INSECURE, PULSAR_TLS_CA, PULSAR_TLS_HOSTNAME_VERIFICATION, + PULSAR_TOPIC, PULSAR_USERNAME, RUSTFS_WEBHOOK_SKIP_TLS_VERIFY_DEFAULT, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CA, + WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, WEBHOOK_SKIP_TLS_VERIFY, }; use rustfs_ecstore::config::KVS; use std::path::Path; use std::time::Duration; +fn parse_kafka_acks_value(value: Option<&str>) -> Result { + let Some(value) = value else { + return Ok(1); + }; + + let normalized = value.trim(); + if normalized.is_empty() { + return Err(TargetError::Configuration("Kafka acks must be one of: 0, 1, -1, all".to_string())); + } + + match normalized.to_ascii_lowercase().as_str() { + "0" => Ok(0), + "1" => Ok(1), + "-1" | "all" => Ok(-1), + _ => Err(TargetError::Configuration("Kafka acks must be one of: 0, 1, -1, all".to_string())), + } +} + pub fn build_webhook_args(config: &KVS, default_queue_dir: &str, target_type: TargetType) -> Result { let endpoint = config .lookup(WEBHOOK_ENDPOINT) @@ -265,3 +285,114 @@ pub fn validate_pulsar_config(config: &KVS, default_queue_dir: &str) -> Result<( .ok_or_else(|| TargetError::Configuration("Missing Pulsar broker".to_string()))?; validate_pulsar_broker_config(&broker, config, default_queue_dir) } + +pub fn build_kafka_args(config: &KVS, default_queue_dir: &str, target_type: TargetType) -> Result { + let brokers_raw = config + .lookup(KAFKA_BROKERS) + .ok_or_else(|| TargetError::Configuration("Missing Kafka brokers".to_string()))?; + if brokers_raw.split(',').all(|s| s.trim().is_empty()) { + return Err(TargetError::Configuration("Kafka brokers cannot be empty".to_string())); + } + let brokers: Vec = brokers_raw + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + let topic = config + .lookup(KAFKA_TOPIC) + .ok_or_else(|| TargetError::Configuration("Missing Kafka topic".to_string()))?; + + Ok(KafkaArgs { + enable: true, + brokers, + topic, + acks: parse_kafka_acks_value(config.lookup(KAFKA_ACKS).as_deref())?, + tls_enable: parse_target_bool(config.lookup(KAFKA_TLS_ENABLE).as_deref()).unwrap_or(false), + tls_ca: config.lookup(KAFKA_TLS_CA).unwrap_or_default(), + tls_client_cert: config.lookup(KAFKA_TLS_CLIENT_CERT).unwrap_or_default(), + tls_client_key: config.lookup(KAFKA_TLS_CLIENT_KEY).unwrap_or_default(), + queue_dir: config + .lookup(KAFKA_QUEUE_DIR) + .unwrap_or_else(|| default_queue_dir.to_string()), + queue_limit: config + .lookup(KAFKA_QUEUE_LIMIT) + .and_then(|v| v.parse::().ok()) + .unwrap_or(DEFAULT_LIMIT), + target_type, + }) +} + +pub fn validate_kafka_config(config: &KVS, default_queue_dir: &str) -> Result<(), TargetError> { + let brokers_raw = config + .lookup(KAFKA_BROKERS) + .ok_or_else(|| TargetError::Configuration("Missing Kafka brokers".to_string()))?; + if brokers_raw.split(',').map(|s| s.trim()).all(|s| s.is_empty()) { + return Err(TargetError::Configuration("Kafka brokers cannot be empty".to_string())); + } + + if config.lookup(KAFKA_TOPIC).is_none() { + return Err(TargetError::Configuration("Missing Kafka topic".to_string())); + } + + parse_kafka_acks_value(config.lookup(KAFKA_ACKS).as_deref())?; + + let tls_client_cert = config.lookup(KAFKA_TLS_CLIENT_CERT).unwrap_or_default(); + let tls_client_key = config.lookup(KAFKA_TLS_CLIENT_KEY).unwrap_or_default(); + if tls_client_cert.is_empty() != tls_client_key.is_empty() { + return Err(TargetError::Configuration( + "Kafka tls_client_cert and tls_client_key must be specified together".to_string(), + )); + } + + let queue_dir = config + .lookup(KAFKA_QUEUE_DIR) + .unwrap_or_else(|| default_queue_dir.to_string()); + if !queue_dir.is_empty() && !std::path::Path::new(&queue_dir).is_absolute() { + return Err(TargetError::Configuration("Kafka queue directory must be an absolute path".to_string())); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::{build_kafka_args, validate_kafka_config}; + use crate::target::TargetType; + use rustfs_config::{KAFKA_ACKS, KAFKA_BROKERS, KAFKA_TOPIC}; + use rustfs_ecstore::config::KVS; + + fn kafka_base_config() -> KVS { + let mut config = KVS::new(); + config.insert(KAFKA_BROKERS.to_string(), "127.0.0.1:9092".to_string()); + config.insert(KAFKA_TOPIC.to_string(), "events".to_string()); + config + } + + #[test] + fn build_kafka_args_accepts_all_ack_alias() { + let mut config = kafka_base_config(); + config.insert(KAFKA_ACKS.to_string(), "all".to_string()); + + let args = build_kafka_args(&config, "", TargetType::NotifyEvent).expect("valid kafka args"); + assert_eq!(args.acks, -1); + } + + #[test] + fn build_kafka_args_rejects_invalid_acks() { + let mut config = kafka_base_config(); + config.insert(KAFKA_ACKS.to_string(), "leader".to_string()); + + let err = build_kafka_args(&config, "", TargetType::NotifyEvent).expect_err("invalid acks should fail"); + assert!(err.to_string().contains("Kafka acks must be one of")); + } + + #[test] + fn validate_kafka_config_rejects_invalid_acks() { + let mut config = kafka_base_config(); + config.insert(KAFKA_ACKS.to_string(), "2".to_string()); + + let err = validate_kafka_config(&config, "").expect_err("invalid acks should fail"); + assert!(err.to_string().contains("Kafka acks must be one of")); + } +} diff --git a/crates/targets/src/lib.rs b/crates/targets/src/lib.rs index 8562f4e02..483e9fd21 100644 --- a/crates/targets/src/lib.rs +++ b/crates/targets/src/lib.rs @@ -21,7 +21,8 @@ pub mod sys; pub mod target; pub use check::{ - check_mqtt_broker_available, check_mqtt_broker_available_with_tls, check_nats_server_available, check_pulsar_broker_available, + check_kafka_broker_available, check_mqtt_broker_available, check_mqtt_broker_available_with_tls, check_nats_server_available, + check_pulsar_broker_available, }; pub use error::{StoreError, TargetError}; pub use rustfs_s3_common::EventName; diff --git a/crates/targets/src/target/kafka.rs b/crates/targets/src/target/kafka.rs new file mode 100644 index 000000000..e1e14b5ea --- /dev/null +++ b/crates/targets/src/target/kafka.rs @@ -0,0 +1,437 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ + StoreError, Target, TargetLog, + arn::TargetID, + error::TargetError, + store::{Key, QueueStore, Store}, + target::{ + ChannelTargetType, EntityTarget, QueuedPayload, QueuedPayloadMeta, TargetDeliveryCounters, TargetDeliverySnapshot, + TargetType, + }, +}; +use async_trait::async_trait; +use rustfs_config::audit::AUDIT_STORE_EXTENSION; +use rustfs_config::notify::NOTIFY_STORE_EXTENSION; +use rustfs_kafka_async::error::{ConnectionError, Error as KafkaError}; +use rustfs_kafka_async::{AsyncProducer, AsyncProducerConfig, Record, RequiredAcks, SecurityConfig}; +use serde::Serialize; +use serde::de::DeserializeOwned; +use std::{marker::PhantomData, path::PathBuf, sync::Arc, time::Duration}; +use tokio::sync::Mutex; +use tracing::{debug, error, info, instrument, warn}; + +/// Arguments for configuring a Kafka target +#[derive(Debug, Clone)] +pub struct KafkaArgs { + /// Whether the target is enabled + pub enable: bool, + /// Comma-separated list of broker addresses (e.g. "localhost:9092,broker2:9092") + pub brokers: Vec, + /// The topic to publish events to + pub topic: String, + /// Required acks: 0 = none, 1 = leader, -1 = all + pub acks: i16, + /// Whether to enable TLS for Kafka transport + pub tls_enable: bool, + /// Optional path to CA cert used for broker verification + pub tls_ca: String, + /// Optional path to client certificate for mTLS + pub tls_client_cert: String, + /// Optional path to client private key for mTLS + pub tls_client_key: String, + /// The directory to store events in case of failure + pub queue_dir: String, + /// The maximum number of events to store + pub queue_limit: u64, + /// The target type (audit or notify) + pub target_type: TargetType, +} + +impl KafkaArgs { + /// Validates the KafkaArgs configuration + pub fn validate(&self) -> Result<(), TargetError> { + if !self.enable { + return Ok(()); + } + + if self.brokers.is_empty() { + return Err(TargetError::Configuration("kafka brokers cannot be empty".to_string())); + } + + if self.topic.is_empty() { + return Err(TargetError::Configuration("kafka topic cannot be empty".to_string())); + } + + if !matches!(self.acks, -1..=1) { + return Err(TargetError::Configuration("kafka acks must be one of: 0, 1, -1".to_string())); + } + + if self.tls_client_cert.is_empty() != self.tls_client_key.is_empty() { + return Err(TargetError::Configuration( + "kafka tls_client_cert and tls_client_key must be specified together".to_string(), + )); + } + + if !self.queue_dir.is_empty() { + let path = std::path::Path::new(&self.queue_dir); + if !path.is_absolute() { + return Err(TargetError::Configuration("kafka queueDir path should be absolute".to_string())); + } + } + + Ok(()) + } +} + +/// A target that sends events to an Apache Kafka topic +pub struct KafkaTarget +where + E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned, +{ + id: TargetID, + args: KafkaArgs, + store: Option + Send + Sync>>, + producer: Arc>>>, + delivery_counters: Arc, + _phantom: PhantomData, +} + +impl KafkaTarget +where + E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned, +{ + fn map_kafka_error(err: KafkaError, context: &str) -> TargetError { + match err { + KafkaError::Connection(ConnectionError::NoHostReachable) => TargetError::NotConnected, + KafkaError::Connection(ConnectionError::Timeout(_)) => TargetError::Timeout(format!("{context}: {err}")), + KafkaError::Connection(_) => TargetError::Network(format!("{context}: {err}")), + KafkaError::Config(_) => TargetError::Configuration(format!("{context}: {err}")), + _ => TargetError::Request(format!("{context}: {err}")), + } + } + + fn is_connection_error(err: &TargetError) -> bool { + matches!(err, TargetError::NotConnected | TargetError::Timeout(_) | TargetError::Network(_)) + } + + /// Creates a new KafkaTarget + #[instrument(skip(args), fields(target_id = %id))] + pub fn new(id: String, args: KafkaArgs) -> Result { + args.validate()?; + + let target_id = TargetID::new(id, ChannelTargetType::Kafka.as_str().to_string()); + + let queue_store = if !args.queue_dir.is_empty() { + let queue_dir = + PathBuf::from(&args.queue_dir).join(format!("rustfs-{}-{}", ChannelTargetType::Kafka.as_str(), target_id.id)); + + let extension = match args.target_type { + TargetType::AuditLog => AUDIT_STORE_EXTENSION, + TargetType::NotifyEvent => NOTIFY_STORE_EXTENSION, + }; + + let store = QueueStore::::new(queue_dir, args.queue_limit, extension); + if let Err(e) = store.open() { + error!("Failed to open store for Kafka target {}: {}", target_id.id, e); + return Err(TargetError::Storage(format!("{e}"))); + } + + Some(Box::new(store) as Box + Send + Sync>) + } else { + None + }; + + info!(target_id = %target_id.id, "Kafka target created"); + Ok(KafkaTarget { + id: target_id, + args, + store: queue_store, + producer: Arc::new(Mutex::new(None)), + delivery_counters: Arc::new(TargetDeliveryCounters::default()), + _phantom: PhantomData, + }) + } + + /// Builds a Kafka producer from the current args + async fn build_producer(&self) -> Result { + let acks = match self.args.acks { + 0 => RequiredAcks::None, + 1 => RequiredAcks::One, + _ => RequiredAcks::All, + }; + + let mut config = AsyncProducerConfig::new() + .with_ack_timeout(Duration::from_secs(30)) + .with_required_acks(acks); + + if self.args.tls_enable { + let mut security = SecurityConfig::new(); + if !self.args.tls_ca.is_empty() { + security = security.with_ca_cert(self.args.tls_ca.clone()); + } + if !self.args.tls_client_cert.is_empty() && !self.args.tls_client_key.is_empty() { + security = security.with_client_cert(self.args.tls_client_cert.clone(), self.args.tls_client_key.clone()); + } + config = config.with_security(security); + } + + AsyncProducer::from_hosts_with_config(self.args.brokers.clone(), config) + .await + .map_err(|e| Self::map_kafka_error(e, "Failed to create Kafka producer")) + } + + async fn get_or_build_producer(&self) -> Result, TargetError> { + let mut cached = self.producer.lock().await; + if let Some(producer) = cached.as_ref() { + return Ok(Arc::clone(producer)); + } + + let producer = Arc::new(self.build_producer().await?); + *cached = Some(Arc::clone(&producer)); + Ok(producer) + } + + async fn invalidate_cached_producer(&self) { + let mut cached = self.producer.lock().await; + *cached = None; + } + + /// Serializes the event and builds a QueuedPayload + fn build_queued_payload(&self, event: &EntityTarget) -> Result { + let object_name = crate::target::decode_object_name(&event.object_name)?; + let key = format!("{}/{}", event.bucket_name, object_name); + + let log = TargetLog { + event_name: event.event_name, + key, + records: vec![event.data.clone()], + }; + + let body = serde_json::to_vec(&log).map_err(|e| TargetError::Serialization(format!("Failed to serialize event: {e}")))?; + + let meta = QueuedPayloadMeta::new( + event.event_name, + event.bucket_name.clone(), + event.object_name.clone(), + "application/json", + body.len(), + ); + + Ok(QueuedPayload::new(meta, body)) + } + + /// Sends the raw body to Kafka + #[instrument(skip(self, body, meta), fields(target_id = %self.id))] + async fn send_body(&self, body: Vec, meta: &QueuedPayloadMeta) -> Result<(), TargetError> { + debug!( + target = %self.id, + bucket = %meta.bucket_name, + object = %meta.object_name, + event = %meta.event_name, + payload_len = body.len(), + "Sending Kafka payload" + ); + + let producer = self.get_or_build_producer().await?; + + if let Err(err) = producer.send(&Record::from_value(&self.args.topic, body.as_slice())).await { + let mapped = Self::map_kafka_error(err, "Failed to send message to Kafka"); + if Self::is_connection_error(&mapped) { + self.invalidate_cached_producer().await; + } + return Err(mapped); + } + + debug!(target_id = %self.id, topic = %self.args.topic, "Event published to Kafka topic"); + self.delivery_counters.record_success(); + Ok(()) + } + + /// Clones this target into a boxed trait object + pub fn clone_box(&self) -> Box + Send + Sync> { + Box::new(KafkaTarget:: { + id: self.id.clone(), + args: self.args.clone(), + store: self.store.as_ref().map(|s| s.boxed_clone()), + producer: Arc::clone(&self.producer), + delivery_counters: Arc::clone(&self.delivery_counters), + _phantom: PhantomData, + }) + } +} + +#[async_trait] +impl Target for KafkaTarget +where + E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned, +{ + fn id(&self) -> TargetID { + self.id.clone() + } + + async fn is_active(&self) -> Result { + let _ = self.get_or_build_producer().await?; + Ok(true) + } + + async fn save(&self, event: Arc>) -> Result<(), TargetError> { + let queued = match self.build_queued_payload(&event) { + Ok(queued) => queued, + Err(err) => { + self.delivery_counters.record_final_failure(); + return Err(err); + } + }; + + if let Some(store) = &self.store { + let encoded = match queued.encode() { + Ok(encoded) => encoded, + Err(err) => { + self.delivery_counters.record_final_failure(); + return Err(TargetError::Storage(format!("Failed to encode queued payload: {err}"))); + } + }; + if let Err(e) = store.put_raw(&encoded) { + self.delivery_counters.record_final_failure(); + return Err(TargetError::Storage(format!("Failed to save event to store: {e}"))); + } + debug!("Event saved to store for Kafka target: {}", self.id); + Ok(()) + } else { + if let Err(err) = self.send_body(queued.body, &queued.meta).await { + self.delivery_counters.record_final_failure(); + return Err(err); + } + Ok(()) + } + } + + async fn send_raw_from_store(&self, key: Key, body: Vec, meta: QueuedPayloadMeta) -> Result<(), TargetError> { + debug!("Sending queued payload from store for Kafka target: {}, key: {}", self.id, key); + + if let Err(e) = self.send_body(body, &meta).await { + if matches!(e, TargetError::NotConnected) { + warn!(target_id = %self.id, "Kafka not reachable, event remains in store."); + return Err(TargetError::NotConnected); + } + error!(target_id = %self.id, error = %e, "Failed to send event from store."); + return Err(e); + } + + debug!("Event sent from store for Kafka target: {}", self.id); + Ok(()) + } + + async fn close(&self) -> Result<(), TargetError> { + info!("Kafka target closed: {}", self.id); + Ok(()) + } + + fn store(&self) -> Option<&(dyn Store + Send + Sync)> { + self.store.as_deref() + } + + fn clone_dyn(&self) -> Box + Send + Sync> { + self.clone_box() + } + + fn is_enabled(&self) -> bool { + self.args.enable + } + + fn delivery_snapshot(&self) -> TargetDeliverySnapshot { + self.delivery_counters + .snapshot(self.store.as_deref().map_or(0, |store| store.len() as u64)) + } + + fn record_final_failure(&self) { + self.delivery_counters.record_final_failure(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn base_args() -> KafkaArgs { + KafkaArgs { + enable: true, + brokers: vec!["localhost:9092".to_string()], + topic: "rustfs-events".to_string(), + acks: 1, + tls_enable: false, + tls_ca: String::new(), + tls_client_cert: String::new(), + tls_client_key: String::new(), + queue_dir: String::new(), + queue_limit: 0, + target_type: TargetType::NotifyEvent, + } + } + + #[test] + fn test_validate_empty_brokers() { + let args = KafkaArgs { + brokers: vec![], + ..base_args() + }; + assert!(args.validate().is_err()); + } + + #[test] + fn test_validate_empty_topic() { + let args = KafkaArgs { + topic: String::new(), + ..base_args() + }; + assert!(args.validate().is_err()); + } + + #[test] + fn test_validate_relative_queue_dir() { + let args = KafkaArgs { + queue_dir: "relative/path".to_string(), + ..base_args() + }; + assert!(args.validate().is_err()); + } + + #[test] + fn test_validate_valid_args() { + assert!(base_args().validate().is_ok()); + } + + #[test] + fn test_validate_disabled_target_skips_validation() { + let args = KafkaArgs { + enable: false, + brokers: vec![], + topic: String::new(), + ..base_args() + }; + assert!(args.validate().is_ok()); + } + + #[test] + fn test_validate_tls_client_cert_and_key_must_be_paired() { + let args = KafkaArgs { + tls_client_cert: "/tmp/client.crt".to_string(), + tls_client_key: String::new(), + ..base_args() + }; + assert!(args.validate().is_err()); + } +} diff --git a/crates/targets/src/target/mod.rs b/crates/targets/src/target/mod.rs index 8a26c2e64..ad1429deb 100644 --- a/crates/targets/src/target/mod.rs +++ b/crates/targets/src/target/mod.rs @@ -25,6 +25,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use tracing::warn; +pub mod kafka; pub mod mqtt; pub mod nats; pub mod pulsar; diff --git a/rustfs/src/admin/handlers/audit.rs b/rustfs/src/admin/handlers/audit.rs index cb2f06381..d13fa9a26 100644 --- a/rustfs/src/admin/handlers/audit.rs +++ b/rustfs/src/admin/handlers/audit.rs @@ -15,10 +15,10 @@ use crate::admin::{ auth::validate_admin_request, handlers::target_descriptor::{ - AdminTargetSpec, AdminTargetValidator, EndpointKey, TargetDomain, allowed_target_keys, - collect_config_entry_keys as shared_collect_config_entry_keys, - collect_configured_endpoint_keys as shared_collect_configured_endpoint_keys, - collect_env_endpoint_keys as shared_collect_env_endpoint_keys, normalized_endpoint_key, target_service_name, target_spec, + AdminTargetSpec, AdminTargetValidator, EndpointKey, TargetDomain, TargetEndpointSource, allowed_target_keys, + collect_validated_key_values as shared_collect_validated_key_values, + merge_target_endpoints as shared_merge_target_endpoints, + target_mutation_block_reason as shared_target_mutation_block_reason, target_service_name, target_spec, validate_target_request, }, router::{AdminOperation, Operation, S3Router}, @@ -26,14 +26,13 @@ use crate::admin::{ use crate::auth::{check_key_valid, get_session_token}; use crate::server::{ADMIN_PREFIX, RemoteAddr}; use futures::stream::{FuturesUnordered, StreamExt}; -use hashbrown::HashSet as HbHashSet; use http::{HeaderMap, StatusCode}; use hyper::Method; use matchit::Params; use rustfs_audit::{audit_system, start_audit_system as start_global_audit_system, system::AuditSystemState}; use rustfs_config::audit::{ - AUDIT_MQTT_KEYS, AUDIT_MQTT_SUB_SYS, AUDIT_NATS_KEYS, AUDIT_NATS_SUB_SYS, AUDIT_PULSAR_KEYS, AUDIT_PULSAR_SUB_SYS, - AUDIT_ROUTE_PREFIX, AUDIT_WEBHOOK_KEYS, AUDIT_WEBHOOK_SUB_SYS, + AUDIT_KAFKA_KEYS, AUDIT_KAFKA_SUB_SYS, AUDIT_MQTT_KEYS, AUDIT_MQTT_SUB_SYS, AUDIT_NATS_KEYS, AUDIT_NATS_SUB_SYS, + AUDIT_PULSAR_KEYS, AUDIT_PULSAR_SUB_SYS, AUDIT_ROUTE_PREFIX, AUDIT_WEBHOOK_KEYS, AUDIT_WEBHOOK_SUB_SYS, }; use rustfs_config::{AUDIT_DEFAULT_DIR, DEFAULT_DELIMITER, ENABLE_KEY, EnableState, MAX_ADMIN_REQUEST_BODY_SIZE}; use rustfs_ecstore::config::Config; @@ -84,7 +83,7 @@ struct AuditEndpoint { account_id: String, service: String, status: String, - source: AuditEndpointSource, + source: TargetEndpointSource, } #[derive(Serialize, Debug)] @@ -92,16 +91,7 @@ struct AuditEndpointsResponse { audit_endpoints: Vec, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] -#[serde(rename_all = "lowercase")] -enum AuditEndpointSource { - Config, - Env, - Mixed, - Runtime, -} - -fn audit_target_specs() -> [AdminTargetSpec; 4] { +fn audit_target_specs() -> [AdminTargetSpec; 5] { [ AdminTargetSpec { subsystem: AUDIT_WEBHOOK_SUB_SYS, @@ -109,6 +99,12 @@ fn audit_target_specs() -> [AdminTargetSpec; 4] { valid_keys: AUDIT_WEBHOOK_KEYS, validator: AdminTargetValidator::Webhook, }, + AdminTargetSpec { + subsystem: AUDIT_KAFKA_SUB_SYS, + service: "kafka", + valid_keys: AUDIT_KAFKA_KEYS, + validator: AdminTargetValidator::Kafka(TargetDomain::Audit), + }, AdminTargetSpec { subsystem: AUDIT_MQTT_SUB_SYS, service: "mqtt", @@ -161,140 +157,27 @@ fn has_any_audit_targets(config: &Config) -> bool { false } -fn collect_configured_audit_endpoint_keys(config: &Config) -> Vec { - shared_collect_configured_endpoint_keys(&audit_target_specs(), config) -} - -fn collect_config_entry_keys(config: &Config) -> HbHashSet { - shared_collect_config_entry_keys(&audit_target_specs(), config) -} - -fn collect_env_endpoint_keys() -> HbHashSet { - shared_collect_env_endpoint_keys(&audit_target_specs(), AUDIT_ROUTE_PREFIX) -} - -fn classify_audit_endpoint_source( - config_targets: &HbHashSet, - env_targets: &HbHashSet, - key: &EndpointKey, -) -> AuditEndpointSource { - match (config_targets.contains(key), env_targets.contains(key)) { - (true, true) => AuditEndpointSource::Mixed, - (true, false) => AuditEndpointSource::Config, - (false, true) => AuditEndpointSource::Env, - (false, false) => AuditEndpointSource::Runtime, - } -} - -fn audit_endpoint_source(config: &Config, target_type: &str, target_name: &str) -> AuditEndpointSource { - let config_targets = collect_config_entry_keys(config); - let env_targets = collect_env_endpoint_keys(); - let service = target_service_name(&audit_target_specs(), target_type).unwrap_or_default(); - - let key = normalized_endpoint_key(target_name, service); - classify_audit_endpoint_source(&config_targets, &env_targets, &key) -} - fn audit_target_mutation_block_reason(config: &Config, target_type: &str, target_name: &str) -> Option { - match audit_endpoint_source(config, target_type, target_name) { - AuditEndpointSource::Env => Some(format!( - "audit target '{}' is managed by environment variables and cannot be modified from the console", - target_name - )), - AuditEndpointSource::Mixed => Some(format!( - "audit target '{}' is configured by both persisted config and environment variables; remove the environment variables first", - target_name - )), - AuditEndpointSource::Config | AuditEndpointSource::Runtime => None, - } + shared_target_mutation_block_reason( + &audit_target_specs(), + AUDIT_ROUTE_PREFIX, + config, + target_type, + target_name, + "audit target", + ) } fn merge_audit_endpoints(config: &Config, runtime_statuses: HashMap) -> Vec { - let mut audit_endpoints = Vec::new(); - let mut seen = HashSet::new(); - let configured_keys = collect_configured_audit_endpoint_keys(config); - let config_targets = collect_config_entry_keys(config); - let env_targets = collect_env_endpoint_keys(); - let mut normalized_runtime_statuses: HashMap = HashMap::new(); - for ((account_id, service), status) in runtime_statuses { - let normalized = normalized_endpoint_key(&account_id, &service); - normalized_runtime_statuses - .entry(normalized) - .or_insert((account_id, service, status)); - } - - for key in configured_keys { - let normalized = normalized_endpoint_key(&key.0, &key.1); - if !seen.insert(normalized.clone()) { - continue; - } - let status = normalized_runtime_statuses - .remove(&normalized) - .map(|(_, _, status)| status) - .unwrap_or_else(|| "offline".to_string()); - let source = classify_audit_endpoint_source(&config_targets, &env_targets, &normalized); - audit_endpoints.push(AuditEndpoint { - account_id: key.0, - service: key.1, - status, - source, - }); - } - - for (normalized, (account_id, service, status)) in normalized_runtime_statuses { - if seen.insert(normalized.clone()) { - audit_endpoints.push(AuditEndpoint { - account_id, - service, - status, - source: classify_audit_endpoint_source(&config_targets, &env_targets, &normalized), - }); - } - } - - for key in &env_targets { - if !seen.insert(key.clone()) { - continue; - } - - audit_endpoints.push(AuditEndpoint { - account_id: key.0.clone(), - service: key.1.clone(), - status: "offline".to_string(), - source: classify_audit_endpoint_source(&config_targets, &env_targets, key), - }); - } - - audit_endpoints.sort_by(|a, b| a.service.cmp(&b.service).then_with(|| a.account_id.cmp(&b.account_id))); - audit_endpoints -} - -fn collect_validated_key_values( - key_values: &[KeyValue], - allowed_keys: &HashSet<&str>, - target_type: &str, -) -> S3Result> { - let mut kv_map = HashMap::new(); - let mut seen = HashSet::new(); - - for kv in key_values { - if !allowed_keys.contains(kv.key.as_str()) { - return Err(s3_error!( - InvalidArgument, - "key '{}' not allowed for audit target type '{}'", - kv.key, - target_type - )); - } - - if !seen.insert(kv.key.as_str()) { - return Err(s3_error!(InvalidArgument, "duplicate key '{}' in request body", kv.key)); - } - - kv_map.insert(kv.key.clone(), kv.value.clone()); - } - - Ok(kv_map) + shared_merge_target_endpoints(&audit_target_specs(), AUDIT_ROUTE_PREFIX, config, runtime_statuses) + .into_iter() + .map(|endpoint| AuditEndpoint { + account_id: endpoint.account_id, + service: endpoint.service, + status: endpoint.status, + source: endpoint.source, + }) + .collect() } fn extract_target_params<'a>(params: &'a Params<'_, '_>) -> S3Result<(&'a str, &'a str)> { @@ -406,7 +289,12 @@ impl Operation for AuditTargetConfig { let specs = audit_target_specs(); let allowed_keys: HashSet<&str> = allowed_target_keys(&specs, target_type); - let kv_map = collect_validated_key_values(&audit_body.key_values, &allowed_keys, target_type)?; + let kv_map = shared_collect_validated_key_values( + audit_body.key_values.iter().map(|kv| (kv.key.as_str(), kv.value.as_str())), + &allowed_keys, + target_type, + "audit target", + )?; let spec = target_spec(&specs, target_type) .ok_or_else(|| s3_error!(InvalidArgument, "unsupported audit target type: '{}'", target_type))?; @@ -577,19 +465,55 @@ mod tests { .iter() .find(|entry| entry.account_id == "mixed-target") .expect("mixed target should be present"); - assert_eq!(mixed.source, AuditEndpointSource::Mixed); + assert_eq!(mixed.source, TargetEndpointSource::Mixed); let env_only = merged .iter() .find(|entry| entry.account_id == "env-only") .expect("env-only target should be present"); - assert_eq!(env_only.source, AuditEndpointSource::Env); + assert_eq!(env_only.source, TargetEndpointSource::Env); let config_only = merged .iter() .find(|entry| entry.account_id == "config-target") .expect("config target should be present"); - assert_eq!(config_only.source, AuditEndpointSource::Config); + assert_eq!(config_only.source, TargetEndpointSource::Config); + }, + ); + } + + #[test] + fn merge_audit_endpoints_marks_kafka_env_and_mixed_sources() { + let config = Config(HashMap::from([( + AUDIT_KAFKA_SUB_SYS.to_string(), + HashMap::from([("mixed-kafka".to_string(), enabled_kvs("on"))]), + )])); + + with_vars( + [ + ("RUSTFS_AUDIT_KAFKA_ENABLE_MIXED-KAFKA", Some("on")), + ("RUSTFS_AUDIT_KAFKA_BROKERS_MIXED-KAFKA", Some("127.0.0.1:9092")), + ("RUSTFS_AUDIT_KAFKA_ENABLE_ENV-KAFKA", Some("on")), + ("RUSTFS_AUDIT_KAFKA_BROKERS_ENV-KAFKA", Some("127.0.0.1:9093")), + ], + || { + let runtime = HashMap::from([ + (("mixed-kafka".to_string(), "kafka".to_string()), "online".to_string()), + (("env-kafka".to_string(), "kafka".to_string()), "online".to_string()), + ]); + let merged = merge_audit_endpoints(&config, runtime); + + let mixed = merged + .iter() + .find(|entry| entry.account_id == "mixed-kafka" && entry.service == "kafka") + .expect("mixed kafka target should be present"); + assert_eq!(mixed.source, TargetEndpointSource::Mixed); + + let env_only = merged + .iter() + .find(|entry| entry.account_id == "env-kafka" && entry.service == "kafka") + .expect("env kafka target should be present"); + assert_eq!(env_only.source, TargetEndpointSource::Env); }, ); } @@ -641,7 +565,7 @@ mod tests { .iter() .find(|entry| entry.account_id == "mixed-disabled") .expect("mixed target should be present"); - assert_eq!(mixed.source, AuditEndpointSource::Mixed); + assert_eq!(mixed.source, TargetEndpointSource::Mixed); assert_eq!(mixed.status, "offline"); }, ); @@ -662,7 +586,7 @@ mod tests { .iter() .find(|entry| entry.account_id == "env-only") .expect("env-only target should be present"); - assert_eq!(env_only.source, AuditEndpointSource::Env); + assert_eq!(env_only.source, TargetEndpointSource::Env); assert_eq!(env_only.status, "offline"); }, ); @@ -671,7 +595,7 @@ mod tests { #[test] fn collect_validated_key_values_rejects_duplicate_keys() { let allowed_keys: HashSet<&str> = ["endpoint", "auth_token"].into_iter().collect(); - let key_values = vec![ + let key_values = [ KeyValue { key: "endpoint".to_string(), value: "https://example.com/one".to_string(), @@ -682,19 +606,31 @@ mod tests { }, ]; - let err = collect_validated_key_values(&key_values, &allowed_keys, AUDIT_WEBHOOK_SUB_SYS).unwrap_err(); + let err = shared_collect_validated_key_values( + key_values.iter().map(|kv| (kv.key.as_str(), kv.value.as_str())), + &allowed_keys, + AUDIT_WEBHOOK_SUB_SYS, + "audit target", + ) + .unwrap_err(); assert!(err.to_string().contains("duplicate key")); } #[test] fn collect_validated_key_values_rejects_unsupported_key() { let allowed_keys: HashSet<&str> = AUDIT_WEBHOOK_KEYS.iter().copied().collect(); - let key_values = vec![KeyValue { + let key_values = [KeyValue { key: "not_a_real_key".to_string(), value: "/tmp/rustfs-audit".to_string(), }]; - let err = collect_validated_key_values(&key_values, &allowed_keys, AUDIT_WEBHOOK_SUB_SYS).unwrap_err(); + let err = shared_collect_validated_key_values( + key_values.iter().map(|kv| (kv.key.as_str(), kv.value.as_str())), + &allowed_keys, + AUDIT_WEBHOOK_SUB_SYS, + "audit target", + ) + .unwrap_err(); assert!(err.to_string().contains("not allowed for audit target type")); } @@ -711,11 +647,19 @@ mod tests { .insert("/v3/audit/target/{target_type}/{target_name}", ()) .expect("route should insert"); let unsupported_type_params = full_router - .at("/v3/audit/target/audit_kafka/primary") + .at("/v3/audit/target/audit_unknown/primary") .expect("route should match"); let unsupported_type = extract_target_params(&unsupported_type_params.params).unwrap_err(); assert!(unsupported_type.to_string().contains("unsupported audit target type")); + let supported_kafka_params = full_router + .at("/v3/audit/target/audit_kafka/primary") + .expect("route should match"); + let (target_type, target_name) = + extract_target_params(&supported_kafka_params.params).expect("audit kafka target should be supported"); + assert_eq!(target_type, AUDIT_KAFKA_SUB_SYS); + assert_eq!(target_name, "primary"); + let mut partial_router = Router::new(); partial_router .insert("/v3/audit/target/{target_type}", ()) @@ -746,7 +690,7 @@ mod tests { .iter() .find(|entry| entry.account_id == "PrimaryCase" && entry.service == "webhook") .expect("mixed target should be present"); - assert_eq!(mixed.source, AuditEndpointSource::Mixed); + assert_eq!(mixed.source, TargetEndpointSource::Mixed); }, ); } diff --git a/rustfs/src/admin/handlers/event.rs b/rustfs/src/admin/handlers/event.rs index 9f93ac167..dda500cf3 100644 --- a/rustfs/src/admin/handlers/event.rs +++ b/rustfs/src/admin/handlers/event.rs @@ -15,10 +15,10 @@ use crate::admin::{ auth::validate_admin_request, handlers::target_descriptor::{ - AdminTargetSpec, AdminTargetValidator, EndpointKey, TargetDomain, allowed_target_keys, - collect_config_entry_keys as shared_collect_config_entry_keys, - collect_configured_endpoint_keys as shared_collect_configured_endpoint_keys, - collect_env_endpoint_keys as shared_collect_env_endpoint_keys, normalized_endpoint_key, target_service_name, target_spec, + AdminTargetSpec, AdminTargetValidator, EndpointKey, TargetDomain, TargetEndpointSource, allowed_target_keys, + collect_validated_key_values as shared_collect_validated_key_values, + merge_target_endpoints as shared_merge_target_endpoints, + target_mutation_block_reason as shared_target_mutation_block_reason, target_service_name, target_spec, validate_target_request, }, router::{AdminOperation, Operation, S3Router}, @@ -26,13 +26,12 @@ use crate::admin::{ use crate::auth::{check_key_valid, get_session_token}; use crate::server::{ADMIN_PREFIX, RemoteAddr}; use futures::stream::{FuturesUnordered, StreamExt}; -use hashbrown::HashSet as HbHashSet; use http::{HeaderMap, StatusCode}; use hyper::Method; use matchit::Params; use rustfs_config::notify::{ - NOTIFY_MQTT_KEYS, NOTIFY_MQTT_SUB_SYS, NOTIFY_NATS_KEYS, NOTIFY_NATS_SUB_SYS, NOTIFY_PULSAR_KEYS, NOTIFY_PULSAR_SUB_SYS, - NOTIFY_ROUTE_PREFIX, NOTIFY_WEBHOOK_KEYS, NOTIFY_WEBHOOK_SUB_SYS, + NOTIFY_KAFKA_KEYS, NOTIFY_KAFKA_SUB_SYS, NOTIFY_MQTT_KEYS, NOTIFY_MQTT_SUB_SYS, NOTIFY_NATS_KEYS, NOTIFY_NATS_SUB_SYS, + NOTIFY_PULSAR_KEYS, NOTIFY_PULSAR_SUB_SYS, NOTIFY_ROUTE_PREFIX, NOTIFY_WEBHOOK_KEYS, NOTIFY_WEBHOOK_SUB_SYS, }; use rustfs_config::{ENABLE_KEY, EVENT_DEFAULT_DIR, EnableState, MAX_ADMIN_REQUEST_BODY_SIZE}; use rustfs_ecstore::config::Config; @@ -89,7 +88,7 @@ struct NotificationEndpoint { account_id: String, service: String, status: String, - source: NotificationEndpointSource, + source: TargetEndpointSource, } #[derive(Serialize, Debug)] @@ -97,16 +96,7 @@ struct NotificationEndpointsResponse { notification_endpoints: Vec, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] -#[serde(rename_all = "lowercase")] -enum NotificationEndpointSource { - Config, - Env, - Mixed, - Runtime, -} - -fn notification_target_specs() -> [AdminTargetSpec; 4] { +fn notification_target_specs() -> [AdminTargetSpec; 5] { [ AdminTargetSpec { subsystem: NOTIFY_WEBHOOK_SUB_SYS, @@ -114,6 +104,12 @@ fn notification_target_specs() -> [AdminTargetSpec; 4] { valid_keys: NOTIFY_WEBHOOK_KEYS, validator: AdminTargetValidator::Webhook, }, + AdminTargetSpec { + subsystem: NOTIFY_KAFKA_SUB_SYS, + service: "kafka", + valid_keys: NOTIFY_KAFKA_KEYS, + validator: AdminTargetValidator::Kafka(TargetDomain::Notify), + }, AdminTargetSpec { subsystem: NOTIFY_MQTT_SUB_SYS, service: "mqtt", @@ -160,112 +156,27 @@ fn build_response(status: StatusCode, body: Body, request_id: Option<&http::Head S3Response::with_headers((status, body), header) } -fn collect_configured_endpoint_keys(config: &Config) -> Vec { - shared_collect_configured_endpoint_keys(¬ification_target_specs(), config) -} - -fn collect_config_entry_keys(config: &Config) -> HbHashSet { - shared_collect_config_entry_keys(¬ification_target_specs(), config) -} - -fn collect_env_endpoint_keys() -> HbHashSet { - shared_collect_env_endpoint_keys(¬ification_target_specs(), NOTIFY_ROUTE_PREFIX) -} - -fn classify_notification_endpoint_source( - config_targets: &HbHashSet, - env_targets: &HbHashSet, - key: &EndpointKey, -) -> NotificationEndpointSource { - match (config_targets.contains(key), env_targets.contains(key)) { - (true, true) => NotificationEndpointSource::Mixed, - (true, false) => NotificationEndpointSource::Config, - (false, true) => NotificationEndpointSource::Env, - (false, false) => NotificationEndpointSource::Runtime, - } -} - -fn notification_endpoint_source(config: &Config, target_type: &str, target_name: &str) -> NotificationEndpointSource { - let config_targets = collect_config_entry_keys(config); - let env_targets = collect_env_endpoint_keys(); - let service = target_service_name(¬ification_target_specs(), target_type).unwrap_or_default(); - - let key = normalized_endpoint_key(target_name, service); - classify_notification_endpoint_source(&config_targets, &env_targets, &key) -} - fn target_mutation_block_reason(config: &Config, target_type: &str, target_name: &str) -> Option { - match notification_endpoint_source(config, target_type, target_name) { - NotificationEndpointSource::Env => Some(format!( - "target '{}' is managed by environment variables and cannot be modified from the console", - target_name - )), - NotificationEndpointSource::Mixed => Some(format!( - "target '{}' is configured by both persisted config and environment variables; remove the environment variables first", - target_name - )), - NotificationEndpointSource::Config | NotificationEndpointSource::Runtime => None, - } + shared_target_mutation_block_reason( + ¬ification_target_specs(), + NOTIFY_ROUTE_PREFIX, + config, + target_type, + target_name, + "target", + ) } fn merge_notification_endpoints(config: &Config, runtime_statuses: HashMap) -> Vec { - let mut notification_endpoints = Vec::new(); - let mut seen = HashSet::new(); - let configured_keys = collect_configured_endpoint_keys(config); - let config_targets = collect_config_entry_keys(config); - let env_targets = collect_env_endpoint_keys(); - let mut normalized_runtime_statuses: HashMap = HashMap::new(); - for ((account_id, service), status) in runtime_statuses { - let normalized = normalized_endpoint_key(&account_id, &service); - normalized_runtime_statuses - .entry(normalized) - .or_insert((account_id, service, status)); - } - - for key in configured_keys { - let normalized = normalized_endpoint_key(&key.0, &key.1); - if !seen.insert(normalized.clone()) { - continue; - } - let status = normalized_runtime_statuses - .remove(&normalized) - .map(|(_, _, status)| status) - .unwrap_or_else(|| "offline".to_string()); - let source = classify_notification_endpoint_source(&config_targets, &env_targets, &normalized); - notification_endpoints.push(NotificationEndpoint { - account_id: key.0, - service: key.1, - status, - source, - }); - } - - for (normalized, (account_id, service, status)) in normalized_runtime_statuses { - if seen.insert(normalized.clone()) { - notification_endpoints.push(NotificationEndpoint { - account_id, - service, - status, - source: classify_notification_endpoint_source(&config_targets, &env_targets, &normalized), - }); - } - } - - for key in &env_targets { - if !seen.insert(key.clone()) { - continue; - } - - notification_endpoints.push(NotificationEndpoint { - account_id: key.0.clone(), - service: key.1.clone(), - status: "offline".to_string(), - source: classify_notification_endpoint_source(&config_targets, &env_targets, key), - }); - } - - notification_endpoints.sort_by(|a, b| a.service.cmp(&b.service).then_with(|| a.account_id.cmp(&b.account_id))); - notification_endpoints + shared_merge_target_endpoints(¬ification_target_specs(), NOTIFY_ROUTE_PREFIX, config, runtime_statuses) + .into_iter() + .map(|endpoint| NotificationEndpoint { + account_id: endpoint.account_id, + service: endpoint.service, + status: endpoint.status, + source: endpoint.source, + }) + .collect() } fn collect_online_target_arns(region: &str, target_statuses: Vec<(rustfs_targets::arn::TargetID, String)>) -> Vec { @@ -275,34 +186,6 @@ fn collect_online_target_arns(region: &str, target_statuses: Vec<(rustfs_targets .collect() } -fn collect_validated_key_values( - key_values: &[KeyValue], - allowed_keys: &HashSet<&str>, - target_type: &str, -) -> S3Result> { - let mut kv_map = HashMap::new(); - let mut seen = HashSet::new(); - - for kv in key_values { - if !allowed_keys.contains(kv.key.as_str()) { - return Err(s3_error!( - InvalidArgument, - "key '{}' not allowed for target type '{}'", - kv.key, - target_type - )); - } - - if !seen.insert(kv.key.as_str()) { - return Err(s3_error!(InvalidArgument, "duplicate key '{}' in request body", kv.key)); - } - - kv_map.insert(kv.key.clone(), kv.value.clone()); - } - - Ok(kv_map) -} - // --- Operations --- pub struct NotificationTarget {} @@ -332,7 +215,15 @@ impl Operation for NotificationTarget { let specs = notification_target_specs(); let allowed_keys: HashSet<&str> = allowed_target_keys(&specs, target_type); - let kv_map = collect_validated_key_values(¬ification_body.key_values, &allowed_keys, target_type)?; + let kv_map = shared_collect_validated_key_values( + notification_body + .key_values + .iter() + .map(|kv| (kv.key.as_str(), kv.value.as_str())), + &allowed_keys, + target_type, + "target", + )?; let spec = target_spec(&specs, target_type) .ok_or_else(|| s3_error!(InvalidArgument, "unsupported target type: '{}'", target_type))?; timeout(Duration::from_secs(10), validate_target_request(spec, &kv_map, EVENT_DEFAULT_DIR)) @@ -478,6 +369,7 @@ fn extract_target_params<'a>(params: &'a Params<'_, '_>) -> S3Result<(&'a str, & #[cfg(test)] mod tests { use super::*; + use matchit::Router; use rustfs_config::DEFAULT_DELIMITER; use rustfs_ecstore::config::{KV, KVS}; use rustfs_targets::arn::TargetID; @@ -513,14 +405,14 @@ mod tests { .find(|entry| entry.account_id == "mqtt-a" && entry.service == "mqtt") .expect("mqtt-a should be present"); assert_eq!(mqtt.status, "offline"); - assert_eq!(mqtt.source, NotificationEndpointSource::Config); + assert_eq!(mqtt.source, TargetEndpointSource::Config); let webhook = merged .iter() .find(|entry| entry.account_id == "webhook-a" && entry.service == "webhook") .expect("webhook-a should be present"); assert_eq!(webhook.status, "online"); - assert_eq!(webhook.source, NotificationEndpointSource::Config); + assert_eq!(webhook.source, TargetEndpointSource::Config); } #[test] @@ -542,14 +434,14 @@ mod tests { .find(|entry| entry.account_id == "env-only" && entry.service == "mqtt") .expect("env-only should be present"); assert_eq!(env_only.status, "offline"); - assert_eq!(env_only.source, NotificationEndpointSource::Runtime); + assert_eq!(env_only.source, TargetEndpointSource::Runtime); let enabled = merged .iter() .find(|entry| entry.account_id == "webhook-enabled" && entry.service == "webhook") .expect("webhook-enabled should be present"); assert_eq!(enabled.status, "online"); - assert_eq!(enabled.source, NotificationEndpointSource::Config); + assert_eq!(enabled.source, TargetEndpointSource::Config); } #[test] @@ -582,19 +474,55 @@ mod tests { .iter() .find(|entry| entry.account_id == "mixed-target") .expect("mixed target should be present"); - assert_eq!(mixed.source, NotificationEndpointSource::Mixed); + assert_eq!(mixed.source, TargetEndpointSource::Mixed); let env_only = merged .iter() .find(|entry| entry.account_id == "env-only") .expect("env-only target should be present"); - assert_eq!(env_only.source, NotificationEndpointSource::Env); + assert_eq!(env_only.source, TargetEndpointSource::Env); let config_only = merged .iter() .find(|entry| entry.account_id == "config-target") .expect("config target should be present"); - assert_eq!(config_only.source, NotificationEndpointSource::Config); + assert_eq!(config_only.source, TargetEndpointSource::Config); + }, + ); + } + + #[test] + fn merge_notification_endpoints_marks_kafka_env_and_mixed_sources() { + let config = Config(HashMap::from([( + NOTIFY_KAFKA_SUB_SYS.to_string(), + HashMap::from([("mixed-kafka".to_string(), enabled_kvs("on"))]), + )])); + + with_vars( + [ + ("RUSTFS_NOTIFY_KAFKA_ENABLE_MIXED-KAFKA", Some("on")), + ("RUSTFS_NOTIFY_KAFKA_BROKERS_MIXED-KAFKA", Some("127.0.0.1:9092")), + ("RUSTFS_NOTIFY_KAFKA_ENABLE_ENV-KAFKA", Some("on")), + ("RUSTFS_NOTIFY_KAFKA_BROKERS_ENV-KAFKA", Some("127.0.0.1:9093")), + ], + || { + let runtime = HashMap::from([ + (("mixed-kafka".to_string(), "kafka".to_string()), "online".to_string()), + (("env-kafka".to_string(), "kafka".to_string()), "online".to_string()), + ]); + let merged = merge_notification_endpoints(&config, runtime); + + let mixed = merged + .iter() + .find(|entry| entry.account_id == "mixed-kafka" && entry.service == "kafka") + .expect("mixed kafka target should be present"); + assert_eq!(mixed.source, TargetEndpointSource::Mixed); + + let env_only = merged + .iter() + .find(|entry| entry.account_id == "env-kafka" && entry.service == "kafka") + .expect("env kafka target should be present"); + assert_eq!(env_only.source, TargetEndpointSource::Env); }, ); } @@ -656,7 +584,7 @@ mod tests { .iter() .find(|entry| entry.account_id == "mixed-disabled") .expect("mixed target should be present"); - assert_eq!(mixed.source, NotificationEndpointSource::Mixed); + assert_eq!(mixed.source, TargetEndpointSource::Mixed); assert_eq!(mixed.status, "offline"); }, ); @@ -677,7 +605,7 @@ mod tests { .iter() .find(|entry| entry.account_id == "env-only") .expect("env-only target should be present"); - assert_eq!(env_only.source, NotificationEndpointSource::Env); + assert_eq!(env_only.source, TargetEndpointSource::Env); assert_eq!(env_only.status, "offline"); }, ); @@ -686,7 +614,7 @@ mod tests { #[test] fn collect_validated_key_values_rejects_duplicate_keys() { let allowed_keys: HashSet<&str> = ["endpoint", "auth_token"].into_iter().collect(); - let key_values = vec![ + let key_values = [ KeyValue { key: "endpoint".to_string(), value: "https://example.com/one".to_string(), @@ -697,7 +625,13 @@ mod tests { }, ]; - let err = collect_validated_key_values(&key_values, &allowed_keys, NOTIFY_WEBHOOK_SUB_SYS).unwrap_err(); + let err = shared_collect_validated_key_values( + key_values.iter().map(|kv| (kv.key.as_str(), kv.value.as_str())), + &allowed_keys, + NOTIFY_WEBHOOK_SUB_SYS, + "target", + ) + .unwrap_err(); assert!(err.to_string().contains("duplicate key")); } @@ -720,7 +654,7 @@ mod tests { .iter() .find(|entry| entry.account_id == "PrimaryCase" && entry.service == "webhook") .expect("mixed target should be present"); - assert_eq!(mixed.source, NotificationEndpointSource::Mixed); + assert_eq!(mixed.source, TargetEndpointSource::Mixed); }, ); } @@ -785,6 +719,22 @@ mod tests { ); } + #[test] + fn extract_target_params_accepts_kafka_target_type() { + let mut router = Router::new(); + router + .insert("/v3/target/{target_type}/{target_name}", ()) + .expect("route should insert"); + + let params = router + .at("/v3/target/notify_kafka/streaming") + .expect("route should match") + .params; + let (target_type, target_name) = extract_target_params(¶ms).expect("kafka target type should be accepted"); + assert_eq!(target_type, NOTIFY_KAFKA_SUB_SYS); + assert_eq!(target_name, "streaming"); + } + fn extract_block_between_markers<'a>(src: &'a str, start_marker: &str, end_marker: &str) -> &'a str { let start = src .find(start_marker) diff --git a/rustfs/src/admin/handlers/target_descriptor.rs b/rustfs/src/admin/handlers/target_descriptor.rs index de2c78db6..115cb83d3 100644 --- a/rustfs/src/admin/handlers/target_descriptor.rs +++ b/rustfs/src/admin/handlers/target_descriptor.rs @@ -14,16 +14,19 @@ use hashbrown::HashSet as HbHashSet; use rustfs_config::{ - ENABLE_KEY, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_TLS_CA, MQTT_TLS_CLIENT_CERT, MQTT_TLS_CLIENT_KEY, MQTT_TLS_POLICY, - MQTT_TLS_TRUST_LEAF_AS_CA, MQTT_TOPIC, MQTT_USERNAME, MQTT_WS_PATH_ALLOWLIST, + ENABLE_KEY, KAFKA_BROKERS, KAFKA_QUEUE_DIR, KAFKA_TOPIC, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_TLS_CA, + MQTT_TLS_CLIENT_CERT, MQTT_TLS_CLIENT_KEY, MQTT_TLS_POLICY, MQTT_TLS_TRUST_LEAF_AS_CA, MQTT_TOPIC, MQTT_USERNAME, + MQTT_WS_PATH_ALLOWLIST, }; use rustfs_ecstore::config::Config; use rustfs_targets::{ - TargetError, check_mqtt_broker_available_with_tls, check_nats_server_available, check_pulsar_broker_available, - config::{build_nats_args, build_pulsar_args, collect_env_target_instance_ids}, + TargetError, check_kafka_broker_available, check_mqtt_broker_available_with_tls, check_nats_server_available, + check_pulsar_broker_available, + config::{build_kafka_args, build_nats_args, build_pulsar_args, collect_env_target_instance_ids}, target::{TargetType, mqtt::MQTTTlsConfig}, }; use s3s::{S3Result, s3_error}; +use serde::Serialize; use std::collections::{HashMap, HashSet}; use std::io::{Error, ErrorKind}; use std::path::Path; @@ -32,6 +35,22 @@ use url::Url; pub(crate) type EndpointKey = (String, String); +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] +#[serde(rename_all = "lowercase")] +pub(crate) enum TargetEndpointSource { + Config, + Env, + Mixed, + Runtime, +} + +pub(crate) struct MergedTargetEndpoint { + pub account_id: String, + pub service: String, + pub status: String, + pub source: TargetEndpointSource, +} + #[derive(Clone, Copy)] pub(crate) enum TargetDomain { Notify, @@ -51,6 +70,7 @@ impl TargetDomain { pub(crate) enum AdminTargetValidator { Webhook, Mqtt, + Kafka(TargetDomain), Nats(TargetDomain), Pulsar(TargetDomain), } @@ -125,12 +145,160 @@ pub(crate) fn collect_env_endpoint_keys(specs: &[AdminTargetSpec], route_prefix: endpoints } +pub(crate) fn classify_endpoint_source( + config_targets: &HbHashSet, + env_targets: &HbHashSet, + key: &EndpointKey, +) -> TargetEndpointSource { + match (config_targets.contains(key), env_targets.contains(key)) { + (true, true) => TargetEndpointSource::Mixed, + (true, false) => TargetEndpointSource::Config, + (false, true) => TargetEndpointSource::Env, + (false, false) => TargetEndpointSource::Runtime, + } +} + +pub(crate) fn endpoint_source( + specs: &[AdminTargetSpec], + route_prefix: &str, + config: &Config, + target_type: &str, + target_name: &str, +) -> TargetEndpointSource { + let config_targets = collect_config_entry_keys(specs, config); + let env_targets = collect_env_endpoint_keys(specs, route_prefix); + let service = target_service_name(specs, target_type).unwrap_or_default(); + let key = normalized_endpoint_key(target_name, service); + classify_endpoint_source(&config_targets, &env_targets, &key) +} + +pub(crate) fn target_mutation_block_reason( + specs: &[AdminTargetSpec], + route_prefix: &str, + config: &Config, + target_type: &str, + target_name: &str, + target_label: &str, +) -> Option { + match endpoint_source(specs, route_prefix, config, target_type, target_name) { + TargetEndpointSource::Env => Some(format!( + "{} '{}' is managed by environment variables and cannot be modified from the console", + target_label, target_name + )), + TargetEndpointSource::Mixed => Some(format!( + "{} '{}' is configured by both persisted config and environment variables; remove the environment variables first", + target_label, target_name + )), + TargetEndpointSource::Config | TargetEndpointSource::Runtime => None, + } +} + +pub(crate) fn merge_target_endpoints( + specs: &[AdminTargetSpec], + route_prefix: &str, + config: &Config, + runtime_statuses: HashMap, +) -> Vec { + let mut endpoints = Vec::new(); + let mut seen = HashSet::new(); + let configured_keys = collect_configured_endpoint_keys(specs, config); + let config_targets = collect_config_entry_keys(specs, config); + let env_targets = collect_env_endpoint_keys(specs, route_prefix); + let mut normalized_runtime_statuses: HashMap = HashMap::new(); + + for ((account_id, service), status) in runtime_statuses { + let normalized = normalized_endpoint_key(&account_id, &service); + normalized_runtime_statuses + .entry(normalized) + .or_insert((account_id, service, status)); + } + + for key in configured_keys { + let normalized = normalized_endpoint_key(&key.0, &key.1); + if !seen.insert(normalized.clone()) { + continue; + } + + let status = normalized_runtime_statuses + .remove(&normalized) + .map(|(_, _, status)| status) + .unwrap_or_else(|| "offline".to_string()); + + endpoints.push(MergedTargetEndpoint { + account_id: key.0, + service: key.1, + status, + source: classify_endpoint_source(&config_targets, &env_targets, &normalized), + }); + } + + for (normalized, (account_id, service, status)) in normalized_runtime_statuses { + if seen.insert(normalized.clone()) { + endpoints.push(MergedTargetEndpoint { + account_id, + service, + status, + source: classify_endpoint_source(&config_targets, &env_targets, &normalized), + }); + } + } + + for key in &env_targets { + if !seen.insert(key.clone()) { + continue; + } + + endpoints.push(MergedTargetEndpoint { + account_id: key.0.clone(), + service: key.1.clone(), + status: "offline".to_string(), + source: classify_endpoint_source(&config_targets, &env_targets, key), + }); + } + + endpoints.sort_by(|a, b| a.service.cmp(&b.service).then_with(|| a.account_id.cmp(&b.account_id))); + endpoints +} + pub(crate) fn allowed_target_keys(specs: &[AdminTargetSpec], target_type: &str) -> HashSet<&'static str> { target_spec(specs, target_type) .map(|spec| spec.valid_keys.iter().copied().collect()) .unwrap_or_default() } +pub(crate) fn collect_validated_key_values<'a, I>( + key_values: I, + allowed_keys: &HashSet<&str>, + target_type: &str, + target_label: &str, +) -> S3Result> +where + I: IntoIterator, +{ + let mut kv_map = HashMap::new(); + let mut seen = HashSet::new(); + + for (key, value) in key_values { + if !allowed_keys.contains(key) { + return Err(s3_error!( + InvalidArgument, + "key '{}' not allowed for {} type '{}'", + key, + target_label, + target_type + )); + } + + if !seen.insert(key) { + return Err(s3_error!(InvalidArgument, "duplicate key '{}' in request body", key)); + } + + kv_map.insert(key.to_string(), value.to_string()); + } + + Ok(kv_map) +} + pub(crate) async fn validate_queue_dir(queue_dir: &str) -> S3Result<()> { if !queue_dir.is_empty() { if !Path::new(queue_dir).is_absolute() { @@ -159,6 +327,7 @@ pub(crate) async fn validate_target_request( match spec.validator { AdminTargetValidator::Webhook => validate_webhook_request(kv_map).await, AdminTargetValidator::Mqtt => validate_mqtt_request(kv_map).await, + AdminTargetValidator::Kafka(domain) => validate_kafka_request(kv_map, default_queue_dir, domain).await, AdminTargetValidator::Nats(domain) => validate_nats_request(kv_map, default_queue_dir, domain).await, AdminTargetValidator::Pulsar(domain) => validate_pulsar_request(kv_map, default_queue_dir, domain).await, } @@ -274,6 +443,26 @@ async fn validate_nats_request(kv_map: &HashMap, default_queue_d }) } +async fn validate_kafka_request(kv_map: &HashMap, default_queue_dir: &str, domain: TargetDomain) -> S3Result<()> { + if let Some(queue_dir) = kv_map.get(KAFKA_QUEUE_DIR) { + validate_queue_dir(queue_dir.as_str()).await?; + } + + if !kv_map.contains_key(KAFKA_BROKERS) { + return Err(s3_error!(InvalidArgument, "Kafka brokers are required")); + } + if !kv_map.contains_key(KAFKA_TOPIC) { + return Err(s3_error!(InvalidArgument, "Kafka topic is required")); + } + + let args = build_kafka_args(&to_kvs(kv_map), default_queue_dir, domain.runtime_target_type()) + .map_err(|e| s3_error!(InvalidArgument, "{}", e))?; + check_kafka_broker_available(&args).await.map_err(|e| match e { + TargetError::Configuration(_) => s3_error!(InvalidArgument, "{}", e), + _ => s3_error!(InvalidArgument, "Kafka broker check failed: {}", e), + }) +} + async fn validate_pulsar_request( kv_map: &HashMap, default_queue_dir: &str,