mirror of
https://github.com/rustfs/rustfs.git
synced 2026-05-07 06:37:42 +08:00
feat(obs): enhance OpenTelemetry configuration and logging
Improve observability setup with the following changes: - Replace static OnceCell with tokio::sync::OnceCell for guard management - Add logger_level to OtelConfig for configurable tracing verbosity - Improve telemetry initialization with better error handling - Enhance logging filters and span configuration Breaking Changes: - Configuration now requires logger_level field - Global guard management uses async-safe primitives Example config: observability: endpoint: "http://localhost:4317" logger_level: "debug" # New required field
This commit is contained in:
@@ -5,7 +5,8 @@ sample_ratio = 0.5
|
||||
meter_interval = 30
|
||||
service_name = "rustfs"
|
||||
service_version = "0.1.0"
|
||||
deployment_environment = "develop"
|
||||
environment = "develop"
|
||||
logger_level = "info"
|
||||
|
||||
[sinks]
|
||||
[sinks.kafka] # Kafka sink is disabled by default
|
||||
|
||||
@@ -1,22 +1,40 @@
|
||||
use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION};
|
||||
use config::{Config, File, FileFormat};
|
||||
use serde::Deserialize;
|
||||
use std::env;
|
||||
|
||||
/// OpenTelemetry Configuration
|
||||
/// Add service name, service version, deployment environment
|
||||
/// Add service name, service version, environment
|
||||
/// Add interval time for metric collection
|
||||
/// Add sample ratio for trace sampling
|
||||
/// Add endpoint for metric collection
|
||||
/// Add use_stdout for output to stdout
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
/// Add logger level for log level
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct OtelConfig {
|
||||
pub endpoint: String,
|
||||
pub use_stdout: bool,
|
||||
pub sample_ratio: f64,
|
||||
pub meter_interval: u64,
|
||||
pub service_name: String,
|
||||
pub service_version: String,
|
||||
pub deployment_environment: String,
|
||||
pub use_stdout: Option<bool>,
|
||||
pub sample_ratio: Option<f64>,
|
||||
pub meter_interval: Option<u64>,
|
||||
pub service_name: Option<String>,
|
||||
pub service_version: Option<String>,
|
||||
pub environment: Option<String>,
|
||||
pub logger_level: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for OtelConfig {
|
||||
fn default() -> Self {
|
||||
OtelConfig {
|
||||
endpoint: "".to_string(),
|
||||
use_stdout: Some(true),
|
||||
sample_ratio: Some(SAMPLE_RATIO),
|
||||
meter_interval: Some(METER_INTERVAL),
|
||||
service_name: Some(SERVICE_NAME.to_string()),
|
||||
service_version: Some(SERVICE_VERSION.to_string()),
|
||||
environment: Some(ENVIRONMENT.to_string()),
|
||||
logger_level: Some(LOGGER_LEVEL.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Kafka Sink Configuration - Add batch parameters
|
||||
@@ -39,7 +57,7 @@ pub struct WebhookSinkConfig {
|
||||
}
|
||||
|
||||
/// File Sink Configuration - Add buffering parameters
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct FileSinkConfig {
|
||||
pub enabled: bool,
|
||||
pub path: String,
|
||||
@@ -48,6 +66,18 @@ pub struct FileSinkConfig {
|
||||
pub flush_threshold: Option<usize>, // Refresh threshold, default 100 logs
|
||||
}
|
||||
|
||||
impl Default for FileSinkConfig {
|
||||
fn default() -> Self {
|
||||
FileSinkConfig {
|
||||
enabled: true,
|
||||
path: "logs/app.log".to_string(),
|
||||
buffer_size: Some(8192),
|
||||
flush_interval_ms: Some(1000),
|
||||
flush_threshold: Some(100),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Sink configuration collection
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
pub struct SinkConfig {
|
||||
@@ -57,11 +87,19 @@ pub struct SinkConfig {
|
||||
}
|
||||
|
||||
///Logger Configuration
|
||||
#[derive(Debug, Deserialize, Clone, Default)]
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct LoggerConfig {
|
||||
pub queue_capacity: Option<usize>,
|
||||
}
|
||||
|
||||
impl Default for LoggerConfig {
|
||||
fn default() -> Self {
|
||||
LoggerConfig {
|
||||
queue_capacity: Some(1000),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Overall application configuration
|
||||
/// Add observability, sinks, and logger configuration
|
||||
///
|
||||
|
||||
89
packages/obs/src/global.rs
Normal file
89
packages/obs/src/global.rs
Normal file
@@ -0,0 +1,89 @@
|
||||
use crate::telemetry::OtelGuard;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::{OnceCell, SetError};
|
||||
use tracing::{error, info};
|
||||
|
||||
/// Fallback for `OtelConfig::use_stdout` when the field is not set.
pub(crate) const USE_STDOUT: bool = true;
|
||||
/// Fallback service name used when `OtelConfig::service_name` is not set.
pub(crate) const SERVICE_NAME: &str = "RustFS";
|
||||
/// Fallback trace sample ratio used when `OtelConfig::sample_ratio` is not set.
pub(crate) const SAMPLE_RATIO: f64 = 1.0;
|
||||
/// Fallback metric export interval, in seconds, used when
/// `OtelConfig::meter_interval` is not set.
pub(crate) const METER_INTERVAL: u64 = 60;
|
||||
/// Fallback service version used when `OtelConfig::service_version` is not set.
pub(crate) const SERVICE_VERSION: &str = "0.1.0";
|
||||
/// Fallback environment name used when `OtelConfig::environment` is not set.
pub(crate) const ENVIRONMENT: &str = "development";
|
||||
/// Fallback log level used when `OtelConfig::logger_level` is not set.
pub(crate) const LOGGER_LEVEL: &str = "info";
|
||||
|
||||
/// Global guard for OpenTelemetry tracing
|
||||
static GLOBAL_GUARD: OnceCell<Arc<Mutex<OtelGuard>>> = OnceCell::const_new();
|
||||
|
||||
/// Error type for global guard operations
///
/// Returned by `set_global_guard` and `get_global_guard`.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum GuardError {
|
||||
/// Storing the guard in the global cell failed; wraps the underlying
/// `tokio::sync::SetError`.
#[error("Failed to set global guard: {0}")]
|
||||
SetError(#[from] SetError<Arc<Mutex<OtelGuard>>>),
|
||||
/// The global guard was requested before it had been set.
#[error("Global guard not initialized")]
|
||||
NotInitialized,
|
||||
}
|
||||
|
||||
/// Set the global guard for OpenTelemetry
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `guard` - The OtelGuard instance to set globally
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Ok(())` if successful
|
||||
/// * `Err(GuardError)` if setting fails
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use rustfs_obs::{init_telemetry, load_config, set_global_guard};
|
||||
///
|
||||
/// async fn init() -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let config = load_config(None);
|
||||
/// let guard = init_telemetry(&config.observability).await?;
|
||||
/// set_global_guard(guard)?;
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub fn set_global_guard(guard: OtelGuard) -> Result<(), GuardError> {
|
||||
info!("Initializing global OpenTelemetry guard");
|
||||
GLOBAL_GUARD.set(Arc::new(Mutex::new(guard))).map_err(GuardError::SetError)
|
||||
}
|
||||
|
||||
/// Get the global guard for OpenTelemetry
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Ok(Arc<Mutex<OtelGuard>>)` if guard exists
|
||||
/// * `Err(GuardError)` if guard not initialized
|
||||
///
|
||||
/// # Example
|
||||
/// ```rust
|
||||
/// use rustfs_obs::get_global_guard;
|
||||
///
|
||||
/// async fn trace_operation() -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let guard = get_global_guard()?;
|
||||
/// let _lock = guard.lock().unwrap();
|
||||
/// // Perform traced operation
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub fn get_global_guard() -> Result<Arc<Mutex<OtelGuard>>, GuardError> {
|
||||
GLOBAL_GUARD.get().cloned().ok_or(GuardError::NotInitialized)
|
||||
}
|
||||
|
||||
/// Try to get the global guard for OpenTelemetry
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Some(Arc<Mutex<OtelGuard>>)` if guard exists
|
||||
/// * `None` if guard not initialized
|
||||
pub fn try_get_global_guard() -> Option<Arc<Mutex<OtelGuard>>> {
|
||||
GLOBAL_GUARD.get().cloned()
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Reading the guard before anything has stored one must report
    /// `GuardError::NotInitialized`.
    #[tokio::test]
    async fn test_get_uninitialized_guard() {
        assert!(matches!(get_global_guard(), Err(GuardError::NotInitialized)));
    }
}
|
||||
@@ -29,12 +29,12 @@
|
||||
/// ```
|
||||
mod config;
|
||||
mod entry;
|
||||
mod global;
|
||||
mod logger;
|
||||
mod sink;
|
||||
mod telemetry;
|
||||
mod utils;
|
||||
mod worker;
|
||||
|
||||
pub use config::load_config;
|
||||
pub use config::{AppConfig, OtelConfig};
|
||||
pub use entry::args::Args;
|
||||
@@ -42,15 +42,15 @@ pub use entry::audit::{ApiDetails, AuditLogEntry};
|
||||
pub use entry::base::BaseLogEntry;
|
||||
pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry};
|
||||
pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel};
|
||||
pub use logger::start_logger;
|
||||
pub use logger::{
|
||||
ensure_logger_initialized, get_global_logger, init_global_logger, locked_logger, log_debug, log_error, log_info, log_trace,
|
||||
log_warn, log_with_context,
|
||||
};
|
||||
pub use global::{get_global_guard, set_global_guard, try_get_global_guard, GuardError};
|
||||
pub use logger::{ensure_logger_initialized, log_debug, log_error, log_info, log_trace, log_warn, log_with_context};
|
||||
pub use logger::{get_global_logger, init_global_logger, locked_logger, start_logger};
|
||||
pub use logger::{log_init_state, InitLogStatus};
|
||||
pub use logger::{LogError, Logger};
|
||||
pub use sink::Sink;
|
||||
use std::sync::Arc;
|
||||
pub use telemetry::{get_global_registry, init_telemetry, metrics_handler};
|
||||
pub use telemetry::init_telemetry;
|
||||
pub use telemetry::{get_global_registry, metrics};
|
||||
use tokio::sync::Mutex;
|
||||
pub use utils::{get_local_ip, get_local_ip_with_default};
|
||||
pub use worker::start_worker;
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, ServerLogEntry, Sink, UnifiedLogEntry};
|
||||
use crate::global::{ENVIRONMENT, SERVICE_NAME, SERVICE_VERSION};
|
||||
use crate::{AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, OtelConfig, ServerLogEntry, Sink, UnifiedLogEntry};
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||
use tokio::sync::{Mutex, OnceCell};
|
||||
use tracing_core::Level;
|
||||
@@ -471,3 +473,62 @@ pub async fn log_with_context(
|
||||
.write_with_context(message, source, level, request_id, user_id, fields)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Log initialization status
///
/// Carries the values that `log_init_state` formats into the startup log
/// message.
|
||||
#[derive(Debug)]
|
||||
pub struct InitLogStatus {
|
||||
/// Timestamp attached to the startup log entry.
pub timestamp: SystemTime,
|
||||
/// Service name shown in the startup message.
pub service_name: String,
|
||||
/// Service version shown in the startup message.
pub version: String,
|
||||
/// Environment name shown in the startup message.
pub environment: String,
|
||||
}
|
||||
|
||||
impl Default for InitLogStatus {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
timestamp: SystemTime::now(),
|
||||
service_name: String::from(SERVICE_NAME),
|
||||
version: SERVICE_VERSION.to_string(),
|
||||
environment: ENVIRONMENT.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InitLogStatus {
|
||||
pub fn new_config(config: &OtelConfig) -> Self {
|
||||
let config = config.clone();
|
||||
let environment = config.environment.unwrap_or(ENVIRONMENT.to_string());
|
||||
let version = config.service_version.unwrap_or(SERVICE_VERSION.to_string());
|
||||
Self {
|
||||
timestamp: SystemTime::now(),
|
||||
service_name: String::from(SERVICE_NAME),
|
||||
version,
|
||||
environment,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn init_start_log(config: &OtelConfig) -> Result<(), LogError> {
|
||||
let status = Self::new_config(config);
|
||||
log_init_state(Some(status)).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Log initialization details during system startup
|
||||
pub async fn log_init_state(status: Option<InitLogStatus>) -> Result<(), LogError> {
|
||||
let status = status.unwrap_or_default();
|
||||
|
||||
let base_entry = BaseLogEntry::new()
|
||||
.timestamp(chrono::DateTime::from(status.timestamp))
|
||||
.message(Some(format!(
|
||||
"Service initialization started - {} v{} in {}",
|
||||
status.service_name, status.version, status.environment
|
||||
)))
|
||||
.request_id(Some("system_init".to_string()));
|
||||
|
||||
let server_entry = ServerLogEntry::new(Level::INFO, "system_initialization".to_string())
|
||||
.with_base(base_entry)
|
||||
.user_id(Some("system".to_string()));
|
||||
|
||||
get_global_logger().lock().await.log_server_entry(server_entry).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::global::{ENVIRONMENT, LOGGER_LEVEL, METER_INTERVAL, SAMPLE_RATIO, SERVICE_NAME, SERVICE_VERSION, USE_STDOUT};
|
||||
use crate::{get_local_ip_with_default, OtelConfig};
|
||||
use opentelemetry::trace::TracerProvider;
|
||||
use opentelemetry::{global, KeyValue};
|
||||
@@ -10,7 +11,7 @@ use opentelemetry_sdk::{
|
||||
Resource,
|
||||
};
|
||||
use opentelemetry_semantic_conventions::{
|
||||
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_NAME, SERVICE_VERSION},
|
||||
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, NETWORK_LOCAL_ADDRESS, SERVICE_VERSION as OTEL_SERVICE_VERSION},
|
||||
SCHEMA_URL,
|
||||
};
|
||||
use prometheus::{Encoder, Registry, TextEncoder};
|
||||
@@ -45,6 +46,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte
|
||||
/// // When it's dropped, all telemetry components are properly shut down
|
||||
/// drop(otel_guard);
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct OtelGuard {
|
||||
tracer_provider: SdkTracerProvider,
|
||||
meter_provider: SdkMeterProvider,
|
||||
@@ -94,14 +96,14 @@ pub fn get_global_registry() -> Arc<Mutex<Registry>> {
|
||||
///
|
||||
/// # Example
|
||||
/// ```
|
||||
/// use rustfs_obs::metrics_handler;
|
||||
/// use rustfs_obs::metrics;
|
||||
///
|
||||
/// async fn main() {
|
||||
/// let metrics = metrics_handler().await;
|
||||
/// let metrics = metrics().await;
|
||||
/// println!("{}", metrics);
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn metrics_handler() -> String {
|
||||
pub async fn metrics() -> String {
|
||||
let encoder = TextEncoder::new();
|
||||
// Get a reference to the registry for reading metrics
|
||||
let registry = get_global_registry().lock().await.to_owned();
|
||||
@@ -118,13 +120,13 @@ pub async fn metrics_handler() -> String {
|
||||
|
||||
/// create OpenTelemetry Resource
|
||||
fn resource(config: &OtelConfig) -> Resource {
|
||||
let config = config.clone();
|
||||
Resource::builder()
|
||||
.with_service_name(config.service_name.clone())
|
||||
.with_service_name(config.service_name.unwrap_or(SERVICE_NAME.to_string()))
|
||||
.with_schema_url(
|
||||
[
|
||||
KeyValue::new(SERVICE_NAME, config.service_name.clone()),
|
||||
KeyValue::new(SERVICE_VERSION, config.service_version.clone()),
|
||||
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.deployment_environment.clone()),
|
||||
KeyValue::new(OTEL_SERVICE_VERSION, config.service_version.unwrap_or(SERVICE_VERSION.to_string())),
|
||||
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.environment.unwrap_or(ENVIRONMENT.to_string())),
|
||||
KeyValue::new(NETWORK_LOCAL_ADDRESS, get_local_ip_with_default()),
|
||||
],
|
||||
SCHEMA_URL,
|
||||
@@ -132,16 +134,19 @@ fn resource(config: &OtelConfig) -> Resource {
|
||||
.build()
|
||||
}
|
||||
|
||||
/// Creates a periodic reader for stdout metrics
|
||||
fn create_periodic_reader(interval: u64) -> PeriodicReader<opentelemetry_stdout::MetricExporter> {
|
||||
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default())
|
||||
.with_interval(std::time::Duration::from_secs(interval))
|
||||
.build()
|
||||
}
|
||||
|
||||
/// Initialize Meter Provider
|
||||
fn init_meter_provider(config: &OtelConfig) -> SdkMeterProvider {
|
||||
let mut builder = MeterProviderBuilder::default().with_resource(resource(config));
|
||||
// If endpoint is empty, use stdout output
|
||||
if config.endpoint.is_empty() {
|
||||
builder = builder.with_reader(
|
||||
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default())
|
||||
.with_interval(std::time::Duration::from_secs(config.meter_interval))
|
||||
.build(),
|
||||
);
|
||||
builder = builder.with_reader(create_periodic_reader(config.meter_interval.unwrap_or(METER_INTERVAL)));
|
||||
} else {
|
||||
// If endpoint is not empty, use otlp output
|
||||
let exporter = opentelemetry_otlp::MetricExporter::builder()
|
||||
@@ -152,16 +157,12 @@ fn init_meter_provider(config: &OtelConfig) -> SdkMeterProvider {
|
||||
.unwrap();
|
||||
builder = builder.with_reader(
|
||||
PeriodicReader::builder(exporter)
|
||||
.with_interval(std::time::Duration::from_secs(config.meter_interval))
|
||||
.with_interval(std::time::Duration::from_secs(config.meter_interval.unwrap_or(METER_INTERVAL)))
|
||||
.build(),
|
||||
);
|
||||
// If use_stdout is true, output to stdout at the same time
|
||||
if config.use_stdout {
|
||||
builder = builder.with_reader(
|
||||
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default())
|
||||
.with_interval(std::time::Duration::from_secs(config.meter_interval))
|
||||
.build(),
|
||||
);
|
||||
if config.use_stdout.unwrap_or(USE_STDOUT) {
|
||||
builder = builder.with_reader(create_periodic_reader(config.meter_interval.unwrap_or(METER_INTERVAL)));
|
||||
}
|
||||
}
|
||||
let registry = Registry::new();
|
||||
@@ -177,8 +178,9 @@ fn init_meter_provider(config: &OtelConfig) -> SdkMeterProvider {
|
||||
|
||||
/// Initialize Tracer Provider
|
||||
fn init_tracer_provider(config: &OtelConfig) -> SdkTracerProvider {
|
||||
let sampler = if config.sample_ratio > 0.0 && config.sample_ratio < 1.0 {
|
||||
Sampler::TraceIdRatioBased(config.sample_ratio)
|
||||
let sample_ratio = config.sample_ratio.unwrap_or(SAMPLE_RATIO);
|
||||
let sampler = if sample_ratio > 0.0 && sample_ratio < 1.0 {
|
||||
Sampler::TraceIdRatioBased(sample_ratio)
|
||||
} else {
|
||||
Sampler::AlwaysOn
|
||||
};
|
||||
@@ -197,7 +199,7 @@ fn init_tracer_provider(config: &OtelConfig) -> SdkTracerProvider {
|
||||
.with_endpoint(&config.endpoint)
|
||||
.build()
|
||||
.unwrap();
|
||||
if config.use_stdout {
|
||||
if config.use_stdout.unwrap_or(USE_STDOUT) {
|
||||
builder
|
||||
.with_batch_exporter(exporter)
|
||||
.with_batch_exporter(opentelemetry_stdout::SpanExporter::default())
|
||||
@@ -215,7 +217,6 @@ fn init_tracer_provider(config: &OtelConfig) -> SdkTracerProvider {
|
||||
pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
|
||||
let tracer_provider = init_tracer_provider(config);
|
||||
let meter_provider = init_meter_provider(config);
|
||||
let tracer = tracer_provider.tracer(config.service_name.clone());
|
||||
|
||||
// Initialize logger provider based on configuration
|
||||
let logger_provider = {
|
||||
@@ -235,30 +236,41 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
|
||||
builder = builder.with_batch_exporter(exporter);
|
||||
|
||||
// Add stdout exporter if requested
|
||||
if config.use_stdout {
|
||||
if config.use_stdout.unwrap_or(USE_STDOUT) {
|
||||
builder = builder.with_batch_exporter(opentelemetry_stdout::LogExporter::default());
|
||||
}
|
||||
}
|
||||
|
||||
builder.build()
|
||||
};
|
||||
|
||||
let config = config.clone();
|
||||
let logger_level = config.logger_level.unwrap_or(LOGGER_LEVEL.to_string());
|
||||
let logger_level = logger_level.as_str();
|
||||
// Setup OpenTelemetryTracingBridge layer
|
||||
let otel_layer = {
|
||||
// Filter to prevent infinite telemetry loops
|
||||
// This blocks events from OpenTelemetry and its dependent libraries (tonic, reqwest, etc.)
|
||||
// from being sent back to OpenTelemetry itself
|
||||
let filter_otel = EnvFilter::new("info")
|
||||
.add_directive("hyper=off".parse().unwrap())
|
||||
.add_directive("opentelemetry=off".parse().unwrap())
|
||||
.add_directive("tonic=off".parse().unwrap())
|
||||
.add_directive("h2=off".parse().unwrap())
|
||||
.add_directive("reqwest=off".parse().unwrap());
|
||||
let filter_otel = match logger_level {
|
||||
"trace" | "debug" => {
|
||||
info!("OpenTelemetry tracing initialized with level: {}", logger_level);
|
||||
EnvFilter::new(logger_level)
|
||||
}
|
||||
_ => {
|
||||
let mut filter = EnvFilter::new(logger_level);
|
||||
for directive in ["hyper", "opentelemetry", "tonic", "h2", "reqwest"] {
|
||||
filter = filter.add_directive(format!("{}=off", directive).parse().unwrap());
|
||||
}
|
||||
filter
|
||||
}
|
||||
};
|
||||
|
||||
layer::OpenTelemetryTracingBridge::new(&logger_provider).with_filter(filter_otel)
|
||||
};
|
||||
|
||||
let tracer = tracer_provider.tracer(config.service_name.unwrap_or(SERVICE_NAME.to_string()));
|
||||
let registry = tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::filter::LevelFilter::INFO)
|
||||
.with(switch_level(logger_level))
|
||||
.with(OpenTelemetryLayer::new(tracer))
|
||||
.with(MetricsLayer::new(meter_provider.clone()))
|
||||
.with(otel_layer);
|
||||
@@ -271,14 +283,13 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
|
||||
.with_line_number(true);
|
||||
|
||||
// Creating a formatting layer with explicit type to avoid type mismatches
|
||||
let fmt_layer = if config.endpoint.is_empty() {
|
||||
// Local debug mode: add filter to show OpenTelemetry debug logs
|
||||
let filter_fmt = EnvFilter::new("info").add_directive("opentelemetry=debug".parse().unwrap());
|
||||
fmt_layer.with_filter(filter_fmt)
|
||||
} else {
|
||||
// Production mode: use default filter settings
|
||||
fmt_layer.with_filter(EnvFilter::new("info").add_directive("opentelemetry=off".parse().unwrap()))
|
||||
};
|
||||
let fmt_layer = fmt_layer.with_filter(
|
||||
EnvFilter::new(logger_level).add_directive(
|
||||
format!("opentelemetry={}", if config.endpoint.is_empty() { logger_level } else { "off" })
|
||||
.parse()
|
||||
.unwrap(),
|
||||
),
|
||||
);
|
||||
|
||||
registry.with(ErrorLayer::default()).with(fmt_layer).init();
|
||||
if !config.endpoint.is_empty() {
|
||||
@@ -291,3 +302,15 @@ pub fn init_telemetry(config: &OtelConfig) -> OtelGuard {
|
||||
logger_provider,
|
||||
}
|
||||
}
|
||||
|
||||
/// Switch log level
|
||||
fn switch_level(logger_level: &str) -> tracing_subscriber::filter::LevelFilter {
|
||||
match logger_level {
|
||||
"error" => tracing_subscriber::filter::LevelFilter::ERROR,
|
||||
"warn" => tracing_subscriber::filter::LevelFilter::WARN,
|
||||
"info" => tracing_subscriber::filter::LevelFilter::INFO,
|
||||
"debug" => tracing_subscriber::filter::LevelFilter::DEBUG,
|
||||
"trace" => tracing_subscriber::filter::LevelFilter::TRACE,
|
||||
_ => tracing_subscriber::filter::LevelFilter::OFF,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,35 +34,18 @@ use hyper_util::{
|
||||
service::TowerToHyperService,
|
||||
};
|
||||
use iam::init_iam_sys;
|
||||
use once_cell::sync::OnceCell;
|
||||
use protos::proto_gen::node_service::node_service_server::NodeServiceServer;
|
||||
use rustfs_obs::{init_obs, load_config, BaseLogEntry, ServerLogEntry};
|
||||
use rustfs_obs::{init_obs, load_config, set_global_guard, InitLogStatus};
|
||||
use s3s::{host::MultiDomain, service::S3ServiceBuilder};
|
||||
use service::hybrid;
|
||||
use std::time::SystemTime;
|
||||
use std::{io::IsTerminal, net::SocketAddr};
|
||||
use tokio::net::TcpListener;
|
||||
use tonic::{metadata::MetadataValue, Request, Status};
|
||||
use tower_http::cors::CorsLayer;
|
||||
use tracing::{debug, error, info, info_span, warn};
|
||||
use tracing_core::Level;
|
||||
use tracing_error::ErrorLayer;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
/// Guard that holds a reference to the global observability subsystem.
|
||||
/// This struct ensures that tracing/logging components remain alive for the
|
||||
/// application's lifetime. The underlying guard is dropped when this struct
|
||||
/// is dropped, which may flush logs or perform cleanup operations.
|
||||
#[allow(dead_code)]
|
||||
struct TracingGuard(Box<dyn std::any::Any + Send + Sync>);
|
||||
|
||||
impl Drop for TracingGuard {
|
||||
fn drop(&mut self) {
|
||||
debug!("Dropping global tracing guard, flushing logs");
|
||||
}
|
||||
}
|
||||
static GLOBAL_GUARD: OnceCell<TracingGuard> = OnceCell::new();
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn setup_tracing() {
|
||||
use tracing_subscriber::EnvFilter;
|
||||
@@ -107,34 +90,23 @@ fn print_server_info() {
|
||||
async fn main() -> Result<()> {
|
||||
// Parse the obtained parameters
|
||||
let opt = config::Opt::parse();
|
||||
// println!("config: {:?}", &opt);
|
||||
|
||||
// Load the configuration file
|
||||
let config = load_config(Some(opt.clone().obs_config));
|
||||
|
||||
// Initialize Observability
|
||||
let (logger, guard) = init_obs(config).await;
|
||||
let (_logger, guard) = init_obs(config.clone()).await;
|
||||
|
||||
// Pack and store the guard
|
||||
GLOBAL_GUARD.set(TracingGuard(Box::new(guard))).unwrap_or_else(|_| {
|
||||
error!("Unable to set global tracing guard");
|
||||
});
|
||||
// Store in global storage
|
||||
set_global_guard(guard)?;
|
||||
|
||||
// Initialize the logger
|
||||
let start_time = SystemTime::now();
|
||||
let base_entry = BaseLogEntry::new()
|
||||
.timestamp(chrono::DateTime::from(start_time))
|
||||
.message(Some("main init obs start".to_string()))
|
||||
.request_id(Some("main".to_string()));
|
||||
let server_entry = ServerLogEntry::new(Level::INFO, "main_server_entry".to_string())
|
||||
.with_base(base_entry)
|
||||
.user_id(Some("user_id".to_string()));
|
||||
let _r = logger.lock().await.log_server_entry(server_entry).await;
|
||||
// Log initialization status
|
||||
InitLogStatus::init_start_log(&config.observability).await?;
|
||||
|
||||
// Run parameters
|
||||
run(opt).await
|
||||
}
|
||||
//
|
||||
|
||||
// #[tokio::main]
|
||||
async fn run(opt: config::Opt) -> Result<()> {
|
||||
let span = info_span!("trace-main-run");
|
||||
@@ -142,7 +114,7 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
|
||||
debug!("opt: {:?}", &opt);
|
||||
|
||||
let mut server_addr = net::check_local_server_addr(opt.address.as_str()).unwrap();
|
||||
let mut server_addr = net::check_local_server_addr(opt.address.as_str())?;
|
||||
|
||||
if server_addr.port() == 0 {
|
||||
server_addr.set_port(get_available_port());
|
||||
@@ -155,8 +127,7 @@ async fn run(opt: config::Opt) -> Result<()> {
|
||||
debug!("server_address {}", &server_address);
|
||||
|
||||
//设置 AK 和 SK
|
||||
|
||||
iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone())).unwrap();
|
||||
iam::init_global_action_cred(Some(opt.access_key.clone()), Some(opt.secret_key.clone()))?;
|
||||
set_global_rustfs_port(server_port);
|
||||
|
||||
//监听地址,端口从参数中获取
|
||||
|
||||
Reference in New Issue
Block a user