mirror of
https://github.com/chaos-zhu/easynode.git
synced 2026-06-06 19:40:08 +08:00
1769 lines
57 KiB
JavaScript
1769 lines
57 KiB
JavaScript
const rawPath = require('path')
|
||
const fs = require('fs-extra')
|
||
const SFTPClient = require('ssh2-sftp-client')
|
||
const iconv = require('iconv-lite')
|
||
const chardet = require('chardet')
|
||
const { v4: uuidv4 } = require('uuid')
|
||
const { sftpCacheDir } = require('../config')
|
||
const { createSecureWs } = require('../utils/ws-tool')
|
||
const { HostListDB, FavoriteSftpDB } = require('../utils/db-class')
|
||
const { getConnectionOptions, handleProxyAndJumpHostConnection } = require('./terminal')
|
||
const hostListDB = new HostListDB().getInstance()
|
||
const favoriteSftpDB = new FavoriteSftpDB().getInstance()
|
||
const { Client: SSHClient } = require('ssh2')
|
||
|
||
/**
|
||
* 将 Buffer 解码为字符串
|
||
* @param {Buffer} buffer - 要解码的 Buffer
|
||
* @param {string} encoding - 编码格式 (utf8, gbk, gb2312, gb18030 等)
|
||
* @returns {string} 解码后的字符串
|
||
*/
|
||
function decodeBuffer(buffer, encoding = 'utf8') {
|
||
if (encoding === 'utf8') {
|
||
return buffer.toString('utf8')
|
||
}
|
||
// 使用 iconv-lite 解码其他编码
|
||
return iconv.decode(buffer, encoding)
|
||
}
|
||
|
||
/**
|
||
* 将字符串编码为 Buffer
|
||
* @param {string} content - 要编码的字符串
|
||
* @param {string} encoding - 编码格式 (utf8, gbk, gb2312, gb18030 等)
|
||
* @returns {Buffer} 编码后的 Buffer
|
||
*/
|
||
function encodeString(content, encoding = 'utf8') {
|
||
if (encoding === 'utf8') {
|
||
return Buffer.from(content, 'utf8')
|
||
}
|
||
// 使用 iconv-lite 编码其他编码
|
||
return iconv.encode(content, encoding)
|
||
}
|
||
|
||
/**
|
||
* 智能检测文件编码
|
||
* @param {Buffer} buffer - 文件内容 Buffer
|
||
* @returns {Object} { encoding: 'utf8', confidence: 95 }
|
||
*/
|
||
function detectEncoding(buffer) {
|
||
try {
|
||
// 对于小文件(< 50 字节),直接返回 utf8
|
||
if (buffer.length < 50) {
|
||
return { encoding: 'utf8', confidence: 50, reason: '文件过小,默认 UTF-8' }
|
||
}
|
||
|
||
// 1. 检查 BOM(Byte Order Mark)
|
||
if (buffer[0] === 0xEF && buffer[1] === 0xBB && buffer[2] === 0xBF) {
|
||
return { encoding: 'utf8', confidence: 100, reason: '检测到 UTF-8 BOM' }
|
||
}
|
||
|
||
// 2. 使用 chardet 检测
|
||
const detected = chardet.detect(buffer)
|
||
if (!detected) {
|
||
return { encoding: 'utf8', confidence: 30, reason: '无法检测,默认 UTF-8' }
|
||
}
|
||
|
||
// 3. 分析置信度
|
||
const analysisResults = chardet.analyse(buffer)
|
||
const topResult = analysisResults[0] || { name: 'UTF-8', confidence: 30 }
|
||
|
||
// 规范化编码名称
|
||
let normalizedEncoding = detected.toLowerCase()
|
||
if (normalizedEncoding === 'gb2312' || normalizedEncoding === 'gb18030') {
|
||
// GB2312 和 GB18030 统一用对应名称
|
||
normalizedEncoding = detected.toLowerCase()
|
||
} else if (normalizedEncoding.includes('gb')) {
|
||
// 其他 GB 相关编码统一为 gbk
|
||
normalizedEncoding = 'gbk'
|
||
} else if (normalizedEncoding.includes('utf') || normalizedEncoding.includes('utf8')) {
|
||
normalizedEncoding = 'utf8'
|
||
}
|
||
|
||
const confidence = Math.round(topResult.confidence || 50)
|
||
|
||
logger.info(`编码检测结果: ${ normalizedEncoding } (置信度: ${ confidence }%)`)
|
||
|
||
return {
|
||
encoding: normalizedEncoding,
|
||
confidence,
|
||
reason: `chardet 检测为 ${ detected }`
|
||
}
|
||
} catch (error) {
|
||
logger.error('编码检测失败:', error.message)
|
||
return { encoding: 'utf8', confidence: 30, reason: '检测异常,默认 UTF-8' }
|
||
}
|
||
}
|
||
|
||
/**
|
||
* https://github.com/theophilusx/ssh2-sftp-client#readme
|
||
* The objects in the array returned by list() have the following properties:
|
||
* {
|
||
* type: '-', // file type(-, d, l)
|
||
* name: 'example.txt', // file name
|
||
* size: 43, // file size
|
||
* modifyTime: 1675645360000, // file timestamp of modified time
|
||
* accessTime: 1675645360000, // file timestamp of access time
|
||
* rights: {
|
||
* user: 'rw',
|
||
* group: 'r',
|
||
* other: 'r',
|
||
* },
|
||
* owner: 1000, // user ID
|
||
* group: 1000, // group ID
|
||
* longname: '-rw-r--r-- 1 fred fred 43 Feb 6 12:02 exaple.txt', // like ls -l line
|
||
* }
|
||
*/
|
||
|
||
// 格式化文件列表数据,添加权限字符串和用户名/组名
|
||
function formatFileList(fileList) {
|
||
return fileList.map(file => {
|
||
const formatted = { ...file }
|
||
|
||
// 从 rights 对象生成权限字符串
|
||
if (file.rights) {
|
||
const { user, group, other } = file.rights
|
||
formatted.permissions = `${ user }${ group }${ other }`
|
||
}
|
||
|
||
// 从 longname 中提取ownerName和groupName
|
||
if (file.longname) {
|
||
const parts = file.longname.trim().split(/\s+/)
|
||
if (parts.length >= 4) {
|
||
// parts[2] 是所有者用户名, parts[3] 是组名
|
||
formatted.ownerName = parts[2]
|
||
formatted.groupName = parts[3]
|
||
}
|
||
}
|
||
|
||
return formatted
|
||
})
|
||
}
|
||
|
||
const listenAction = (sftpClient, socket) => {
|
||
// 下载任务管理
|
||
const downloadTasks = new Map() // taskId -> { abortController, startTime, totalSize, downloadedSize }
|
||
|
||
// 上传任务管理
|
||
const uploadTasks = new Map() // taskId -> { abortController, startTime, totalSize, uploadedSize, tempFilePath, writeStream, receivedChunks }
|
||
|
||
socket.on('open_dir', async (path) => {
|
||
try {
|
||
const dirLs = await sftpClient.list(path)
|
||
console.log('dirLs:', dirLs.length)
|
||
const formattedDirLs = formatFileList(dirLs)
|
||
socket.emit('dir_ls', formattedDirLs, path)
|
||
} catch (err) {
|
||
socket.emit('not_exists_dir', err.message)
|
||
}
|
||
})
|
||
|
||
// rename file / directory
|
||
socket.on('rename', async ({ dirPath, oldName, newName }) => {
|
||
try {
|
||
if (!oldName || !newName || oldName === newName) throw new Error('无效文件名')
|
||
const src = rawPath.posix.join(dirPath, oldName)
|
||
const dest = rawPath.posix.join(dirPath, newName)
|
||
await sftpClient.rename(src, dest)
|
||
socket.emit('rename_success', { oldName, newName })
|
||
// 返回最新目录列表
|
||
const dirLs = await sftpClient.list(dirPath)
|
||
const formattedDirLs = formatFileList(dirLs)
|
||
socket.emit('dir_ls', formattedDirLs, dirPath)
|
||
} catch (err) {
|
||
socket.emit('rename_fail', err.message)
|
||
}
|
||
})
|
||
|
||
// delete file/dir
|
||
socket.on('delete', async ({ dirPath, name, type }) => {
|
||
try {
|
||
const target = rawPath.posix.join(dirPath, name)
|
||
if (type === 'd') {
|
||
await sftpClient.rmdir(target, true)
|
||
} else {
|
||
await sftpClient.delete(target)
|
||
}
|
||
socket.emit('delete_success')
|
||
const dirLs = await sftpClient.list(dirPath)
|
||
const formattedDirLs = formatFileList(dirLs)
|
||
socket.emit('dir_ls', formattedDirLs, dirPath)
|
||
} catch (err) {
|
||
socket.emit('delete_fail', err.message)
|
||
}
|
||
})
|
||
|
||
socket.on('delete_batch', async ({ dirPath, targets }) => {
|
||
try {
|
||
for (const { name, type } of targets) {
|
||
const target = rawPath.posix.join(dirPath, name)
|
||
if (type === 'd') {
|
||
await sftpClient.rmdir(target, true)
|
||
} else {
|
||
await sftpClient.delete(target)
|
||
}
|
||
}
|
||
socket.emit('delete_success')
|
||
const dirLs = await sftpClient.list(dirPath)
|
||
const formattedDirLs = formatFileList(dirLs)
|
||
socket.emit('dir_ls', formattedDirLs, dirPath)
|
||
} catch (err) {
|
||
socket.emit('delete_fail', err.message)
|
||
}
|
||
})
|
||
|
||
// move file/dir
|
||
socket.on('move', async ({ dirPath, destDir, name }) => {
|
||
try {
|
||
const src = rawPath.posix.join(dirPath, name)
|
||
await ensureDir(destDir)
|
||
const dest = rawPath.posix.join(destDir, name)
|
||
await sftpClient.rename(src, dest)
|
||
socket.emit('move_success')
|
||
const dirLs = await sftpClient.list(dirPath)
|
||
const formattedDirLs = formatFileList(dirLs)
|
||
socket.emit('dir_ls', formattedDirLs, dirPath)
|
||
} catch (err) {
|
||
socket.emit('move_fail', err.message)
|
||
}
|
||
})
|
||
|
||
socket.on('move_batch', async ({ dirPath, destDir, targets }) => {
|
||
try {
|
||
await ensureDir(destDir)
|
||
for (const { name } of targets) {
|
||
const src = rawPath.posix.join(dirPath, name)
|
||
const dest = rawPath.posix.join(destDir, name)
|
||
await sftpClient.rename(src, dest)
|
||
}
|
||
socket.emit('move_success')
|
||
const dirLs = await sftpClient.list(dirPath)
|
||
const formattedDirLs = formatFileList(dirLs)
|
||
socket.emit('dir_ls', formattedDirLs, dirPath)
|
||
} catch (err) {
|
||
socket.emit('move_fail', err.message)
|
||
}
|
||
})
|
||
|
||
async function ensureDir(dir) {
|
||
const exists = await sftpClient.exists(dir)
|
||
if (!exists) {
|
||
await sftpClient.mkdir(dir, true)
|
||
}
|
||
}
|
||
|
||
const execCommand = (cmd) => new Promise((res, rej) => {
|
||
// 检查SSH客户端连接状态
|
||
if (!sftpClient || !sftpClient.client) {
|
||
return rej(new Error('SSH客户端未初始化'))
|
||
}
|
||
|
||
// 检查底层socket连接状态
|
||
if (sftpClient.client._sock && sftpClient.client._sock.destroyed) {
|
||
return rej(new Error('SSH连接已断开'))
|
||
}
|
||
|
||
// 检查SSH客户端状态
|
||
if (sftpClient.client._readyTimeout === null && !sftpClient.client._authenticated) {
|
||
return rej(new Error('SSH客户端未认证'))
|
||
}
|
||
|
||
try {
|
||
sftpClient.client.exec(cmd, (err, stream) => {
|
||
if (err) {
|
||
logger.error('执行命令失败:', cmd, err.message)
|
||
// 检查是否是连接相关错误
|
||
if (err.message.includes('Not connected') || err.message.includes('Connection lost')) {
|
||
return rej(new Error('SSH连接已断开,无法执行命令'))
|
||
}
|
||
return rej(err)
|
||
}
|
||
|
||
if (!stream) {
|
||
return rej(new Error('无法创建命令执行流'))
|
||
}
|
||
|
||
let errMsg = ''
|
||
let isResolved = false
|
||
let timeout = null
|
||
|
||
// 清理函数
|
||
const cleanup = () => {
|
||
if (timeout) {
|
||
clearTimeout(timeout)
|
||
timeout = null
|
||
}
|
||
if (stream) {
|
||
try {
|
||
stream.removeAllListeners()
|
||
if (!stream.destroyed) {
|
||
stream.destroy()
|
||
}
|
||
} catch (cleanupErr) {
|
||
logger.warn('清理stream失败:', cleanupErr.message)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 统一的结果处理函数
|
||
const resolveOnce = (result, isError = false) => {
|
||
if (isResolved) return
|
||
isResolved = true
|
||
cleanup()
|
||
|
||
if (isError) {
|
||
rej(result)
|
||
} else {
|
||
res(result)
|
||
}
|
||
}
|
||
|
||
// 设置超时保护(60秒)
|
||
timeout = setTimeout(() => {
|
||
logger.warn('命令执行超时:', cmd)
|
||
resolveOnce(new Error(`命令执行超时: ${ cmd }`), true)
|
||
}, 60000)
|
||
|
||
// 处理错误输出
|
||
stream.stderr.on('data', d => {
|
||
errMsg += d.toString()
|
||
})
|
||
|
||
// 处理错误事件
|
||
stream.on('error', (streamErr) => {
|
||
logger.error('Stream错误:', cmd, streamErr.message)
|
||
if (streamErr.message.includes('Not connected')) {
|
||
resolveOnce(new Error('SSH连接在命令执行过程中断开'), true)
|
||
} else {
|
||
resolveOnce(streamErr, true)
|
||
}
|
||
})
|
||
|
||
// 处理连接断开
|
||
stream.on('close', (code) => {
|
||
if (errMsg) {
|
||
logger.error('命令执行错误:', cmd, errMsg)
|
||
resolveOnce(new Error(`命令执行失败: ${ code }: ${ errMsg }`), true)
|
||
} else {
|
||
logger.info('命令执行完成:', cmd)
|
||
resolveOnce('success')
|
||
}
|
||
})
|
||
|
||
// 处理命令退出
|
||
stream.on('exit', (code) => {
|
||
if (code === 0) {
|
||
logger.info('命令执行成功:', cmd)
|
||
resolveOnce() // 成功
|
||
} else {
|
||
const errorMessage = errMsg || `命令退出码: ${ code }`
|
||
logger.error('命令执行失败:', cmd, errorMessage)
|
||
resolveOnce(new Error(errorMessage), true)
|
||
}
|
||
})
|
||
|
||
// 消耗 stdout 防止阻塞
|
||
stream.on('data', () => {})
|
||
})
|
||
} catch (execErr) {
|
||
logger.error('execCommand异常:', cmd, execErr.message)
|
||
rej(execErr)
|
||
}
|
||
})
|
||
|
||
// -------- copy (download then upload) --------
|
||
socket.on('copy_server_batch', async ({ dirPath, destDir, targets }) => {
|
||
try {
|
||
await ensureDir(destDir)
|
||
for (const { name } of targets) {
|
||
const src = rawPath.posix.join(dirPath, name)
|
||
// cp -r preserves dir/file, will overwrite if exists
|
||
const cmd = `cp -r -- "${ src }" "${ destDir }/"`
|
||
await execCommand(cmd)
|
||
}
|
||
|
||
socket.emit('copy_success')
|
||
const dirLs = await sftpClient.list(dirPath)
|
||
const formattedDirLs = formatFileList(dirLs)
|
||
socket.emit('dir_ls', formattedDirLs, dirPath)
|
||
} catch (err) {
|
||
logger.error('copy error:', err.message)
|
||
socket.emit('copy_fail', err.message)
|
||
}
|
||
})
|
||
|
||
// -------- create file/folder --------
|
||
socket.on('create_item', async ({ dirPath, name, type }) => {
|
||
try {
|
||
if (!name || !name.trim()) {
|
||
throw new Error('文件名不能为空')
|
||
}
|
||
|
||
const trimmedName = name.trim()
|
||
|
||
// 验证文件名(不能包含路径分隔符和其他无效字符)
|
||
const invalidChars = ['/', '\0']
|
||
if (invalidChars.some(char => trimmedName.includes(char))) {
|
||
throw new Error('文件名包含无效字符')
|
||
}
|
||
|
||
const targetPath = rawPath.posix.join(dirPath, trimmedName)
|
||
|
||
// 检查文件/文件夹是否已存在
|
||
const exists = await sftpClient.exists(targetPath)
|
||
if (exists) {
|
||
throw new Error(`${ type === 'folder' ? '文件夹' : '文件' } "${ trimmedName }" 已存在`)
|
||
}
|
||
|
||
if (type === 'folder') {
|
||
logger.info(`创建文件夹: ${ targetPath }`)
|
||
await sftpClient.mkdir(targetPath)
|
||
socket.emit('create_success', `文件夹 "${ trimmedName }" 创建成功`)
|
||
} else if (type === 'file') {
|
||
logger.info(`创建文件: ${ targetPath }`)
|
||
// 创建空文件,使用 touch 命令
|
||
const cmd = `touch "${ targetPath }"`
|
||
await execCommand(cmd)
|
||
socket.emit('create_success', `文件 "${ trimmedName }" 创建成功`)
|
||
} else {
|
||
throw new Error('无效的创建类型')
|
||
}
|
||
|
||
// 返回最新目录列表
|
||
const dirLs = await sftpClient.list(dirPath)
|
||
const formattedDirLs = formatFileList(dirLs)
|
||
socket.emit('dir_ls', formattedDirLs, dirPath)
|
||
} catch (err) {
|
||
logger.error('create error:', err.message)
|
||
socket.emit('create_fail', err.message)
|
||
}
|
||
})
|
||
|
||
// -------- compress files --------
|
||
socket.on('compress_files', async ({ dirPath, targets, archiveName }) => {
|
||
try {
|
||
if (!targets || targets.length === 0) {
|
||
throw new Error('未选择要压缩的文件')
|
||
}
|
||
|
||
if (!archiveName || !archiveName.trim()) {
|
||
throw new Error('压缩文件名不能为空')
|
||
}
|
||
|
||
const trimmedArchiveName = archiveName.trim()
|
||
|
||
// 验证压缩文件名
|
||
const invalidChars = ['/', '\0']
|
||
if (invalidChars.some(char => trimmedArchiveName.includes(char))) {
|
||
throw new Error('压缩文件名包含无效字符')
|
||
}
|
||
|
||
const archivePath = rawPath.posix.join(dirPath, trimmedArchiveName)
|
||
|
||
// 检查压缩文件是否已存在
|
||
const exists = await sftpClient.exists(archivePath)
|
||
if (exists) {
|
||
throw new Error(`文件 "${ trimmedArchiveName }" 已存在`)
|
||
}
|
||
|
||
// 构建要压缩的文件列表
|
||
const fileNames = targets.map(t => `"${ t.name }"`).join(' ')
|
||
|
||
// 使用 tar 命令压缩
|
||
const tarCmd = `cd "${ dirPath }" && tar -czf "${ trimmedArchiveName }" ${ fileNames }`
|
||
|
||
logger.info(`开始压缩文件: ${ targets.map(t => t.name).join(', ') } -> ${ trimmedArchiveName }`)
|
||
await execCommand(tarCmd)
|
||
logger.info(`压缩完成: ${ trimmedArchiveName }`)
|
||
|
||
socket.emit('compress_success', `压缩文件 "${ trimmedArchiveName }" 创建成功`)
|
||
|
||
// 返回最新目录列表
|
||
const dirLs = await sftpClient.list(dirPath)
|
||
const formattedDirLs = formatFileList(dirLs)
|
||
socket.emit('dir_ls', formattedDirLs, dirPath)
|
||
} catch (err) {
|
||
logger.error('compress error:', err.message)
|
||
socket.emit('compress_fail', err.message)
|
||
}
|
||
})
|
||
|
||
// -------- decompress file --------
|
||
socket.on('decompress_file', async ({ dirPath, fileName, mode, folderName }) => {
|
||
try {
|
||
if (!fileName || !fileName.trim()) {
|
||
throw new Error('文件名不能为空')
|
||
}
|
||
|
||
const trimmedFileName = fileName.trim()
|
||
const archivePath = rawPath.posix.join(dirPath, trimmedFileName)
|
||
|
||
// 检查压缩文件是否存在
|
||
const exists = await sftpClient.exists(archivePath)
|
||
if (!exists) {
|
||
throw new Error(`文件 "${ trimmedFileName }" 不存在`)
|
||
}
|
||
|
||
// 检查是否为支持的压缩格式
|
||
const isValidArchive = /(\.tar\.gz|\.tgz|\.tar|\.zip)$/i.test(trimmedFileName)
|
||
if (!isValidArchive) {
|
||
throw new Error('不支持的压缩格式,仅支持 .tar.gz、.tgz、.tar、.zip 格式')
|
||
}
|
||
|
||
// 确定解压目标目录
|
||
let targetDir = dirPath
|
||
let targetDirName = '当前文件夹'
|
||
|
||
if (mode === 'folder' && folderName) {
|
||
// 解压到同名文件夹
|
||
targetDir = rawPath.posix.join(dirPath, folderName)
|
||
targetDirName = `文件夹 "${ folderName }"`
|
||
|
||
// 检查目标文件夹是否已存在
|
||
const targetExists = await sftpClient.exists(targetDir)
|
||
if (targetExists) {
|
||
throw new Error(`目标文件夹 "${ folderName }" 已存在`)
|
||
}
|
||
|
||
// 创建目标文件夹
|
||
await sftpClient.mkdir(targetDir)
|
||
logger.info(`创建目标文件夹: ${ targetDir }`)
|
||
}
|
||
|
||
// 根据文件扩展名选择解压命令
|
||
let decompressCmd = ''
|
||
if (/\.tar\.gz$|\.tgz$/i.test(trimmedFileName)) {
|
||
// tar.gz 或 tgz 格式
|
||
if (mode === 'folder') {
|
||
decompressCmd = `cd "${ dirPath }" && tar -xzf "${ trimmedFileName }" -C "${ folderName }"`
|
||
} else {
|
||
decompressCmd = `cd "${ dirPath }" && tar -xzf "${ trimmedFileName }"`
|
||
}
|
||
} else if (/\.tar$/i.test(trimmedFileName)) {
|
||
// tar 格式
|
||
if (mode === 'folder') {
|
||
decompressCmd = `cd "${ dirPath }" && tar -xf "${ trimmedFileName }" -C "${ folderName }"`
|
||
} else {
|
||
decompressCmd = `cd "${ dirPath }" && tar -xf "${ trimmedFileName }"`
|
||
}
|
||
} else if (/\.zip$/i.test(trimmedFileName)) {
|
||
// zip 格式
|
||
if (mode === 'folder') {
|
||
decompressCmd = `cd "${ dirPath }" && unzip -o "${ trimmedFileName }" -d "${ folderName }"`
|
||
} else {
|
||
decompressCmd = `cd "${ dirPath }" && unzip -o "${ trimmedFileName }"`
|
||
}
|
||
}
|
||
|
||
logger.info(`开始解压文件: ${ trimmedFileName } -> ${ targetDirName }`)
|
||
await execCommand(decompressCmd)
|
||
logger.info(`解压完成: ${ trimmedFileName } -> ${ targetDirName }`)
|
||
|
||
socket.emit('decompress_success', `文件 "${ trimmedFileName }" 解压到${ targetDirName }成功`)
|
||
|
||
// 返回最新目录列表
|
||
const dirLs = await sftpClient.list(dirPath)
|
||
const formattedDirLs = formatFileList(dirLs)
|
||
socket.emit('dir_ls', formattedDirLs, dirPath)
|
||
} catch (err) {
|
||
logger.error('decompress error:', err.message)
|
||
socket.emit('decompress_fail', err.message)
|
||
}
|
||
})
|
||
|
||
// 下载功能
|
||
socket.on('download_request', async ({ dirPath, targets }) => {
|
||
let remoteTarPath = null // 跟踪远程临时文件路径
|
||
let taskId = null // 声明在外层以便错误处理时访问
|
||
try {
|
||
if (!targets || targets.length === 0) {
|
||
throw new Error('未选择要下载的文件')
|
||
}
|
||
|
||
taskId = Date.now() + '-' + Math.random().toString(36).slice(2)
|
||
const taskDir = rawPath.join(sftpCacheDir, taskId)
|
||
await fs.ensureDir(taskDir)
|
||
|
||
const abortController = new AbortController()
|
||
downloadTasks.set(taskId, {
|
||
abortController,
|
||
startTime: Date.now(),
|
||
totalSize: 0,
|
||
downloadedSize: 0,
|
||
remoteTarPath: null // 跟踪远程临时文件
|
||
})
|
||
|
||
if (targets.length === 1) {
|
||
// 单文件/文件夹逻辑(保持原有逻辑)
|
||
const target = targets[0]
|
||
socket.emit('download_started', { taskId, fileName: target.name })
|
||
const srcPath = rawPath.posix.join(dirPath, target.name)
|
||
|
||
if (target.type === 'd') {
|
||
// 文件夹:先在远端打包
|
||
const tarFileName = `${ target.name }.tar.gz`
|
||
remoteTarPath = `/tmp/${ taskId }.tar.gz`
|
||
const localTarPath = rawPath.join(taskDir, tarFileName)
|
||
|
||
// 保存远程文件路径到任务中
|
||
downloadTasks.get(taskId).remoteTarPath = remoteTarPath
|
||
|
||
// 检查是否被取消
|
||
if (abortController.signal.aborted) {
|
||
await cleanupRemoteTarFile(remoteTarPath)
|
||
throw new Error('下载已取消')
|
||
}
|
||
|
||
// 在远端打包
|
||
logger.info(`开始打包文件夹: ${ srcPath }`)
|
||
const tarCmd = `cd "${ dirPath }" && tar -czf "${ remoteTarPath }" "${ target.name }"`
|
||
try {
|
||
await execCommand(tarCmd)
|
||
logger.info(`打包文件夹: ${ srcPath } 成功`)
|
||
} catch (tarErr) {
|
||
logger.error('打包文件夹失败:', tarErr.message)
|
||
throw new Error(`打包文件夹失败: ${ tarErr.message }`)
|
||
}
|
||
|
||
// 获取打包文件大小
|
||
const statResult = await sftpClient.stat(remoteTarPath)
|
||
downloadTasks.get(taskId).totalSize = statResult.size
|
||
|
||
// 下载打包文件
|
||
await downloadFileWithProgress(sftpClient, remoteTarPath, localTarPath, taskId, downloadTasks, socket, abortController)
|
||
|
||
// 下载完成后立即清理远端临时文件
|
||
await cleanupRemoteTarFile(remoteTarPath)
|
||
remoteTarPath = null // 已清理,置空
|
||
|
||
socket.emit('download_ready', { taskId, fileName: tarFileName })
|
||
} else {
|
||
// 单文件:直接下载
|
||
const localFilePath = rawPath.join(taskDir, target.name)
|
||
|
||
// 获取文件大小
|
||
const statResult = await sftpClient.stat(srcPath)
|
||
downloadTasks.get(taskId).totalSize = statResult.size
|
||
|
||
await downloadFileWithProgress(sftpClient, srcPath, localFilePath, taskId, downloadTasks, socket, abortController)
|
||
|
||
socket.emit('download_ready', { taskId, fileName: target.name })
|
||
}
|
||
} else {
|
||
// 多文件逻辑:打包所有选中的文件/文件夹
|
||
const archiveName = `selected-files-${ Date.now() }.tar.gz`
|
||
remoteTarPath = `/tmp/${ taskId }.tar.gz`
|
||
const localTarPath = rawPath.join(taskDir, archiveName)
|
||
|
||
// 保存远程文件路径到任务中
|
||
downloadTasks.get(taskId).remoteTarPath = remoteTarPath
|
||
|
||
socket.emit('download_started', { taskId, fileName: archiveName })
|
||
|
||
// 检查是否被取消
|
||
if (abortController.signal.aborted) {
|
||
await cleanupRemoteTarFile(remoteTarPath)
|
||
throw new Error('下载已取消')
|
||
}
|
||
|
||
// 构建tar命令,打包所有选中的文件/文件夹
|
||
const fileNames = targets.map(t => `"${ t.name }"`).join(' ')
|
||
const tarCmd = `cd "${ dirPath }" && tar -czf "${ remoteTarPath }" ${ fileNames }`
|
||
|
||
logger.info(`开始打包多个文件: ${ targets.map(t => t.name).join(', ') }`)
|
||
try {
|
||
await execCommand(tarCmd)
|
||
logger.info('打包多个文件成功')
|
||
} catch (tarErr) {
|
||
logger.error('打包失败:', tarErr.message)
|
||
throw new Error(`打包失败: ${ tarErr.message }`)
|
||
}
|
||
|
||
// 获取打包文件大小
|
||
const statResult = await sftpClient.stat(remoteTarPath)
|
||
downloadTasks.get(taskId).totalSize = statResult.size
|
||
|
||
// 下载打包文件
|
||
await downloadFileWithProgress(sftpClient, remoteTarPath, localTarPath, taskId, downloadTasks, socket, abortController)
|
||
|
||
// 下载完成后立即清理远端临时文件
|
||
await cleanupRemoteTarFile(remoteTarPath)
|
||
remoteTarPath = null // 已清理,置空
|
||
|
||
socket.emit('download_ready', { taskId, fileName: archiveName })
|
||
}
|
||
|
||
downloadTasks.delete(taskId)
|
||
} catch (err) {
|
||
logger.error('下载失败:', err.message)
|
||
socket.emit('download_fail', err.message)
|
||
|
||
// 清理远程临时文件(如果还存在)
|
||
if (remoteTarPath) {
|
||
await cleanupRemoteTarFile(remoteTarPath)
|
||
}
|
||
|
||
// 清理任务(如果taskId存在)
|
||
if (taskId) {
|
||
downloadTasks.delete(taskId)
|
||
}
|
||
}
|
||
})
|
||
|
||
// 清理远程tar文件的辅助函数
|
||
async function cleanupRemoteTarFile(remoteTarPath) {
|
||
if (!remoteTarPath) return
|
||
try {
|
||
await execCommand(`rm -f "${ remoteTarPath }"`)
|
||
logger.info(`已清理远程临时文件: ${ remoteTarPath }`)
|
||
} catch (cleanupErr) {
|
||
logger.warn('清理远程临时文件失败:', remoteTarPath, cleanupErr.message)
|
||
}
|
||
}
|
||
|
||
// 取消下载
|
||
socket.on('download_cancel', ({ taskId }) => {
|
||
const task = downloadTasks.get(taskId)
|
||
if (task) {
|
||
task.abortController.abort()
|
||
downloadTasks.delete(taskId)
|
||
socket.emit('download_cancelled', { taskId })
|
||
|
||
// 如果没有其他下载任务,清理缓存目录
|
||
if (downloadTasks.size === 0) {
|
||
cleanupCacheDir()
|
||
}
|
||
}
|
||
})
|
||
|
||
// 清理缓存目录
|
||
const cleanupCacheDir = () => {
|
||
try {
|
||
fs.emptyDirSync(sftpCacheDir)
|
||
logger.info('已清理 sftpCacheDir:', sftpCacheDir)
|
||
} catch (err) {
|
||
logger.warn('清理缓存目录失败:', err.message)
|
||
}
|
||
}
|
||
|
||
// -------- 收藏相关功能 --------
|
||
|
||
// 获取收藏列表
|
||
socket.on('get_favorites', async ({ hostId }) => {
|
||
try {
|
||
const favorites = await favoriteSftpDB.findAsync({ hostId }, { sort: { createTime: -1 } })
|
||
socket.emit('favorites_list', favorites)
|
||
} catch (err) {
|
||
logger.error('获取收藏列表失败:', err.message)
|
||
socket.emit('favorite_error', '获取收藏列表失败')
|
||
}
|
||
})
|
||
|
||
// 添加收藏
|
||
socket.on('add_favorite', async ({ hostId, path, name, type }) => {
|
||
try {
|
||
if (!hostId || !path || !name || !type) {
|
||
throw new Error('缺少必要参数')
|
||
}
|
||
|
||
// 检查是否已存在相同路径的收藏
|
||
const existing = await favoriteSftpDB.findOneAsync({ hostId, path })
|
||
if (existing) {
|
||
socket.emit('favorite_error', '该路径已经收藏过了')
|
||
return
|
||
}
|
||
|
||
// 添加新收藏
|
||
const newFavorite = {
|
||
hostId,
|
||
path,
|
||
name,
|
||
type,
|
||
createTime: Date.now()
|
||
}
|
||
|
||
await favoriteSftpDB.insertAsync(newFavorite)
|
||
socket.emit('favorite_added', `收藏 "${ name }" 成功`)
|
||
|
||
logger.info(`用户收藏了路径: ${ path }`)
|
||
} catch (err) {
|
||
logger.error('添加收藏失败:', err.message)
|
||
socket.emit('favorite_error', err.message)
|
||
}
|
||
})
|
||
|
||
// 删除收藏
|
||
socket.on('remove_favorite', async ({ hostId, path }) => {
|
||
try {
|
||
if (!hostId || !path) {
|
||
throw new Error('缺少必要参数')
|
||
}
|
||
|
||
const result = await favoriteSftpDB.removeAsync({ hostId, path }, {})
|
||
|
||
if (result === 0) {
|
||
socket.emit('favorite_error', '收藏不存在')
|
||
return
|
||
}
|
||
|
||
socket.emit('favorite_removed', '取消收藏成功')
|
||
logger.info(`用户取消收藏路径: ${ path }`)
|
||
} catch (err) {
|
||
logger.error('删除收藏失败:', err.message)
|
||
socket.emit('favorite_error', err.message)
|
||
}
|
||
})
|
||
|
||
// -------- 软链接解析功能 --------
|
||
|
||
// 解析软链接的真实路径
|
||
socket.on('resolve_symlink', async ({ symlinkPath }) => {
|
||
try {
|
||
// 获取软链接的真实路径
|
||
logger.info(`解析软链接: ${ symlinkPath }`)
|
||
const realPath = await sftpClient.realPath(symlinkPath)
|
||
logger.info(`软链接真实路径: ${ realPath }`)
|
||
|
||
// 检查真实路径是否存在
|
||
const stats = await sftpClient.stat(realPath)
|
||
const isDirectory = stats.isDirectory
|
||
|
||
socket.emit('symlink_resolved', {
|
||
realPath,
|
||
isDirectory,
|
||
symlinkPath
|
||
})
|
||
|
||
logger.info(`软链接解析成功: ${ symlinkPath } -> ${ realPath } (${ isDirectory ? '目录' : '文件' })`)
|
||
|
||
} catch (err) {
|
||
logger.error('软链接解析失败:', err.message)
|
||
socket.emit('symlink_resolve_error', {
|
||
error: err.message,
|
||
symlinkPath
|
||
})
|
||
}
|
||
})
|
||
|
||
// -------- 文本文件编辑功能 --------
|
||
|
||
// 智能检测编码并读取文件
|
||
socket.on('detect_and_read_file', async ({ filePath, fileSize }) => {
|
||
try {
|
||
// 检查文件是否存在
|
||
const exists = await sftpClient.exists(filePath)
|
||
if (!exists) {
|
||
socket.emit('file_read_error', {
|
||
error: '文件不存在',
|
||
filePath
|
||
})
|
||
return
|
||
}
|
||
// 检查文件大小限制 (2MB)
|
||
const maxFileSize = 1024 * 1024 * 2
|
||
if (fileSize > maxFileSize) {
|
||
socket.emit('file_read_error', {
|
||
error: `文件过大(${ Math.round(fileSize / 1024 / 1024 * 100) / 100 }MB),仅支持编辑小于2MB的文件`,
|
||
filePath
|
||
})
|
||
return
|
||
}
|
||
|
||
logger.info(`开始智能检测文件编码: ${ filePath }`)
|
||
const buffer = await sftpClient.get(filePath)
|
||
|
||
const { encoding: detectedEncoding, confidence, reason } = detectEncoding(buffer)
|
||
logger.info(`检测结果: ${ detectedEncoding.toUpperCase() } (置信度: ${ confidence }%) - ${ reason }`)
|
||
|
||
// 根据检测到的编码解码文件内容
|
||
let content
|
||
try {
|
||
content = decodeBuffer(buffer, detectedEncoding)
|
||
} catch (decodeErr) {
|
||
logger.error('使用检测编码解码失败,尝试 UTF-8:', decodeErr.message)
|
||
// 解码失败,降级为 UTF-8
|
||
content = buffer.toString('utf8')
|
||
}
|
||
|
||
logger.info(`文件读取成功: ${ filePath },大小: ${ content.length } 字符,使用编码: ${ detectedEncoding.toUpperCase() }`)
|
||
|
||
socket.emit('file_content_with_encoding', {
|
||
content,
|
||
filePath,
|
||
detectedEncoding,
|
||
confidence
|
||
})
|
||
|
||
} catch (err) {
|
||
logger.error('智能读取文件失败:', err.message)
|
||
socket.emit('file_read_error', {
|
||
error: err.message,
|
||
filePath
|
||
})
|
||
}
|
||
})
|
||
|
||
// 读取文件内容(手动指定编码)
|
||
socket.on('read_file', async ({ filePath, fileSize, encoding = 'utf8' }) => {
|
||
try {
|
||
// 检查文件是否存在
|
||
const exists = await sftpClient.exists(filePath)
|
||
if (!exists) {
|
||
socket.emit('file_read_error', {
|
||
error: '文件不存在',
|
||
filePath
|
||
})
|
||
return
|
||
}
|
||
|
||
// 读取文件内容
|
||
logger.info(`开始读取文件: ${ filePath },使用编码: ${ encoding.toUpperCase() }`)
|
||
const buffer = await sftpClient.get(filePath)
|
||
|
||
// 根据编码解码文件内容
|
||
let content
|
||
try {
|
||
content = decodeBuffer(buffer, encoding)
|
||
} catch (decodeErr) {
|
||
logger.error(`文件解码失败(编码: ${ encoding }):`, decodeErr.message)
|
||
socket.emit('file_read_error', {
|
||
error: `文件解码失败,可能不是 ${ encoding.toUpperCase() } 编码`,
|
||
filePath
|
||
})
|
||
return
|
||
}
|
||
|
||
logger.info(`文件读取成功: ${ filePath },大小: ${ content.length } 字符`)
|
||
socket.emit('file_content', { content, filePath })
|
||
|
||
} catch (err) {
|
||
logger.error('读取文件失败:', err.message)
|
||
socket.emit('file_read_error', {
|
||
error: err.message,
|
||
filePath
|
||
})
|
||
}
|
||
})
|
||
|
||
// -------- 图片预览功能 --------
|
||
|
||
// 读取图片文件内容
|
||
socket.on('read_image', async ({ filePath, fileSize }) => {
|
||
try {
|
||
// 检查文件大小限制 (10MB)
|
||
const maxFileSize = 10 * 1024 * 1024
|
||
if (fileSize > maxFileSize) {
|
||
socket.emit('image_read_error', {
|
||
error: `图片过大(${ Math.round(fileSize / 1024 / 1024 * 100) / 100 }MB),仅支持预览小于10MB的图片`,
|
||
filePath
|
||
})
|
||
return
|
||
}
|
||
|
||
// 检查文件是否存在
|
||
const exists = await sftpClient.exists(filePath)
|
||
if (!exists) {
|
||
socket.emit('image_read_error', {
|
||
error: '图片文件不存在',
|
||
filePath
|
||
})
|
||
return
|
||
}
|
||
|
||
// 检查是否为图片文件
|
||
const imageExtensions = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'svg', 'ico', 'tiff', 'tif']
|
||
const ext = rawPath.extname(filePath).toLowerCase().slice(1)
|
||
if (!imageExtensions.includes(ext)) {
|
||
socket.emit('image_read_error', {
|
||
error: '不支持的图片格式',
|
||
filePath
|
||
})
|
||
return
|
||
}
|
||
|
||
// 生成缓存文件名(使用UUID避免猜测和冲突)
|
||
const fileName = rawPath.basename(filePath)
|
||
const cacheFileName = `${ uuidv4() }_${ fileName }`
|
||
const localImagePath = rawPath.join(sftpCacheDir, cacheFileName)
|
||
|
||
logger.info(`开始下载图片到缓存: ${ filePath } -> ${ localImagePath }`)
|
||
|
||
// 下载图片到本地缓存
|
||
await sftpClient.fastGet(filePath, localImagePath)
|
||
|
||
// 生成访问URL
|
||
const imageUrl = `/sftp-cache/${ cacheFileName }`
|
||
|
||
logger.info(`图片下载成功: ${ filePath },缓存路径: ${ localImagePath }`)
|
||
|
||
socket.emit('image_content', {
|
||
imageUrl,
|
||
filePath,
|
||
fileName,
|
||
fileSize
|
||
})
|
||
|
||
} catch (err) {
|
||
logger.error('图片预览失败:', err.message)
|
||
socket.emit('image_read_error', {
|
||
error: err.message,
|
||
filePath
|
||
})
|
||
}
|
||
})
|
||
|
||
// 保存文件内容
|
||
socket.on('save_file', async ({ filePath, content, encoding = 'utf8' }) => {
|
||
try {
|
||
// 根据编码将内容转换为 Buffer
|
||
let buffer
|
||
try {
|
||
buffer = encodeString(content, encoding)
|
||
} catch (encodeErr) {
|
||
logger.error(`文件编码失败(编码: ${ encoding }):`, encodeErr.message)
|
||
socket.emit('file_save_error', {
|
||
error: `文件编码失败: ${ encodeErr.message }`,
|
||
filePath
|
||
})
|
||
return
|
||
}
|
||
|
||
// 保存文件内容
|
||
await sftpClient.put(buffer, filePath)
|
||
|
||
logger.info(`文件保存成功: ${ filePath }`)
|
||
socket.emit('file_saved', { filePath })
|
||
|
||
} catch (err) {
|
||
logger.error('保存文件失败:', err.message)
|
||
socket.emit('file_save_error', {
|
||
error: err.message,
|
||
filePath
|
||
})
|
||
}
|
||
})
|
||
|
||
// -------- 上传相关功能 --------
|
||
|
||
// 开始上传
|
||
socket.on('upload_start', async ({ taskId, fileName, fileSize, targetPath }) => {
|
||
try {
|
||
logger.info(`收到上传请求: ${ fileName }, 大小: ${ (fileSize / 1024 / 1024 / 1024).toFixed(2) }GB`)
|
||
|
||
if (!taskId || !fileName || !fileSize || !targetPath) {
|
||
throw new Error('上传参数不完整')
|
||
}
|
||
|
||
// 创建临时文件路径(清理文件名中的特殊字符)
|
||
const safeFileName = fileName.replace(/[<>:"|?*]/g, '_')
|
||
const tempFilePath = rawPath.join(sftpCacheDir, `temp_${ taskId }_${ safeFileName }`)
|
||
// 创建上传任务
|
||
const uploadTask = {
|
||
taskId,
|
||
fileName,
|
||
fileSize,
|
||
targetPath,
|
||
tempFilePath,
|
||
uploadedSize: 0,
|
||
receivedChunks: new Set(), // 使用Set记录已接收的分片索引
|
||
totalChunks: 0,
|
||
writeStream: null, // 稍后在第一个分片到达时创建
|
||
startTime: Date.now(),
|
||
lastProgressTime: Date.now(),
|
||
abortController: new AbortController()
|
||
}
|
||
|
||
uploadTasks.set(taskId, uploadTask)
|
||
logger.info(`开始上传任务: ${ taskId } - ${ fileName }`)
|
||
socket.emit('upload_started', { taskId, fileName })
|
||
|
||
} catch (err) {
|
||
logger.error('开始上传失败:', err.message)
|
||
socket.emit('upload_fail', { taskId, error: err.message })
|
||
}
|
||
})
|
||
|
||
// 上传文件分片
|
||
socket.on('upload_chunk', async ({ taskId, chunkIndex, chunkData, totalChunks, isLastChunk }) => {
|
||
try {
|
||
const task = uploadTasks.get(taskId)
|
||
|
||
if (!task) {
|
||
throw new Error('上传任务不存在')
|
||
}
|
||
|
||
if (task.abortController.signal.aborted) {
|
||
throw new Error('上传已取消')
|
||
}
|
||
|
||
// 初始化写入流(仅第一次)
|
||
if (!task.writeStream) {
|
||
// 确保缓存目录存在
|
||
fs.ensureDirSync(sftpCacheDir)
|
||
|
||
task.writeStream = fs.createWriteStream(task.tempFilePath)
|
||
task.totalChunks = totalChunks
|
||
|
||
// 处理写入流错误(将错误标记到任务中)
|
||
task.writeStream.on('error', (err) => {
|
||
logger.error(`写入流错误: ${ taskId }`, err.message)
|
||
task.writeStreamError = err
|
||
// 销毁流,防止继续写入
|
||
if (!task.writeStream.destroyed) {
|
||
task.writeStream.destroy()
|
||
}
|
||
})
|
||
}
|
||
|
||
// 检查写入流是否已经出错
|
||
if (task.writeStreamError) {
|
||
throw new Error(`文件写入失败: ${ task.writeStreamError.message }`)
|
||
}
|
||
|
||
// 直接写入文件流,不存储在内存
|
||
const chunkBuffer = Buffer.from(chunkData)
|
||
await new Promise((resolve, reject) => {
|
||
task.writeStream.write(chunkBuffer, (err) => {
|
||
if (err) {
|
||
reject(err)
|
||
} else {
|
||
resolve()
|
||
}
|
||
})
|
||
})
|
||
|
||
// 记录已接收的分片
|
||
task.receivedChunks.add(chunkIndex)
|
||
// 更新上传进度
|
||
const uploadedChunks = task.receivedChunks.size
|
||
task.uploadedSize = (uploadedChunks / totalChunks) * task.fileSize
|
||
|
||
// 计算速度和ETA
|
||
const now = Date.now()
|
||
const elapsed = (now - task.lastProgressTime) / 1000
|
||
if (elapsed >= 1) { // 每秒更新一次进度
|
||
const chunkProgress = Math.min((uploadedChunks / totalChunks) * 100, 100)
|
||
const totalElapsed = (now - task.startTime) / 1000
|
||
const speed = totalElapsed > 0 ? task.uploadedSize / totalElapsed : 0
|
||
const eta = speed > 0 ? (task.fileSize - task.uploadedSize) / speed : 0
|
||
|
||
socket.emit('upload_progress', {
|
||
taskId,
|
||
chunkProgress,
|
||
chunkUploadedSize: task.uploadedSize,
|
||
chunkTotalSize: task.fileSize,
|
||
sftpProgress: 0,
|
||
sftpUploadedSize: 0,
|
||
sftpTotalSize: task.fileSize,
|
||
speed: Math.round(speed),
|
||
eta: Math.round(eta),
|
||
stage: 'uploading'
|
||
})
|
||
|
||
task.lastProgressTime = now
|
||
}
|
||
|
||
// logger.info(`上传分片: ${ taskId } - ${ chunkIndex + 1 }/${ totalChunks }`) // 太频繁,注释掉
|
||
socket.emit('upload_chunk_success', { taskId, chunkIndex })
|
||
|
||
// 如果是最后一个分片,完成文件写入并开始传输
|
||
if (isLastChunk && uploadedChunks === totalChunks) {
|
||
await completeUpload(task)
|
||
}
|
||
|
||
} catch (err) {
|
||
logger.error('上传分片失败:', err.message)
|
||
socket.emit('upload_chunk_fail', { taskId, chunkIndex, error: err.message })
|
||
|
||
// 清理资源
|
||
const failedTask = uploadTasks.get(taskId)
|
||
if (failedTask && failedTask.writeStream) {
|
||
failedTask.writeStream.destroy()
|
||
}
|
||
}
|
||
})
|
||
|
||
// 完成上传的辅助函数
|
||
async function completeUpload(task) {
|
||
try {
|
||
logger.info(`文件接收完成,准备传输: ${ task.fileName }`)
|
||
|
||
// 关闭写入流
|
||
if (task.writeStream) {
|
||
await new Promise((resolve, reject) => {
|
||
task.writeStream.end((err) => {
|
||
if (err) reject(err)
|
||
else resolve()
|
||
})
|
||
})
|
||
task.writeStream = null
|
||
}
|
||
|
||
// 验证文件大小
|
||
const stats = fs.statSync(task.tempFilePath)
|
||
if (stats.size !== task.fileSize) {
|
||
throw new Error(`文件大小不匹配: 期望 ${ task.fileSize }, 实际 ${ stats.size }`)
|
||
}
|
||
|
||
// 发送SFTP传输开始事件
|
||
socket.emit('upload_progress', {
|
||
taskId: task.taskId,
|
||
chunkProgress: 100,
|
||
chunkUploadedSize: task.fileSize,
|
||
chunkTotalSize: task.fileSize,
|
||
sftpProgress: 0,
|
||
sftpUploadedSize: 0,
|
||
sftpTotalSize: task.fileSize,
|
||
speed: 0,
|
||
eta: 0,
|
||
stage: 'transferring'
|
||
})
|
||
|
||
// 通过SFTP传输到远程服务器,使用fastPut带进度回调
|
||
logger.info(`开始传输文件到远程服务器: ${ task.targetPath }`)
|
||
|
||
let sftpStartTime = Date.now()
|
||
let lastSftpUpdateTime = sftpStartTime
|
||
let lastSftpUploadedSize = 0
|
||
|
||
await sftpClient.fastPut(task.tempFilePath, task.targetPath, {
|
||
step: (transferredBytes) => {
|
||
const now = Date.now()
|
||
const sftpProgress = Math.min((transferredBytes / task.fileSize) * 100, 100)
|
||
|
||
// 计算SFTP传输速度和ETA(每500ms更新一次)
|
||
if (now - lastSftpUpdateTime >= 500) {
|
||
const elapsed = (now - lastSftpUpdateTime) / 1000
|
||
const sizeDiff = transferredBytes - lastSftpUploadedSize
|
||
const sftpSpeed = elapsed > 0 ? sizeDiff / elapsed : 0
|
||
const remainingBytes = task.fileSize - transferredBytes
|
||
const sftpEta = sftpSpeed > 0 ? remainingBytes / sftpSpeed : 0
|
||
|
||
socket.emit('upload_progress', {
|
||
taskId: task.taskId,
|
||
chunkProgress: 100,
|
||
chunkUploadedSize: task.fileSize,
|
||
chunkTotalSize: task.fileSize,
|
||
sftpProgress,
|
||
sftpUploadedSize: transferredBytes,
|
||
sftpTotalSize: task.fileSize,
|
||
speed: Math.round(sftpSpeed),
|
||
eta: Math.round(sftpEta),
|
||
stage: 'transferring'
|
||
})
|
||
|
||
lastSftpUpdateTime = now
|
||
lastSftpUploadedSize = transferredBytes
|
||
}
|
||
}
|
||
})
|
||
|
||
// 传输完成
|
||
logger.info(`文件上传成功: ${ task.fileName }`)
|
||
socket.emit('upload_complete', {
|
||
taskId: task.taskId,
|
||
fileName: task.fileName,
|
||
targetPath: task.targetPath
|
||
})
|
||
|
||
} catch (err) {
|
||
logger.error('完成上传失败:', err.message)
|
||
socket.emit('upload_fail', {
|
||
taskId: task.taskId,
|
||
error: err.message
|
||
})
|
||
} finally {
|
||
// 确保清理临时文件
|
||
if (task.tempFilePath && fs.existsSync(task.tempFilePath)) {
|
||
try {
|
||
fs.unlinkSync(task.tempFilePath)
|
||
logger.info(`已清理临时文件: ${ task.tempFilePath }`)
|
||
} catch (cleanupErr) {
|
||
logger.warn('清理临时文件失败:', cleanupErr.message)
|
||
}
|
||
}
|
||
|
||
// 确保关闭写入流
|
||
if (task.writeStream) {
|
||
task.writeStream.destroy()
|
||
}
|
||
|
||
// 清理任务
|
||
uploadTasks.delete(task.taskId)
|
||
}
|
||
}
|
||
|
||
// 取消上传
|
||
socket.on('upload_cancel', ({ taskId }) => {
|
||
const task = uploadTasks.get(taskId)
|
||
if (task) {
|
||
task.abortController.abort()
|
||
// 关闭写入流
|
||
if (task.writeStream) {
|
||
task.writeStream.destroy()
|
||
}
|
||
// 清理临时文件
|
||
if (task.tempFilePath && fs.existsSync(task.tempFilePath)) {
|
||
try {
|
||
fs.unlinkSync(task.tempFilePath)
|
||
logger.info(`取消上传,已清理临时文件: ${ task.tempFilePath }`)
|
||
} catch (cleanupErr) {
|
||
logger.warn('清理临时文件失败:', cleanupErr.message)
|
||
}
|
||
}
|
||
uploadTasks.delete(taskId)
|
||
|
||
logger.info(`取消上传任务: ${ taskId }`)
|
||
socket.emit('upload_cancelled', { taskId })
|
||
}
|
||
})
|
||
|
||
// 监听连接断开,清理下载任务和缓存
|
||
socket.on('disconnect', async (reason) => {
|
||
try {
|
||
logger.info('SFTP连接断开,开始清理资源...', reason)
|
||
|
||
// 清理定时器
|
||
if (memoryCleanupInterval) {
|
||
clearInterval(memoryCleanupInterval)
|
||
}
|
||
|
||
// 清理所有远程临时文件
|
||
const remoteCleanupPromises = []
|
||
for (const [taskId, task] of downloadTasks) {
|
||
try {
|
||
// 取消下载任务
|
||
if (task.abortController) {
|
||
task.abortController.abort()
|
||
}
|
||
|
||
// 如果有远程临时文件,添加到清理列表
|
||
if (task.remoteTarPath) {
|
||
remoteCleanupPromises.push(cleanupRemoteTarFile(task.remoteTarPath))
|
||
}
|
||
} catch (taskError) {
|
||
logger.warn(`清理下载任务 ${ taskId } 失败:`, taskError.message)
|
||
}
|
||
}
|
||
|
||
// 并行清理所有远程临时文件
|
||
if (remoteCleanupPromises.length > 0) {
|
||
try {
|
||
await Promise.all(remoteCleanupPromises)
|
||
logger.info(`连接断开时已清理 ${ remoteCleanupPromises.length } 个远程临时文件`)
|
||
} catch (err) {
|
||
logger.warn('连接断开时清理远程临时文件部分失败:', err.message)
|
||
}
|
||
}
|
||
|
||
// 清理上传任务和相关资源
|
||
for (const [taskId, task] of uploadTasks) {
|
||
try {
|
||
if (task.abortController) {
|
||
task.abortController.abort()
|
||
}
|
||
// 关闭写入流
|
||
if (task.writeStream) {
|
||
task.writeStream.destroy()
|
||
}
|
||
// 清理临时文件
|
||
if (task.tempFilePath && fs.existsSync(task.tempFilePath)) {
|
||
try {
|
||
fs.unlinkSync(task.tempFilePath)
|
||
logger.info(`连接断开,已清理临时文件: ${ task.tempFilePath }`)
|
||
} catch (cleanupErr) {
|
||
logger.warn('清理临时文件失败:', cleanupErr.message)
|
||
}
|
||
}
|
||
} catch (taskError) {
|
||
logger.warn(`清理上传任务 ${ taskId } 失败:`, taskError.message)
|
||
}
|
||
}
|
||
|
||
// 清理所有任务映射
|
||
downloadTasks.clear()
|
||
uploadTasks.clear()
|
||
|
||
// 清理本地缓存目录
|
||
try {
|
||
cleanupCacheDir()
|
||
} catch (cleanupError) {
|
||
logger.warn('清理本地缓存目录失败:', cleanupError.message)
|
||
}
|
||
|
||
logger.info('SFTP资源清理完成')
|
||
} catch (disconnectError) {
|
||
logger.error('Socket断开连接清理过程中发生错误:', disconnectError.message)
|
||
}
|
||
})
|
||
|
||
// 定期内存清理(可选优化)
|
||
const memoryCleanupInterval = setInterval(() => {
|
||
// 检查并清理超时的上传任务(超过1小时)
|
||
const now = Date.now()
|
||
const timeout = 2 * 60 * 60 * 1000 // 2小时
|
||
|
||
for (const [taskId, task] of uploadTasks) {
|
||
if (now - task.startTime > timeout) {
|
||
logger.warn(`清理超时上传任务: ${ taskId }`)
|
||
if (task.abortController) {
|
||
task.abortController.abort()
|
||
}
|
||
// 关闭写入流
|
||
if (task.writeStream) {
|
||
task.writeStream.destroy()
|
||
}
|
||
// 清理临时文件
|
||
if (task.tempFilePath && fs.existsSync(task.tempFilePath)) {
|
||
try {
|
||
fs.unlinkSync(task.tempFilePath)
|
||
logger.info(`清理超时任务临时文件: ${ task.tempFilePath }`)
|
||
} catch (cleanupErr) {
|
||
logger.warn('清理临时文件失败:', cleanupErr.message)
|
||
}
|
||
}
|
||
|
||
uploadTasks.delete(taskId)
|
||
}
|
||
}
|
||
|
||
// 检查并清理超时的下载任务
|
||
for (const [taskId, task] of downloadTasks) {
|
||
if (now - task.startTime > timeout) {
|
||
logger.warn(`清理超时下载任务: ${ taskId }`)
|
||
if (task.abortController) {
|
||
task.abortController.abort()
|
||
}
|
||
downloadTasks.delete(taskId)
|
||
}
|
||
}
|
||
}, 5 * 60 * 1000) // 每5分钟检查一次
|
||
|
||
// 带进度的文件下载函数
|
||
async function downloadFileWithProgress(sftpClient, remotePath, localPath, taskId, downloadTasks, socket, abortController) {
|
||
return new Promise((resolve, reject) => {
|
||
const task = downloadTasks.get(taskId)
|
||
if (!task) {
|
||
return reject(new Error('任务不存在'))
|
||
}
|
||
|
||
let lastUpdateTime = Date.now()
|
||
let lastDownloadedSize = 0
|
||
let progressInterval = null
|
||
let readStream = null
|
||
let writeStream = null
|
||
|
||
// 清理函数
|
||
const cleanup = () => {
|
||
if (progressInterval) {
|
||
clearInterval(progressInterval)
|
||
progressInterval = null
|
||
}
|
||
if (readStream) {
|
||
readStream.destroy()
|
||
readStream = null
|
||
}
|
||
if (writeStream) {
|
||
writeStream.destroy()
|
||
writeStream = null
|
||
}
|
||
}
|
||
|
||
// 错误处理函数
|
||
const handleError = (err) => {
|
||
cleanup()
|
||
fs.unlink(localPath).catch(() => {})
|
||
reject(err)
|
||
}
|
||
|
||
progressInterval = setInterval(() => {
|
||
if (abortController.signal.aborted) {
|
||
cleanup()
|
||
reject(new Error('下载已取消'))
|
||
return
|
||
}
|
||
|
||
const now = Date.now()
|
||
const currentTask = downloadTasks.get(taskId)
|
||
if (!currentTask) {
|
||
cleanup()
|
||
reject(new Error('任务已被删除'))
|
||
return
|
||
}
|
||
|
||
const { totalSize, downloadedSize } = currentTask // , startTime
|
||
// const elapsed = (now - startTime) / 1000 // 秒
|
||
const progress = totalSize > 0 ? (downloadedSize / totalSize * 100) : 0
|
||
|
||
// 计算速度 (bytes/s)
|
||
const timeDiff = (now - lastUpdateTime) / 1000
|
||
const sizeDiff = downloadedSize - lastDownloadedSize
|
||
const speed = timeDiff > 0 ? sizeDiff / timeDiff : 0
|
||
|
||
// 计算剩余时间
|
||
const remainingBytes = totalSize - downloadedSize
|
||
const eta = speed > 0 ? remainingBytes / speed : 0
|
||
|
||
socket.emit('download_progress', {
|
||
taskId,
|
||
progress: Math.min(progress, 100),
|
||
downloadedSize,
|
||
totalSize,
|
||
speed: Math.round(speed),
|
||
eta: Math.round(eta)
|
||
})
|
||
|
||
lastUpdateTime = now
|
||
lastDownloadedSize = downloadedSize
|
||
}, 1000) // 每1秒更新
|
||
|
||
try {
|
||
readStream = sftpClient.createReadStream(remotePath)
|
||
writeStream = fs.createWriteStream(localPath)
|
||
|
||
readStream.on('data', (chunk) => {
|
||
if (abortController.signal.aborted) {
|
||
cleanup()
|
||
fs.unlink(localPath).catch(() => {}) // 删除部分下载的文件
|
||
reject(new Error('下载已取消'))
|
||
return
|
||
}
|
||
|
||
const currentTask = downloadTasks.get(taskId)
|
||
if (currentTask) {
|
||
currentTask.downloadedSize += chunk.length
|
||
}
|
||
})
|
||
|
||
readStream.on('error', handleError)
|
||
writeStream.on('error', handleError)
|
||
|
||
writeStream.on('finish', () => {
|
||
cleanup()
|
||
resolve()
|
||
})
|
||
|
||
readStream.pipe(writeStream)
|
||
} catch (err) {
|
||
handleError(err)
|
||
}
|
||
})
|
||
}
|
||
}
|
||
|
||
module.exports = (httpServer) => {
|
||
const serverIo = createSecureWs(httpServer, '/sftp-v2', {
|
||
// 增加消息大小限制,支持大文件上传(默认1MB)
|
||
maxHttpBufferSize: 10 * 1024 * 1024, // 10MB
|
||
pingTimeout: 10000, // 10秒
|
||
pingInterval: 10000
|
||
})
|
||
|
||
serverIo.on('connection', async (socket) => {
|
||
let sftpClient = new SFTPClient()
|
||
logger.info('sftp-v2 websocket 已连接')
|
||
let jumpSshClients = []
|
||
|
||
// 添加socket本身的错误处理
|
||
socket.on('error', (err) => {
|
||
logger.error('SFTP-v2 Socket连接错误:', err.message)
|
||
})
|
||
|
||
socket.on('ws_sftp', async ({ hostId }) => {
|
||
try {
|
||
const targetHostInfo = await hostListDB.findOneAsync({ _id: hostId })
|
||
if (!targetHostInfo) throw new Error(`Host with ID ${ hostId } not found`)
|
||
|
||
let { authType, host, port, username } = targetHostInfo
|
||
let { authInfo: targetConnectionOptions } = await getConnectionOptions(hostId)
|
||
|
||
// 使用通用的代理和跳板机连接处理函数
|
||
try {
|
||
const result = await handleProxyAndJumpHostConnection({
|
||
hostInfo: targetHostInfo,
|
||
targetConnectionOptions,
|
||
socket,
|
||
logPrefix: 'SFTP '
|
||
})
|
||
jumpSshClients = result.jumpSshClients
|
||
} catch (proxyError) {
|
||
socket.emit('connect_fail', `代理连接失败: ${ proxyError.message }`)
|
||
return
|
||
}
|
||
|
||
logger.info('准备连接sftp-v2 面板:', host)
|
||
logger.info('连接信息', { username, port, authType })
|
||
|
||
sftpClient.client = new SSHClient()
|
||
|
||
// 添加错误处理器,防止程序崩溃
|
||
sftpClient.client.on('error', (err) => {
|
||
logger.error('SFTP SSH连接错误:', err.message)
|
||
try {
|
||
// 发送SSH连接错误事件
|
||
socket.emit('shell_connection_error', {
|
||
message: `SSH连接错误: ${ err.message }`,
|
||
code: err.code || 'UNKNOWN'
|
||
})
|
||
} catch (emitError) {
|
||
logger.error('发送错误事件失败:', emitError.message)
|
||
}
|
||
})
|
||
|
||
sftpClient.client.on('end', () => {
|
||
logger.info('SSH连接正常结束')
|
||
})
|
||
|
||
sftpClient.client.on('close', (hadError) => {
|
||
if (hadError) {
|
||
logger.warn('SSH连接异常关闭')
|
||
try {
|
||
socket.emit('shell_connection_error', {
|
||
message: 'SSH连接异常关闭',
|
||
code: 'CONNECTION_CLOSED'
|
||
})
|
||
} catch (emitError) {
|
||
logger.error('发送连接关闭事件失败:', emitError.message)
|
||
}
|
||
} else {
|
||
logger.info('SSH连接已关闭')
|
||
}
|
||
})
|
||
|
||
// 添加未处理异常捕获
|
||
// sftpClient.client.on('timeout', () => {
|
||
// logger.warn('SSH连接超时')
|
||
// try {
|
||
// socket.emit('shell_connection_error', {
|
||
// message: 'SSH连接超时',
|
||
// code: 'CONNECTION_TIMEOUT'
|
||
// })
|
||
// } catch (emitError) {
|
||
// logger.error('发送超时事件失败:', emitError.message)
|
||
// }
|
||
// })
|
||
|
||
sftpClient.client.on('keyboard-interactive', function (name, instructions, instructionsLang, prompts, finish) {
|
||
finish([targetConnectionOptions[authType]])
|
||
})
|
||
|
||
try {
|
||
await sftpClient.connect({
|
||
tryKeyboard: true,
|
||
...targetConnectionOptions
|
||
})
|
||
fs.ensureDirSync(sftpCacheDir)
|
||
let rootList = []
|
||
let isRootUser = true
|
||
let currentWorkingDir = '/'
|
||
|
||
// 如果是 root 用户,直接进入 root 家目录
|
||
if (username === 'root') {
|
||
try {
|
||
currentWorkingDir = '/root'
|
||
rootList = await sftpClient.list('/root')
|
||
} catch (rootError) {
|
||
// 如果 /root 不存在或无权限,降级到根目录
|
||
logger.warn('获取 /root 失败,降级到根目录:', rootError.message)
|
||
currentWorkingDir = '/'
|
||
rootList = await sftpClient.list('/')
|
||
logger.info('root 用户进入根目录成功')
|
||
}
|
||
} else {
|
||
try {
|
||
rootList = await sftpClient.list('/')
|
||
logger.info('获取根目录成功')
|
||
} catch (error) {
|
||
logger.error('获取根目录失败:', error.message)
|
||
logger.info('尝试获取当前目录')
|
||
isRootUser = false
|
||
|
||
try {
|
||
// 获取当前工作目录的绝对路径
|
||
currentWorkingDir = await sftpClient.cwd()
|
||
logger.info('当前工作目录:', currentWorkingDir)
|
||
rootList = await sftpClient.list(currentWorkingDir)
|
||
logger.info('获取当前目录成功')
|
||
} catch (cwdError) {
|
||
logger.warn('获取工作目录失败,使用相对路径:', cwdError.message)
|
||
currentWorkingDir = '~'
|
||
rootList = await sftpClient.list('./')
|
||
logger.info('获取当前目录成功')
|
||
}
|
||
}
|
||
}
|
||
|
||
// 格式化文件列表
|
||
const formattedRootList = formatFileList(rootList)
|
||
|
||
// 普通文件-、目录文件d、链接文件l
|
||
socket.emit('connect_success', {
|
||
rootList: formattedRootList,
|
||
isRootUser,
|
||
currentPath: currentWorkingDir
|
||
})
|
||
logger.info('连接sftp-v2 成功:', host)
|
||
listenAction(sftpClient, socket)
|
||
} catch (error) {
|
||
logger.error('连接sftp-v2 失败:', error.message)
|
||
|
||
// 发送详细的错误信息给前端
|
||
let errorMessage = error.message
|
||
if (error.code) {
|
||
errorMessage += ` (错误代码: ${ error.code })`
|
||
}
|
||
if (error.errno) {
|
||
errorMessage += ` (错误号: ${ error.errno })`
|
||
}
|
||
|
||
socket.emit('connect_fail', errorMessage)
|
||
|
||
// 清理资源
|
||
try {
|
||
if (sftpClient && sftpClient.sftp) {
|
||
await sftpClient.end()
|
||
}
|
||
} catch (cleanupError) {
|
||
logger.warn('清理SFTP客户端资源失败:', cleanupError.message)
|
||
}
|
||
|
||
// 清理跳板机连接
|
||
if (jumpSshClients && jumpSshClients.length > 0) {
|
||
jumpSshClients.forEach((client, index) => {
|
||
try {
|
||
client.end()
|
||
logger.info(`已清理跳板机连接 ${ index + 1 }`)
|
||
} catch (cleanupError) {
|
||
logger.warn(`清理跳板机连接 ${ index + 1 } 失败:`, cleanupError.message)
|
||
}
|
||
})
|
||
}
|
||
|
||
socket.disconnect()
|
||
}
|
||
} catch (globalError) {
|
||
logger.error('SFTP连接处理过程中发生未预期错误:', globalError.message)
|
||
try {
|
||
socket.emit('connect_fail', `连接失败: ${ globalError.message }`)
|
||
socket.disconnect()
|
||
} catch (cleanupError) {
|
||
logger.error('清理失败连接时发生错误:', cleanupError.message)
|
||
}
|
||
}
|
||
})
|
||
|
||
socket.on('disconnect', async () => {
|
||
try {
|
||
await sftpClient.end()
|
||
logger.info('sftp-v2 连接断开')
|
||
} catch (error) {
|
||
logger.info('sftp断开连接失败:', error.message)
|
||
} finally {
|
||
sftpClient = null
|
||
// 这里不再清理缓存目录,因为在 listenAction 中的 disconnect 处理器会处理
|
||
jumpSshClients?.forEach(sshClient => sshClient && sshClient.end())
|
||
jumpSshClients = null
|
||
}
|
||
})
|
||
})
|
||
}
|