mirror of
https://github.com/dushixiang/pika.git
synced 2026-05-07 06:16:43 +08:00
Merge pull request #78 from yumusb/main
Support metric bundles and enable WS compression
This commit is contained in:
@@ -43,8 +43,9 @@ func NewAgentHandler(logger *zap.Logger, agentService *service.AgentService, tra
|
||||
|
||||
// 初始化upgrader,需要在创建handler之后因为需要引用h.checkOrigin
|
||||
h.upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024 * 32,
|
||||
WriteBufferSize: 1024 * 32,
|
||||
ReadBufferSize: 1024 * 32,
|
||||
WriteBufferSize: 1024 * 32,
|
||||
EnableCompression: true,
|
||||
}
|
||||
|
||||
// 设置WebSocket消息处理器
|
||||
|
||||
@@ -80,6 +80,9 @@ func (h *AgentHandler) handleWebSocketMessage(ctx context.Context, agentID strin
|
||||
case protocol.MessageTypeMetrics:
|
||||
return h.handleMetricsMessage(ctx, agentID, data)
|
||||
|
||||
case protocol.MessageTypeBundle:
|
||||
return h.handleBundleMessage(ctx, agentID, data)
|
||||
|
||||
case protocol.MessageTypeCommandResp:
|
||||
return h.handleCommandResponseMessage(ctx, agentID, data)
|
||||
|
||||
@@ -172,6 +175,26 @@ func (h *AgentHandler) handleMetricsMessage(ctx context.Context, agentID string,
|
||||
return h.metricService.HandleMetricData(ctx, agentID, string(metricsWrapper.Type), metricsData, metricsWrapper.Timestamp)
|
||||
}
|
||||
|
||||
func (h *AgentHandler) handleBundleMessage(ctx context.Context, agentID string, data json.RawMessage) error {
|
||||
var bundle protocol.BundlePayload
|
||||
if err := json.Unmarshal(data, &bundle); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, item := range bundle.Items {
|
||||
metricsData, err := json.Marshal(item.Data)
|
||||
if err != nil {
|
||||
h.logger.Warn("failed to marshal bundled metric item", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if err := h.metricService.HandleMetricData(ctx, agentID, string(item.Type), metricsData, item.Timestamp); err != nil {
|
||||
h.logger.Warn("failed to handle bundled metric item", zap.Error(err), zap.String("type", string(item.Type)))
|
||||
continue
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *AgentHandler) handleCommandResponseMessage(ctx context.Context, agentID string, data json.RawMessage) error {
|
||||
var cmdResp protocol.CommandResponse
|
||||
if err := json.Unmarshal(data, &cmdResp); err != nil {
|
||||
|
||||
@@ -43,6 +43,11 @@ type MetricsPayload struct {
|
||||
Data interface{} `json:"data"`
|
||||
Timestamp int64 `json:"timestamp,omitempty"` // 客户端采集时间(毫秒)
|
||||
}
|
||||
|
||||
// BundlePayload 批量指标数据包装
|
||||
type BundlePayload struct {
|
||||
Items []MetricsPayload `json:"items"`
|
||||
}
|
||||
type MessageType string
|
||||
|
||||
// 控制消息
|
||||
@@ -55,6 +60,7 @@ const (
|
||||
MessageTypeUninstall MessageType = "uninstall"
|
||||
// 指标消息
|
||||
MessageTypeMetrics MessageType = "metrics"
|
||||
MessageTypeBundle MessageType = "bundle"
|
||||
MessageTypeMonitorConfig MessageType = "monitor_config"
|
||||
// 防篡改消息
|
||||
MessageTypeTamperProtect MessageType = "tamper_protect"
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
@@ -152,7 +153,11 @@ func (a *Agent) runOnce(ctx context.Context, onRegistered func()) error {
|
||||
slog.Info("正在连接到服务器", "url", wsURL)
|
||||
|
||||
// 创建自定义的 Dialer
|
||||
var dialer = websocket.DefaultDialer
|
||||
dialer := &websocket.Dialer{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
HandshakeTimeout: 45 * time.Second,
|
||||
EnableCompression: true,
|
||||
}
|
||||
if a.cfg.Server.InsecureSkipVerify {
|
||||
dialer.TLSClientConfig = &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
@@ -498,6 +503,26 @@ func (a *Agent) metricsLoop(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
type bundleWriter struct {
|
||||
items []protocol.MetricsPayload
|
||||
}
|
||||
|
||||
func (w *bundleWriter) WriteJSON(v interface{}) error {
|
||||
msg, ok := v.(protocol.OutboundMessage)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid message type for bundleWriter")
|
||||
}
|
||||
|
||||
if msg.Type == protocol.MessageTypeMetrics {
|
||||
payload, ok := msg.Data.(protocol.MetricsPayload)
|
||||
if ok {
|
||||
w.items = append(w.items, payload)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("only metrics messages can be bundled")
|
||||
}
|
||||
|
||||
// collectAndSendAllMetrics 采集并发送所有动态指标
|
||||
func (a *Agent) collectAndSendAllMetrics(manager *collector.Manager) error {
|
||||
if manager == nil {
|
||||
@@ -514,58 +539,73 @@ func (a *Agent) collectAndSendAllMetrics(manager *collector.Manager) error {
|
||||
}
|
||||
|
||||
writer := newOutboundWriter(conn, a.outboundBuffer)
|
||||
bw := &bundleWriter{}
|
||||
var hasError bool
|
||||
|
||||
// CPU 动态指标
|
||||
if err := manager.CollectAndSendCPU(writer); err != nil {
|
||||
slog.Warn("发送CPU指标失败", "error", err)
|
||||
if err := manager.CollectAndSendCPU(bw); err != nil {
|
||||
slog.Warn("采集CPU指标失败", "error", err)
|
||||
hasError = true
|
||||
}
|
||||
|
||||
// 内存动态指标
|
||||
if err := manager.CollectAndSendMemory(writer); err != nil {
|
||||
slog.Warn("发送内存指标失败", "error", err)
|
||||
if err := manager.CollectAndSendMemory(bw); err != nil {
|
||||
slog.Warn("采集内存指标失败", "error", err)
|
||||
hasError = true
|
||||
}
|
||||
|
||||
// 磁盘指标
|
||||
if err := manager.CollectAndSendDisk(writer); err != nil {
|
||||
slog.Warn("发送磁盘指标失败", "error", err)
|
||||
if err := manager.CollectAndSendDisk(bw); err != nil {
|
||||
slog.Warn("采集磁盘指标失败", "error", err)
|
||||
hasError = true
|
||||
}
|
||||
|
||||
// 磁盘 IO 指标
|
||||
if err := manager.CollectAndSendDiskIO(writer); err != nil {
|
||||
slog.Warn("发送磁盘IO指标失败", "error", err)
|
||||
if err := manager.CollectAndSendDiskIO(bw); err != nil {
|
||||
slog.Warn("采集磁盘IO指标失败", "error", err)
|
||||
hasError = true
|
||||
}
|
||||
|
||||
// 网络指标
|
||||
if err := manager.CollectAndSendNetwork(writer); err != nil {
|
||||
slog.Warn("发送网络指标失败", "error", err)
|
||||
if err := manager.CollectAndSendNetwork(bw); err != nil {
|
||||
slog.Warn("采集网络指标失败", "error", err)
|
||||
hasError = true
|
||||
}
|
||||
|
||||
// 网络连接统计
|
||||
if err := manager.CollectAndSendNetworkConnection(writer); err != nil {
|
||||
slog.Warn("发送网络连接统计失败", "error", err)
|
||||
if err := manager.CollectAndSendNetworkConnection(bw); err != nil {
|
||||
slog.Warn("采集网络连接统计失败", "error", err)
|
||||
hasError = true
|
||||
}
|
||||
|
||||
// 主机信息(包含 Load)
|
||||
if err := manager.CollectAndSendHost(writer); err != nil {
|
||||
slog.Warn("发送主机信息失败", "error", err)
|
||||
if err := manager.CollectAndSendHost(bw); err != nil {
|
||||
slog.Warn("采集主机信息失败", "error", err)
|
||||
hasError = true
|
||||
}
|
||||
|
||||
// GPU 信息(可选)
|
||||
if err := manager.CollectAndSendGPU(writer); err != nil {
|
||||
slog.Info("发送GPU信息失败", "error", err)
|
||||
if err := manager.CollectAndSendGPU(bw); err != nil {
|
||||
slog.Info("采集GPU信息失败", "error", err)
|
||||
}
|
||||
|
||||
// 温度信息(可选)
|
||||
if err := manager.CollectAndSendTemperature(writer); err != nil {
|
||||
slog.Info("发送温度信息失败", "error", err)
|
||||
if err := manager.CollectAndSendTemperature(bw); err != nil {
|
||||
slog.Info("采集温度信息失败", "error", err)
|
||||
}
|
||||
|
||||
// 发送打包后的指标
|
||||
if len(bw.items) > 0 {
|
||||
bundle := protocol.BundlePayload{
|
||||
Items: bw.items,
|
||||
}
|
||||
if err := writer.WriteJSON(protocol.OutboundMessage{
|
||||
Type: protocol.MessageTypeBundle,
|
||||
Data: bundle,
|
||||
}); err != nil {
|
||||
slog.Warn("发送打包指标失败", "error", err)
|
||||
hasError = true
|
||||
}
|
||||
}
|
||||
|
||||
if writer.buffered {
|
||||
|
||||
Reference in New Issue
Block a user