Files
easynode/server/app/socket/file-transfer.js
2026-01-28 00:05:11 +08:00

930 lines
30 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
const path = require('path')
const { Client: SSHClient } = require('ssh2')
const { fileTransferThrottle } = require('../utils/tools')
const { createSecureWs } = require('../utils/ws-tool')
const { getConnectionOptions } = require('./terminal')
const { FileTransferDB, HostListDB } = require('../utils/db-class')
const decryptAndExecuteAsync = require('../utils/decrypt-file')
const fileTransferDB = new FileTransferDB().getInstance()
const hostListDB = new HostListDB().getInstance()
// 全局传输任务管理
const activeTasks = new Map() // taskId -> { process, sshClient, status, ... }
// 任务排序函数运行中的任务优先然后按updateTime降序
function sortTasks(tasks) {
return tasks.sort((a, b) => {
// 状态优先级running > 其他状态
const statusPriority = { running: 0 }
const aPriority = statusPriority[a.status] ?? 1
const bPriority = statusPriority[b.status] ?? 1
if (aPriority !== bPriority) {
return aPriority - bPriority
}
// 相同状态按updateTime降序
return (b.updateTime || 0) - (a.updateTime || 0)
})
}
// 获取排序后的任务列表(合并数据库和内存状态)
async function getSortedTasksList() {
const tasks = await fileTransferDB.findAsync({}, { sort: { updateTime: -1 } })
// 合并内存中的活跃任务状态
const tasksWithStatus = tasks.map(task => {
const activeTask = activeTasks.get(task.taskId)
if (activeTask) {
return {
...task,
status: activeTask.status,
progress: activeTask.progress,
speed: activeTask.speed,
eta: activeTask.eta,
errorMessage: activeTask.errorMessage
}
}
return task
})
// 按状态和时间重新排序
return sortTasks(tasksWithStatus)
}
module.exports = (httpServer) => {
const serverIo = createSecureWs(httpServer, '/file-transfer')
let connectionCount = 0
const connectedSockets = new Set() // 跟踪所有连接的socket
serverIo.on('connection', async (socket) => {
connectionCount++
connectedSockets.add(socket)
logger.info(`file-transfer websocket 已连接 - 当前连接数: ${ connectionCount }`)
// 连接时发送当前所有任务状态
socket.on('get_tasks', async () => {
try {
const sortedTasks = await getSortedTasksList()
socket.emit('tasks_list', sortedTasks)
// 检查是否有运行中的任务,如果有则启动定时推送
const runningTasks = sortedTasks.filter(task => task.status === 'running')
if (runningTasks.length > 0) {
startProgressBroadcast(socket)
}
} catch (error) {
socket.emit('error', { message: '获取任务列表失败', error: error.message })
}
})
// 启动传输任务
socket.on('start_transfer', async (transferConfig) => {
try {
const { createTransferTask = null } = (await decryptAndExecuteAsync(path.join(__dirname, 'plus.js'))) || {}
if (!createTransferTask) throw new Error('Plus功能解锁失败: createTransferTask')
const task = await createTransferTask(transferConfig, socket, hostListDB, fileTransferDB, executeTransfer)
socket.emit('task_started', { taskId: task.taskId, message: '传输任务已启动' })
// 广播更新的任务列表
const updatedTasks = await getSortedTasksList()
socket.emit('tasks_list', updatedTasks)
} catch (error) {
logger.error('启动传输任务失败:', error)
socket.emit('task_failed', {
taskId: transferConfig.taskId,
message: error.message
})
}
})
// 取消任务
socket.on('cancel_task', async ({ taskId }) => {
try {
await cancelTransferTask(taskId)
socket.emit('task_cancelled', { taskId, message: '任务已取消' })
} catch (error) {
socket.emit('error', { message: '取消任务失败', error: error.message })
}
})
// 重试任务
socket.on('retry_task', async ({ taskId }) => {
try {
const task = await fileTransferDB.findOneAsync({ taskId })
if (!task) {
throw new Error('任务不存在')
}
// 重置任务状态
await fileTransferDB.updateAsync(
{ taskId },
{
$set: {
status: 'running',
progress: 0,
speed: 0,
errorMessage: null,
updateTime: Date.now()
}
}
)
// 重新启动任务(不创建新任务,重用现有任务)
executeTransfer(task, socket)
socket.emit('task_started', { taskId: task.taskId, message: '任务重试中' })
} catch (error) {
socket.emit('error', { message: '重试任务失败', error: error.message })
}
})
// 删除单个任务
socket.on('delete_task', async ({ taskId }) => {
try {
const task = await fileTransferDB.findOneAsync({ taskId })
if (!task) {
throw new Error('任务不存在')
}
// 检查任务状态,不允许删除正在进行的任务
if (task.status === 'running') {
throw new Error('无法删除正在进行的任务,请先取消任务')
}
// 从数据库删除任务
await fileTransferDB.removeAsync({ taskId })
socket.emit('task_deleted', { taskId, message: '任务已删除' })
// 广播任务列表更新
const updatedTasks = await getSortedTasksList()
socket.emit('tasks_list', updatedTasks)
} catch (error) {
socket.emit('error', { message: '删除任务失败', error: error.message })
}
})
// 清空已完成任务
socket.on('clear_completed_tasks', async () => {
try {
// 删除所有已完成、失败、取消的任务
const result = await fileTransferDB.removeAsync(
{
status: { $in: ['completed', 'failed', 'cancelled'] }
},
{ multi: true }
)
socket.emit('tasks_cleared', {
count: result,
message: `已清空 ${ result } 个任务`
})
// 广播任务列表更新
const updatedTasks = await getSortedTasksList()
socket.emit('tasks_list', updatedTasks)
} catch (error) {
socket.emit('error', { message: '清空任务失败', error: error.message })
}
})
socket.on('disconnect', () => {
connectionCount--
connectedSockets.delete(socket)
// 清理该socket的进度广播定时器
stopProgressBroadcast(socket)
logger.info(`file-transfer websocket 断开连接 - 当前连接数: ${ connectionCount }`)
})
})
}
// 执行传输任务
async function executeTransfer(taskData, socket) {
const { taskId } = taskData
try {
// 更新任务状态为运行中确保updateTime是最新的
await updateTaskStatus(taskId, 'running', socket)
// 获取源主机连接配置
const sourceOptions = await getConnectionOptions(taskData.sourceHostId)
// 建立SSH连接到源主机
const { sshClient } = await connectToHost(sourceOptions)
// 注册活跃任务
activeTasks.set(taskId, {
sshClient,
status: 'running',
progress: 0,
speed: 0,
startTime: Date.now(),
keyFile: null, // 用于跟踪临时密钥文件
totalFiles: taskData.sourcePaths.length, // 初始化文件总数
sourcePaths: taskData.sourcePaths // 保存源文件路径信息
})
// 只支持Rsync传输
if (taskData.method === 'rsync') {
await executeRsyncTransfer(taskData, sshClient, socket)
} else {
throw new Error(`不支持的传输方法: ${ taskData.method },仅支持 rsync`)
}
// 传输完成
await updateTaskStatus(taskId, 'completed', socket)
} catch (error) {
logger.error(`传输任务 ${ taskId } 失败:`, error)
await updateTaskStatus(taskId, 'failed', socket, error.message)
} finally {
// 清理资源
const activeTask = activeTasks.get(taskId)
if (activeTask) {
if (activeTask.sshClient) {
activeTask.sshClient.end()
}
activeTasks.delete(taskId)
}
}
}
// Rsync传输实现
async function executeRsyncTransfer(taskData, sshClient, socket) {
const { taskId, sourcePaths, targetPath } = taskData
// 获取目标主机信息
const targetConnectionData = await getConnectionOptions(taskData.targetHostId)
const targetOptions = targetConnectionData.authInfo
const targetHostAuthType = targetOptions.password ? 'password' : 'privateKey'
logger.info(`目标主机认证方式: ${ targetHostAuthType }`)
// 构建Rsync命令
let rsyncCmd = []
let envVars = {}
// ssh密钥tmp路径
let keyFile = null
// 如果目标主机使用密码认证源主机需使用sshpass
if (targetHostAuthType === 'password') {
// 检查源主机sshpass是否可用
try {
await checkSshpassAvailable(sshClient)
// 使用环境变量方式传递密码,避免命令行参数解析问题
envVars.SSHPASS = targetOptions.password
rsyncCmd.push('sshpass', '-e') // -e 表示从环境变量读取密码
} catch (error) {
throw new Error('源主机未安装sshpass工具无法进行密码认证传输。请使用密钥认证或在源主机安装sshpass: apt-get install sshpass 或 yum install sshpass')
}
} else {
keyFile = await createRemoteTempKeyFile(sshClient, targetOptions.privateKey)
}
rsyncCmd.push('rsync', '-avz', '--progress', '--partial') // 归档、详细、压缩、进度、支持断点续传
// 添加增量同步和安全选项
rsyncCmd.push('--inplace', '--append') // 断点续传关键选项
// 添加更详细的进度输出选项
rsyncCmd.push('--stats', '--human-readable', '--itemize-changes') // 统计信息、可读格式、详细变更
// 构建SSH命令选项
const sshOptions = [
'-p', (targetOptions.port || 22).toString(),
'-o', 'StrictHostKeyChecking=no',
'-o', 'UserKnownHostsFile=/dev/null',
'-o', 'GlobalKnownHostsFile=/dev/null'
]
// 根据认证类型设置不同的SSH选项
if (targetOptions.password) {
sshOptions.push('-o', 'PreferredAuthentications=password')
} else {
sshOptions.push('-o', 'BatchMode=yes')
}
if (keyFile) {
sshOptions.push('-i', `"${ keyFile }"`)
// 记录临时密钥文件路径到活跃任务中
const activeTask = activeTasks.get(taskId)
if (activeTask) {
activeTask.keyFile = keyFile
}
}
// 封装成一个整体的 SSH 命令,放到双引号内,确保 rsync 正确解析
const sshCmd = `ssh ${ sshOptions.join(' ') } -o LogLevel=ERROR`
rsyncCmd.push('-e', `"${ sshCmd }"`)
// 添加传输选项
if (taskData.options.delete) {
rsyncCmd.push('--delete')
}
if (taskData.options.excludePatterns && taskData.options.excludePatterns.length > 0) {
taskData.options.excludePatterns.forEach(pattern => {
rsyncCmd.push('--exclude', pattern)
})
}
// 添加源和目标路径
rsyncCmd.push(...sourcePaths.map(item => item.path))
rsyncCmd.push(`${ targetOptions.username }@${ targetOptions.host }:"${ targetPath }"`)
logger.info(`执行Rsync命令: ${ rsyncCmd.join(' ') }`)
if (Object.keys(envVars).length > 0) {
logger.info(`环境变量: ${ Object.keys(envVars).join(', ') }`)
}
return new Promise((resolve, reject) => {
// 在源主机上执行Rsync命令
let finalCommand = rsyncCmd.join(' ')
// 环境变量,需要在命令前设置
if (Object.keys(envVars).length > 0) {
const envString = Object.entries(envVars)
.map(([key, value]) => `${ key }='${ value.replace(/'/g, '\'"\'"\'') }'`)
.join(' ')
finalCommand = `${ envString } ${ finalCommand }`
}
// logger.info(`最终Rsync命令: ${ finalCommand }`)
let start = false
sshClient.exec(finalCommand, (err, stream) => {
if (err) {
reject(err)
return
}
let errorOutput = ''
const activeTask = activeTasks.get(taskId)
stream.on('close', async (code) => {
if (code === 0) {
// 传输成功,设置为校验状态
const activeTask = activeTasks.get(taskId)
if (activeTask && activeTask.progressTracker) {
activeTask.progressTracker.isVerifying = true
await updateTaskProgress(taskId, activeTask.progressTracker, socket)
}
logger.info(`Rsync传输完成: ${ taskId }`)
resolve()
} else {
logger.error(`Rsync传输失败: ${ taskId }, 退出码: ${ code }`)
reject(new Error(`Rsync传输失败: ${ errorOutput || '未知错误' }`))
}
})
stream.on('data', (data) => {
if (!start) {
start = true
// 清理密钥文件并从内存中移除
if (activeTask.keyFile) {
cleanupRemoteKeyFile(activeTask.sshClient, activeTask.keyFile)
}
return
}
fileTransferThrottle(parseRsyncProgress(data.toString(), taskId, socket))
})
stream.stderr.on('data', (data) => {
if (!start) {
start = true
// 清理密钥文件并从内存中移除
if (activeTask.keyFile) {
cleanupRemoteKeyFile(activeTask.sshClient, activeTask.keyFile)
}
return
}
const output = data.toString()
errorOutput += output
logger.warn(`Rsync stderr: ${ output }`)
// 解析错误信息中的进度信息
fileTransferThrottle(parseRsyncProgress(output, taskId, socket))
})
// 存储stream引用以支持取消操作
if (activeTask) {
activeTask.stream = stream
}
})
})
}
// 解析Rsync进度
function parseRsyncProgress(output, taskId, socket) {
const activeTask = activeTasks.get(taskId)
if (!activeTask) return
// 初始化进度跟踪器(如果不存在)
if (!activeTask.progressTracker) {
activeTask.progressTracker = {
totalFiles: activeTask.totalFiles || 1,
completedFiles: 0,
currentFile: null,
files: new Map(), // 文件路径 -> 进度信息
overallProgress: 0,
isVerifying: false
}
}
const tracker = activeTask.progressTracker
const outputLine = output.trim()
// 添加调试日志
logger.info(`Rsync输出 [${ taskId }]: "${ outputLine }"`)
// 检测是否在校验阶段
if (outputLine.includes('verifying') ||
outputLine.includes('checking') ||
outputLine.includes('delta-transmission disabled') ||
(tracker.overallProgress >= 100 && outputLine.includes('receiving'))) {
if (!tracker.isVerifying) {
tracker.isVerifying = true
updateTaskProgress(taskId, tracker, socket)
}
return
}
// 如果还没有当前文件且总文件数为1使用第一个源文件作为当前文件
if (!tracker.currentFile && tracker.totalFiles === 1) {
const activeTask = activeTasks.get(taskId)
if (activeTask && activeTask.sourcePaths && activeTask.sourcePaths.length > 0) {
const sourcePath = activeTask.sourcePaths[0].path
const fileName = sourcePath.split('/').pop()
tracker.currentFile = fileName || sourcePath
if (!tracker.files.has(tracker.currentFile)) {
tracker.files.set(tracker.currentFile, {
progress: 0,
size: activeTask.sourcePaths[0].size || 0,
transferred: 0,
speed: 0,
status: 'transferring'
})
}
logger.info(`单文件传输初始化: ${ tracker.currentFile }`)
}
}
// 检测新文件开始传输
// 模式1: itemize-changes 格式 - <f+++++++++ filename
let filePathMatch = outputLine.match(/^<f\+{5,}\s+(.+)$/)
if (filePathMatch) {
const filePath = filePathMatch[1].trim()
tracker.currentFile = filePath
if (!tracker.files.has(filePath)) {
tracker.files.set(filePath, {
progress: 0,
size: 0,
transferred: 0,
speed: 0,
status: 'transferring'
})
}
logger.info(`检测到新文件传输(itemize格式): ${ filePath }`)
return
}
// 模式2: 传统文件路径格式
filePathMatch = outputLine.match(/^([^\s]+\/[^\s]+|[^\s]+\.[^\s]+)\s*$/)
if (filePathMatch && !outputLine.includes('%')) {
const filePath = filePathMatch[1]
tracker.currentFile = filePath
if (!tracker.files.has(filePath)) {
tracker.files.set(filePath, {
progress: 0,
size: 0,
transferred: 0,
speed: 0,
status: 'transferring'
})
}
logger.info(`检测到新文件传输(传统格式): ${ filePath }`)
return
}
// 解析进度信息
let fileProgress = null
let speed = 0
let eta = 0
let transferred = 0
// 模式1: 标准格式 - 1,024,000 100% 1.23MB/s 0:00:30 (xfr#1, to-chk=0/1)
let match = outputLine.match(/(\d+(?:,\d+)*)\s+(\d+)%\s+([\d.]+)([kKmMgGtT]?B\/s)\s+(\d+):(\d+):(\d+)/)
if (match) {
const [, transferredStr, percentage, speedVal, speedUnit, hours, minutes, seconds] = match
fileProgress = parseInt(percentage)
transferred = parseInt(transferredStr.replace(/,/g, ''))
const unitFactor = { 'B/s': 1, 'KB/s': 1024, 'MB/s': 1024 ** 2, 'GB/s': 1024 ** 3, 'TB/s': 1024 ** 4 }
speed = parseFloat(speedVal) * (unitFactor[speedUnit] || 1)
eta = parseInt(hours) * 3600 + parseInt(minutes) * 60 + parseInt(seconds)
logger.info(`Rsync标准格式解析 [${ taskId }]: ${ fileProgress }%, 速度: ${ speedVal }${ speedUnit }, ETA: ${ eta }s`)
} else {
// 模式2: 简化格式 - 100% 1.23MB/s
match = outputLine.match(/(\d+)%\s+([\d.]+)([KMGT]?B\/s)/)
if (match) {
fileProgress = parseInt(match[1])
const unit = match[3]
const unitFactor = { 'B/s': 1, 'KB/s': 1024, 'MB/s': 1024 ** 2, 'GB/s': 1024 ** 3, 'TB/s': 1024 ** 4 }
speed = parseFloat(match[2]) * (unitFactor[unit] || 1)
logger.info(`Rsync简化格式解析 [${ taskId }]: ${ fileProgress }%, 速度: ${ match[2] }${ unit }`)
} else {
// 模式3: 最简格式 - 只有百分比
match = outputLine.match(/(\d+)%/)
if (match) {
fileProgress = parseInt(match[1])
logger.info(`Rsync百分比解析 [${ taskId }]: ${ fileProgress }%`)
}
}
}
// 检测文件传输完成 - to-chk=x/y 格式
const checkMatch = outputLine.match(/to-chk=(\d+)\/(\d+)/)
if (checkMatch) {
const [, remaining, total] = checkMatch
const completed = parseInt(total) - parseInt(remaining)
tracker.completedFiles = completed
tracker.totalFiles = parseInt(total)
// 如果当前文件进度是100%,标记为完成并清除当前文件
if (tracker.currentFile && fileProgress === 100) {
const fileInfo = tracker.files.get(tracker.currentFile)
if (fileInfo) {
fileInfo.progress = 100
fileInfo.status = 'completed'
fileInfo.transferred = transferred
// 清除当前文件,为下一个文件做准备
logger.info(`文件传输完成: ${ tracker.currentFile }`)
tracker.currentFile = null
}
}
logger.info(`传输进度: ${ completed }/${ total } 文件完成`)
}
// 更新当前文件进度
if (fileProgress !== null) {
// 如果没有当前文件,但有进度信息,尝试找到正在传输的文件
if (!tracker.currentFile && tracker.files.size > 0) {
// 找到最后一个未完成的文件作为当前文件
for (const [fileName, fileInfo] of tracker.files.entries()) {
if (fileInfo.status === 'transferring' && fileInfo.progress < 100) {
tracker.currentFile = fileName
break
}
}
}
// 更新当前文件进度
if (tracker.currentFile) {
const fileInfo = tracker.files.get(tracker.currentFile)
if (fileInfo) {
fileInfo.progress = fileProgress
fileInfo.speed = speed
fileInfo.transferred = transferred
if (fileProgress === 100) {
fileInfo.status = 'completed'
// 文件完成后清除当前文件,为下一个文件做准备
logger.info(`文件传输完成: ${ tracker.currentFile }`)
tracker.currentFile = null
}
}
} else {
// 如果仍然没有当前文件,记录警告但继续处理
logger.warn(`收到进度信息但没有当前文件 [${ taskId }]: ${ fileProgress }%`)
}
}
// 计算总体进度
if (tracker.totalFiles > 0) {
if (tracker.totalFiles === 1) {
// 单文件传输:直接使用当前文件进度
if (tracker.currentFile && tracker.files.has(tracker.currentFile)) {
const currentFileInfo = tracker.files.get(tracker.currentFile)
tracker.overallProgress = currentFileInfo ? currentFileInfo.progress : 0
} else {
// 如果没有文件信息使用解析出的fileProgress
tracker.overallProgress = fileProgress || 0
}
} else {
// 多文件传输:基于已完成文件数 + 当前文件进度的总体进度
let overallProgress = (tracker.completedFiles / tracker.totalFiles) * 100
if (tracker.currentFile && tracker.files.has(tracker.currentFile)) {
const currentFileInfo = tracker.files.get(tracker.currentFile)
if (currentFileInfo && currentFileInfo.status !== 'completed') {
overallProgress += (currentFileInfo.progress / tracker.totalFiles)
}
}
tracker.overallProgress = Math.min(100, Math.round(overallProgress))
}
}
// 更新活跃任务状态
activeTask.progress = tracker.overallProgress
activeTask.speed = speed
updateTaskProgress(taskId, tracker, socket)
}
// 更新任务进度
async function updateTaskProgress(taskId, progressTracker, socket) {
try {
// 准备数据库更新数据
const updateData = {
progress: progressTracker.overallProgress,
updateTime: Date.now()
}
// 如果在校验阶段,添加状态
if (progressTracker.isVerifying) {
updateData.status = 'verifying'
}
await fileTransferDB.updateAsync(
{ taskId },
{ $set: updateData }
)
// 准备发送给前端的详细进度数据
const activeTask = activeTasks.get(taskId)
const progressData = {
taskId,
overallProgress: progressTracker.overallProgress,
completedFiles: progressTracker.completedFiles,
totalFiles: progressTracker.totalFiles,
currentFile: progressTracker.currentFile,
isVerifying: progressTracker.isVerifying,
speed: activeTask ? activeTask.speed : 0,
files: Array.from(progressTracker.files.entries()).map(([path, info]) => ({
path,
progress: info.progress,
status: info.status,
speed: info.speed,
transferred: info.transferred
}))
}
logger.info(`推送进度更新 [${ taskId }]:`, {
overall: progressData.overallProgress,
files: `${ progressData.completedFiles }/${ progressData.totalFiles }`,
current: progressData.currentFile,
speed: `${ (progressData.speed / 1024 / 1024).toFixed(1) }MB/s`,
verifying: progressData.isVerifying
})
// 发送给原始socket如果存在且连接
if (socket && socket.connected) {
socket.emit('task_progress', progressData)
}
} catch (error) {
logger.error('更新任务进度失败:', error)
}
}
// 更新任务状态
async function updateTaskStatus(taskId, status, socket, errorMessage = null) {
try {
const updateData = {
status,
updateTime: Date.now()
}
if (errorMessage) {
updateData.errorMessage = errorMessage
}
await fileTransferDB.updateAsync({ taskId }, { $set: updateData })
// 更新内存中的状态
const activeTask = activeTasks.get(taskId)
if (activeTask) {
activeTask.status = status
if (errorMessage) {
activeTask.errorMessage = errorMessage
}
}
// 通知前端
socket.emit('task_status_changed', {
taskId,
status,
errorMessage
})
} catch (error) {
logger.error('更新任务状态失败:', error)
}
}
// 取消传输任务
async function cancelTransferTask(taskId) {
const activeTask = activeTasks.get(taskId)
if (activeTask) {
// 终止SSH连接
if (activeTask.sshClient) {
activeTask.sshClient.end()
}
activeTasks.delete(taskId)
}
// 更新数据库状态
await fileTransferDB.updateAsync(
{ taskId },
{
$set: {
status: 'cancelled',
updateTime: Date.now()
}
}
)
}
// 连接到主机(文件传输专用,直连不走代理)
async function connectToHost(connectionOptions) {
return new Promise((resolve, reject) => {
const sshClient = new SSHClient()
sshClient.on('ready', () => {
resolve({ sshClient })
})
sshClient.on('error', (err) => {
reject(err)
})
// 直接连接,不使用代理
sshClient.connect(connectionOptions.authInfo)
})
}
// 检查sshpass工具是否可用
async function checkSshpassAvailable(sshClient) {
return new Promise((resolve, reject) => {
sshClient.exec('which sshpass', (err, stream) => {
if (err) {
reject(err)
return
}
let output = ''
stream.on('close', (code) => {
if (code === 0 && output.trim()) {
resolve(true)
} else {
reject(new Error('sshpass not found'))
}
})
stream.on('data', (data) => {
output += data.toString()
})
stream.stderr.on('data', () => {
// 忽略stderr
})
})
})
}
// 在源主机上创建临时密钥文件
async function createRemoteTempKeyFile(sshClient, privateKey) {
return new Promise((resolve, reject) => {
const remotePath = `/tmp/easynode_key_${ Date.now() }_${ Math.random().toString(36).slice(2) }`
sshClient.sftp((err, sftp) => {
if (err) return reject(err)
// 打开文件句柄
sftp.open(remotePath, 'w', 0o600, (openErr, handle) => {
if (openErr) {
sftp.end()
return reject(openErr)
}
// 写入私钥内容Buffer
const keyBuffer = Buffer.from(privateKey, 'utf-8')
sftp.write(handle, keyBuffer, 0, keyBuffer.length, 0, (writeErr) => {
if (writeErr) {
sftp.close(handle, () => sftp.end())
return reject(writeErr)
}
sftp.close(handle, (closeErr) => {
sftp.end()
if (closeErr) return reject(closeErr)
resolve(remotePath)
})
})
})
})
})
}
// 安全清理远程密钥文件
function cleanupRemoteKeyFile(sshClient, keyFile) {
if (!keyFile || !sshClient) return
logger.info(`清理远程密钥文件: ${ keyFile }`)
// 先删除文件,再验证删除
// const cleanupCmd = `rm -f "${ keyFile }" && if [ -f "${ keyFile }" ]; then echo "CLEANUP_FAILED"; else echo "CLEANUP_SUCCESS"; fi`
const cleanupCmd = 'cd /tmp && rm -f easynode_key_* && if ls easynode_key_* 2>/dev/null; then echo "CLEANUP_FAILED"; else echo "CLEANUP_SUCCESS"; fi'
sshClient.exec(cleanupCmd, (err, stream) => {
if (err) {
logger.error('清理密钥文件时SSH错误:', err)
return
}
let output = ''
stream.on('data', (data) => {
output += data.toString()
})
stream.on('close', () => {
if (output.includes('CLEANUP_SUCCESS')) {
logger.info(`密钥文件清理成功: ${ keyFile }`)
} else if (output.includes('CLEANUP_FAILED')) {
logger.error(`密钥文件清理失败: ${ keyFile }`)
// 强制清理尝试 - 覆盖后删除
const forceCleanup = `echo "" > "${ keyFile }" && rm -f "${ keyFile }"`
sshClient.exec(forceCleanup, () => {
logger.info(`强制清理密钥文件: ${ keyFile }`)
})
} else {
logger.warn(`密钥文件清理状态未知: ${ keyFile }`)
}
})
stream.stderr.on('data', (data) => {
logger.warn('清理密钥文件stderr:', data.toString())
})
})
}
// 定时进度广播管理
const progressBroadcastTimers = new Map() // socket -> timer
// 启动进度广播
function startProgressBroadcast(socket) {
// 如果该socket已经有定时器先清除
if (progressBroadcastTimers.has(socket)) {
clearInterval(progressBroadcastTimers.get(socket))
}
// 启动新的定时器,每秒检查并推送运行中任务的进度
const timer = setInterval(async () => {
try {
// 检查是否还有运行中的任务
const runningTasks = Array.from(activeTasks.values()).filter(task => task.status === 'running')
if (runningTasks.length === 0) {
// 没有运行中的任务,停止广播
logger.info('没有运行中的任务,停止进度广播')
stopProgressBroadcast(socket)
return
}
// 推送所有运行中任务的最新进度
for (const [taskId, activeTask] of activeTasks.entries()) {
if (activeTask.status === 'running' && activeTask.progressTracker) {
const progressData = {
taskId,
overallProgress: activeTask.progressTracker.overallProgress || 0,
completedFiles: activeTask.progressTracker.completedFiles || 0,
totalFiles: activeTask.progressTracker.totalFiles || 1,
currentFile: activeTask.progressTracker.currentFile,
isVerifying: activeTask.progressTracker.isVerifying || false,
speed: activeTask.speed || 0
}
// 检查socket是否仍然连接
if (socket.connected) {
socket.emit('task_progress', progressData)
} else {
// socket已断开停止广播
stopProgressBroadcast(socket)
return
}
}
}
} catch (error) {
logger.error('进度广播出错:', error)
stopProgressBroadcast(socket)
}
}, 1500)
progressBroadcastTimers.set(socket, timer)
logger.info('已启动进度广播定时器')
}
// 停止进度广播
function stopProgressBroadcast(socket) {
const timer = progressBroadcastTimers.get(socket)
if (timer) {
clearInterval(timer)
progressBroadcastTimers.delete(socket)
logger.info('已停止进度广播定时器')
}
}