Merge remote-tracking branch 'origin/master'

# Conflicts:
#	CHANGELOG.md
This commit is contained in:
userA
2023-05-19 12:18:18 +08:00
33 changed files with 644 additions and 343 deletions

View File

@@ -31,7 +31,7 @@
* [V 1.0.0]()
------
## [V 1.0.7] - 2023.5.18
## [V 1.0.6] - 2023.5.6
### 🎈 doc
- ChopperBot技术文档
------

View File

@@ -64,6 +64,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20210307</version>
</dependency>
</dependencies>

View File

@@ -1,43 +1,43 @@
package org.example;
import org.example.core.manager.LoadTaskManager;
import org.example.pojo.download.assign.LoadConfig_R_Douyu;
import org.example.danmaku.core.manager.LoadTaskManager;
import org.example.danmaku.pojo.download.assign.BilibiliLiveLoadConfig;
public class Test {
public static void main(String[] args) throws Exception {
// 创建一个任务管理器
LoadTaskManager manager = new LoadTaskManager();
// 创建一个斗鱼录播的配置类
LoadConfig_R_Douyu dsm = new LoadConfig_R_Douyu("大司马", "Kp1QM8gb4ow7k4bj");
// 创建一个B站直播的配置类
BilibiliLiveLoadConfig zzgz = new BilibiliLiveLoadConfig("猪猪公主", "6154037");
// 创建一个任务,返回一个唯一的key
String dsmTask = manager.addTask(dsm);
String task = manager.creatTask(zzgz);
// 开启此次下载任务
manager.startTask(dsmTask);
manager.startTask(task);
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
System.out.println(dsmTask + "(运行状态):" + manager.isTaskRunning(dsmTask));
System.out.println(dsmTask + "(缓存中弹幕条数):" + manager.getCacheSize(dsmTask));
System.out.println(task + "(运行状态):" + manager.isTaskRunning(task));
System.out.println(task + "(缓存中弹幕条数):" + manager.getCacheSize(task));
}
// 刷入数据到其他地方去
System.out.println(dsmTask + "(刷入数据):" + manager.flushTaskCacheAndSave(dsmTask));
System.out.println(task + "(刷入数据):" + manager.flushTaskCacheAndSave(task));
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
System.out.println(dsmTask + "(运行状态):" + manager.isTaskRunning(dsmTask));
System.out.println(dsmTask + "(缓存中弹幕条数):" + manager.getCacheSize(dsmTask));
System.out.println(task + "(运行状态):" + manager.isTaskRunning(task));
System.out.println(task + "(缓存中弹幕条数):" + manager.getCacheSize(task));
}
// 刷入数据到其他地方去
System.out.println(dsmTask + "(刷入数据):" + manager.flushTaskCacheAndSave(dsmTask));
System.out.println(task + "(刷入数据):" + manager.flushTaskCacheAndSave(task));
// 结束掉任务
manager.endTask(dsmTask);
manager.endTask(task);
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 20; i++) {
Thread.sleep(1000);
System.out.println(dsmTask + "(运行状态):" + manager.isTaskRunning(dsmTask));
System.out.println(dsmTask + "(缓存中弹幕条数):" + manager.getCacheSize(dsmTask));
System.out.println(task + "(运行状态):" + manager.isTaskRunning(task));
System.out.println(task + "(缓存中弹幕条数):" + manager.getCacheSize(task));
}
while(true){

View File

@@ -1,54 +0,0 @@
package org.example.core.factory;
import org.example.core.processor.AbstractProcessor;
import org.example.core.processor.Process_L_Bilibili;
import org.example.core.processor.Process_R_Douyu;
import org.example.pojo.download.LoadConfig;
import org.example.pojo.download.assign.LoadConfig_L_Bilibili;
import org.example.pojo.download.assign.LoadConfig_R_Douyu;
import org.example.utils.PachongConfig;
/**
 * Processor factory: builds the page processor that matches a download configuration.
 *
 * <p>Crawler settings are read once per factory instance from {@link PachongConfig}.
 * Keys prefixed with {@code bi.} belong to Bilibili and keys prefixed with {@code dy.}
 * belong to Douyu.
 *
 * @author 燧枫
 * @date 2023/4/23 21:28
 */
public class ProcessorFactory {

    // Bilibili settings. These previously read the "dy.*" keys by mistake.
    private final String bi_userAgent = PachongConfig.getProperty("bi.userAgent");
    private final int bi_retryTimes = PachongConfig.getIntProperty("bi.retryTimes");
    private final int bi_retrySleepTime = PachongConfig.getIntProperty("bi.retrySleepTime");
    private final int bi_sleepTime = PachongConfig.getIntProperty("bi.sleepTime");

    // Douyu settings. These previously read the "bi.*" keys by mistake.
    private final String dy_userAgent = PachongConfig.getProperty("dy.userAgent");
    private final int dy_retryTimes = PachongConfig.getIntProperty("dy.retryTimes");
    private final int dy_retrySleepTime = PachongConfig.getIntProperty("dy.retrySleepTime");
    private final int dy_sleepTime = PachongConfig.getIntProperty("dy.sleepTime");

    /**
     * Returns a processor for the given configuration.
     *
     * @param loadConfig download configuration; may be {@code null}
     * @return a processor matching the concrete configuration type, or {@code null} when
     *         {@code loadConfig} is {@code null} or of an unsupported type
     */
    public AbstractProcessor getProcessor(LoadConfig loadConfig) {
        if (loadConfig == null) {
            return null;
        }
        // Douyu recorded broadcast
        if (loadConfig instanceof LoadConfig_R_Douyu) {
            return new Process_R_Douyu((LoadConfig_R_Douyu) loadConfig, dy_retryTimes,
                    dy_retrySleepTime, dy_userAgent, dy_sleepTime);
        }
        // Bilibili live stream
        if (loadConfig instanceof LoadConfig_L_Bilibili) {
            return new Process_L_Bilibili((LoadConfig_L_Bilibili) loadConfig, bi_retryTimes,
                    bi_retrySleepTime, bi_userAgent, bi_sleepTime);
        }
        return null;
    }
}

View File

@@ -1,40 +0,0 @@
package org.example.core.factory;
import org.example.core.control.LoadTask;
import org.example.core.control.impl.LoadTask_L_Bilibili;
import org.example.core.control.impl.LoadTask_R_Douyu;
import org.example.exception.FileCacheException;
import org.example.pojo.download.LoadConfig;
import org.example.pojo.download.assign.LoadConfig_L_Bilibili;
import org.example.pojo.download.assign.LoadConfig_R_Douyu;
/**
 * Factory for danmaku (bullet-comment) download tasks.
 *
 * @author 燧枫
 * @date 2023/4/23 19:36
 */
public class TaskFactory {

    /**
     * Builds the download task that corresponds to the concrete configuration type.
     *
     * @param loadConfig download configuration; may be {@code null}
     * @return the matching task, or {@code null} when the configuration is {@code null}
     *         or of an unsupported type
     * @throws FileCacheException propagated from the task constructors
     */
    public LoadTask getLoadTask(LoadConfig loadConfig) throws FileCacheException {
        // instanceof is false for null, so a null configuration falls through to the final return.
        if (loadConfig instanceof LoadConfig_R_Douyu) {
            // Douyu recorded broadcast
            LoadConfig_R_Douyu douyuConfig = (LoadConfig_R_Douyu) loadConfig;
            return new LoadTask_R_Douyu(douyuConfig);
        }
        if (loadConfig instanceof LoadConfig_L_Bilibili) {
            // Bilibili live stream
            LoadConfig_L_Bilibili bilibiliConfig = (LoadConfig_L_Bilibili) loadConfig;
            return new LoadTask_L_Bilibili(bilibiliConfig);
        }
        return null;
    }
}

View File

@@ -1,27 +0,0 @@
package org.example.core.processor;
import org.example.pojo.download.assign.LoadConfig_L_Bilibili;
import us.codecraft.webmagic.Page;
/**
 * Download-and-process step for Bilibili live streams.
 *
 * <p>In this version the processor is a stub: {@link #process(Page)} only clears the
 * inherited first-run flag and extracts nothing from the page.
 *
 * @author 燧枫
 * @date 2023/4/23 18:53
 */
public class Process_L_Bilibili extends AbstractProcessor {
// Configuration of the live room this processor was created for.
LoadConfig_L_Bilibili loadConfig;
/**
 * Creates the processor; the four crawler settings are passed straight to the superclass.
 *
 * @param loadConfig     Bilibili live configuration
 * @param retryTimes     forwarded to {@code AbstractProcessor}
 * @param retrySleepTime forwarded to {@code AbstractProcessor}
 * @param userAgent      forwarded to {@code AbstractProcessor}
 * @param sleepTime      forwarded to {@code AbstractProcessor}
 */
public Process_L_Bilibili(LoadConfig_L_Bilibili loadConfig, int retryTimes, int retrySleepTime, String userAgent, int sleepTime) {
super(retryTimes, retrySleepTime, userAgent, sleepTime);
this.loadConfig = loadConfig;
}
/**
 * Handles one fetched page. Currently only resets the first-run flag.
 *
 * @param page the fetched page (unused in this version)
 */
@Override
public void process(Page page) {
// First load
if (isFirst) {
isFirst = false;
}
}
}

View File

@@ -1,4 +1,4 @@
package org.example.core.control;
package org.example.danmaku.core.control;
/**
* 弹幕下载任务

View File

@@ -1,13 +1,12 @@
package org.example.core.control.impl;
package org.example.danmaku.core.control.impl;
import org.example.constpool.ConstPool;
import org.example.core.control.LoadTask;
import org.example.core.factory.ProcessorFactory;
import org.example.core.pipeline.PipelineWriteJson;
import org.example.core.processor.Process_L_Bilibili;
import org.example.exception.FileCacheException;
import org.example.pojo.download.LoadConfig;
import org.example.utils.PachongConfig;
import org.example.danmaku.core.control.LoadTask;
import org.example.danmaku.core.factory.ProcessorFactory;
import org.example.danmaku.core.pipeline.PipelineWriteJson;
import org.example.danmaku.core.processor.BilibiliLiveProcessor;
import org.example.danmaku.pojo.download.LoadConfig;
import org.example.danmaku.utils.CreeperConfig;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Spider;
@@ -16,24 +15,24 @@ import us.codecraft.webmagic.Spider;
* @author 燧枫
* @date 2023/4/23 18:19
*/
public class LoadTask_L_Bilibili implements LoadTask {
public class BilibiliLiveLoadTask implements LoadTask {
private final int threadCnt = PachongConfig.getIntProperty("bi.threadCnt");
private final int threadCnt = CreeperConfig.getIntProperty("bi.threadCnt");
private final int emptySleepTime = PachongConfig.getIntProperty("bi.emptySleepTime");
private final int emptySleepTime = CreeperConfig.getIntProperty("bi.emptySleepTime");
private final Process_L_Bilibili process_l_bilibili;
private final BilibiliLiveProcessor bilibiliLiveProcessor;
private final PipelineWriteJson pipelineWriteJson;
public LoadTask_L_Bilibili(LoadConfig loadConfig) throws FileCacheException {
process_l_bilibili = (Process_L_Bilibili) new ProcessorFactory().getProcessor(loadConfig);
public BilibiliLiveLoadTask(LoadConfig loadConfig) {
bilibiliLiveProcessor = (BilibiliLiveProcessor) new ProcessorFactory().getProcessor(loadConfig);
pipelineWriteJson = new PipelineWriteJson(loadConfig);
}
@Override
public void start() {
Spider.create(process_l_bilibili)
Spider.create(bilibiliLiveProcessor)
// 设置起始Request
.addRequest(new Request(ConstPool.OCCUURL))
// 设置结果处理类
@@ -47,12 +46,12 @@ public class LoadTask_L_Bilibili implements LoadTask {
@Override
public void end() {
process_l_bilibili.end();
bilibiliLiveProcessor.end();
}
@Override
public boolean isRunning() {
return process_l_bilibili.isRunning();
return bilibiliLiveProcessor.isRunning();
}
@Override

View File

@@ -1,13 +1,12 @@
package org.example.core.control.impl;
package org.example.danmaku.core.control.impl;
import org.example.constpool.ConstPool;
import org.example.core.control.LoadTask;
import org.example.core.factory.ProcessorFactory;
import org.example.core.pipeline.PipelineWriteJson;
import org.example.core.processor.Process_R_Douyu;
import org.example.exception.FileCacheException;
import org.example.pojo.download.LoadConfig;
import org.example.utils.PachongConfig;
import org.example.danmaku.core.control.LoadTask;
import org.example.danmaku.core.factory.ProcessorFactory;
import org.example.danmaku.core.pipeline.PipelineWriteJson;
import org.example.danmaku.core.processor.DouyuRecordProcessor;
import org.example.danmaku.pojo.download.LoadConfig;
import org.example.danmaku.utils.CreeperConfig;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Spider;
@@ -16,24 +15,24 @@ import us.codecraft.webmagic.Spider;
* @author 燧枫
* @date 2023/4/23 18:12
*/
public class LoadTask_R_Douyu implements LoadTask {
public class DouyuRecordLoadTask implements LoadTask {
private final int threadCnt = PachongConfig.getIntProperty("dy.threadCnt");
private final int threadCnt = CreeperConfig.getIntProperty("dy.threadCnt");
private final int emptySleepTime = PachongConfig.getIntProperty("dy.emptySleepTime");
private final int emptySleepTime = CreeperConfig.getIntProperty("dy.emptySleepTime");
private final Process_R_Douyu process_r_douyu;
private final DouyuRecordProcessor douyuRecordProcessor;
private final PipelineWriteJson pipelineWriteJson;
public LoadTask_R_Douyu(LoadConfig loadConfig) throws FileCacheException {
process_r_douyu = (Process_R_Douyu) new ProcessorFactory().getProcessor(loadConfig);
public DouyuRecordLoadTask(LoadConfig loadConfig) {
douyuRecordProcessor = (DouyuRecordProcessor) new ProcessorFactory().getProcessor(loadConfig);
pipelineWriteJson = new PipelineWriteJson(loadConfig);
}
@Override
public void start() {
Spider.create(process_r_douyu)
Spider.create(douyuRecordProcessor)
// 设置起始Request
.addRequest(new Request(ConstPool.OCCUURL))
// 设置结果处理类
@@ -47,12 +46,12 @@ public class LoadTask_R_Douyu implements LoadTask {
@Override
public void end() {
process_r_douyu.end();
douyuRecordProcessor.end();
}
@Override
public boolean isRunning() {
return process_r_douyu.isRunning();
return douyuRecordProcessor.isRunning();
}
@Override

View File

@@ -0,0 +1,40 @@
package org.example.danmaku.core.factory;
import org.example.danmaku.core.control.LoadTask;
import org.example.danmaku.core.control.impl.BilibiliLiveLoadTask;
import org.example.danmaku.core.control.impl.DouyuRecordLoadTask;
import org.example.exception.FileCacheException;
import org.example.danmaku.pojo.download.LoadConfig;
import org.example.danmaku.pojo.download.assign.BilibiliLiveLoadConfig;
import org.example.danmaku.pojo.download.assign.DouyuRecordLoadConfig;
/**
 * Factory for danmaku (bullet-comment) download tasks.
 *
 * @author 燧枫
 * @date 2023/4/23 19:36
 */
public class LoadTaskFactory {

    /**
     * Builds the download task that corresponds to the concrete configuration type.
     *
     * @param loadConfig download configuration; may be {@code null}
     * @return the matching task, or {@code null} when the configuration is {@code null}
     *         or of an unsupported type
     * @throws FileCacheException declared for callers; kept as part of the factory contract
     */
    public LoadTask getLoadTask(LoadConfig loadConfig) throws FileCacheException {
        // instanceof is false for null, so a null configuration falls through to the final return.
        if (loadConfig instanceof DouyuRecordLoadConfig) {
            // Douyu recorded broadcast
            DouyuRecordLoadConfig douyuConfig = (DouyuRecordLoadConfig) loadConfig;
            return new DouyuRecordLoadTask(douyuConfig);
        }
        if (loadConfig instanceof BilibiliLiveLoadConfig) {
            // Bilibili live stream
            BilibiliLiveLoadConfig bilibiliConfig = (BilibiliLiveLoadConfig) loadConfig;
            return new BilibiliLiveLoadTask(bilibiliConfig);
        }
        return null;
    }
}

View File

@@ -0,0 +1,54 @@
package org.example.danmaku.core.factory;
import org.example.danmaku.core.processor.AbstractProcessor;
import org.example.danmaku.core.processor.BilibiliLiveProcessor;
import org.example.danmaku.core.processor.DouyuRecordProcessor;
import org.example.danmaku.pojo.download.LoadConfig;
import org.example.danmaku.pojo.download.assign.BilibiliLiveLoadConfig;
import org.example.danmaku.pojo.download.assign.DouyuRecordLoadConfig;
import org.example.danmaku.utils.CreeperConfig;
/**
 * Processor factory: builds the page processor that matches a download configuration.
 *
 * <p>Crawler settings are read once per factory instance from {@link CreeperConfig}.
 * Keys prefixed with {@code bi.} belong to Bilibili and keys prefixed with {@code dy.}
 * belong to Douyu.
 *
 * @author 燧枫
 * @date 2023/4/23 21:28
 */
public class ProcessorFactory {

    // Bilibili settings. These previously read the "dy.*" keys by mistake.
    private final String bi_userAgent = CreeperConfig.getProperty("bi.userAgent");
    private final int bi_retryTimes = CreeperConfig.getIntProperty("bi.retryTimes");
    private final int bi_retrySleepTime = CreeperConfig.getIntProperty("bi.retrySleepTime");
    private final int bi_sleepTime = CreeperConfig.getIntProperty("bi.sleepTime");

    // Douyu settings. These previously read the "bi.*" keys by mistake.
    private final String dy_userAgent = CreeperConfig.getProperty("dy.userAgent");
    private final int dy_retryTimes = CreeperConfig.getIntProperty("dy.retryTimes");
    private final int dy_retrySleepTime = CreeperConfig.getIntProperty("dy.retrySleepTime");
    private final int dy_sleepTime = CreeperConfig.getIntProperty("dy.sleepTime");

    /**
     * Returns a processor for the given configuration.
     *
     * @param loadConfig download configuration; may be {@code null}
     * @return a processor matching the concrete configuration type, or {@code null} when
     *         {@code loadConfig} is {@code null} or of an unsupported type
     */
    public AbstractProcessor getProcessor(LoadConfig loadConfig) {
        if (loadConfig == null) {
            return null;
        }
        // Douyu recorded broadcast
        if (loadConfig instanceof DouyuRecordLoadConfig) {
            return new DouyuRecordProcessor((DouyuRecordLoadConfig) loadConfig, dy_retryTimes,
                    dy_retrySleepTime, dy_userAgent, dy_sleepTime);
        }
        // Bilibili live stream
        if (loadConfig instanceof BilibiliLiveLoadConfig) {
            return new BilibiliLiveProcessor((BilibiliLiveLoadConfig) loadConfig, bi_retryTimes,
                    bi_retrySleepTime, bi_userAgent, bi_sleepTime);
        }
        return null;
    }
}

View File

@@ -1,9 +1,9 @@
package org.example.core.manager;
package org.example.danmaku.core.manager;
import org.example.core.control.LoadTask;
import org.example.core.factory.TaskFactory;
import org.example.danmaku.core.control.LoadTask;
import org.example.danmaku.core.factory.LoadTaskFactory;
import org.example.exception.FileCacheException;
import org.example.pojo.download.LoadConfig;
import org.example.danmaku.pojo.download.LoadConfig;
import java.util.Collections;
import java.util.Map;
@@ -19,27 +19,27 @@ import java.util.stream.Collectors;
public class LoadTaskManager {
private final ConcurrentHashMap<String, LoadTask> taskMap;
private final TaskFactory taskFactory;
private final LoadTaskFactory loadTaskFactory;
public LoadTaskManager() {
this.taskMap = new ConcurrentHashMap<>();
// 默认任务工场
this.taskFactory = new TaskFactory();
this.loadTaskFactory = new LoadTaskFactory();
}
public LoadTaskManager(TaskFactory customTaskFactory) {
public LoadTaskManager(LoadTaskFactory customLoadTaskFactory) {
this.taskMap = new ConcurrentHashMap<>();
// 自定义任务工场
this.taskFactory = customTaskFactory;
this.loadTaskFactory = customLoadTaskFactory;
}
// 创建一个任务并返回唯一的 key
public String addTask(LoadConfig loadConfig) throws FileCacheException {
public String creatTask(LoadConfig loadConfig) throws FileCacheException {
// 根据主播名和时间戳生成唯一的 key
String key = generateKey(loadConfig);
// 创建任务并将其添加到任务映射中
LoadTask loadTask = taskFactory.getLoadTask(loadConfig);
LoadTask loadTask = loadTaskFactory.getLoadTask(loadConfig);
taskMap.put(key, loadTask);
// 返回生成的 key

View File

@@ -1,11 +1,10 @@
package org.example.core.pipeline;
package org.example.danmaku.core.pipeline;
import org.example.cache.FileCache;
import org.example.cache.FileCacheManagerInstance;
import org.example.exception.FileCacheException;
import org.example.pojo.Barrage;
import org.example.pojo.configfile.BarrageSaveFile;
import org.example.pojo.download.LoadConfig;
import org.example.danmaku.pojo.Barrage;
import org.example.danmaku.pojo.configfile.BarrageSaveFile;
import org.example.danmaku.pojo.download.LoadConfig;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;
@@ -32,7 +31,7 @@ public class PipelineWriteJson implements Pipeline {
this.loadConfig = loadConfig;
this.cache = new ConcurrentLinkedQueue<>();
this.barrageSaveFile = new BarrageSaveFile(loadConfig, cache);
this.filecache = new FileCache(barrageSaveFile,0,10*1024);
this.filecache = new FileCache(barrageSaveFile, 0, 10 * 1024);
} catch (FileCacheException e) {
throw new RuntimeException(e);
}
@@ -56,7 +55,7 @@ public class PipelineWriteJson implements Pipeline {
Barrage barrage;
while ((barrage = cache.poll()) != null) {
if(successCount%1000==0){
if (successCount % 1000 == 0) {
System.out.print("写入:" + successCount);
}
try {

View File

@@ -1,4 +1,4 @@
package org.example.core.processor;
package org.example.danmaku.core.processor;
import us.codecraft.webmagic.Page;

View File

@@ -0,0 +1,98 @@
package org.example.danmaku.core.processor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.example.danmaku.pojo.Barrage;
import org.example.danmaku.pojo.download.assign.BilibiliLiveLoadConfig;
import us.codecraft.webmagic.Page;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * Download-and-process step for Bilibili live-room danmaku.
 *
 * <p>The first page handed to {@link #process(Page)} only triggers {@link #init(Page)};
 * every later page is treated as a reply of the room message API and turned into a list
 * of {@link Barrage} objects published under the {@code "barrageList"} field.
 *
 * @author 燧枫
 * @date 2023/4/23 18:53
 */
public class BilibiliLiveProcessor extends AbstractProcessor {

    BilibiliLiveLoadConfig loadConfig;

    // URL prefix of the room message API; the room id is appended in init().
    private String urlPrefix = "https://api.live.bilibili.com/ajax/msg?roomid=";

    private String url = "";

    // Ids of messages already emitted, used for de-duplication across polls.
    private Set<String> processedMids;

    // Crawl start time in seconds since the epoch.
    private Long startTime;

    public BilibiliLiveProcessor(BilibiliLiveLoadConfig loadConfig, int retryTimes, int retrySleepTime, String userAgent, int sleepTime) {
        super(retryTimes, retrySleepTime, userAgent, sleepTime);
        this.loadConfig = loadConfig;
        this.processedMids = new HashSet<>();
    }

    @Override
    public void process(Page page) {
        // First load: set up state and queue the first real request.
        if (isFirst) {
            isFirst = false;
            init(page);
            return;
        }
        processOnePage(page);
    }

    /**
     * Initialises the polling URL and the start timestamp, then queues the first request.
     *
     * @param page page used to enqueue the first API request
     */
    public void init(Page page) {
        // Assemble the URL for the configured room.
        url = urlPrefix + loadConfig.getRoomId();
        // Record when crawling started (seconds).
        startTime = System.currentTimeMillis() / 1000;
        page.addTargetRequest(url + "&_=" + System.currentTimeMillis());
    }

    /**
     * Processes one API reply: queues the next poll, then extracts unseen messages.
     *
     * <p>A reply that lacks {@code data}, {@code room} or a message's {@code check_info}
     * (for example an error reply) yields an empty or shorter list instead of a
     * {@link NullPointerException}.
     */
    private void processOnePage(Page page) {
        // The timestamp parameter makes every poll URL distinct.
        page.addTargetRequest(url + "&_=" + System.currentTimeMillis());
        // Stop extracting once the crawler has been asked to end.
        if (!isRunning()) {
            return;
        }
        JSONObject body = JSON.parseObject(page.getRawText());
        JSONObject data = body == null ? null : body.getJSONObject("data");
        JSONArray roomArray = data == null ? null : data.getJSONArray("room");

        List<Barrage> barrageList = new ArrayList<>();
        if (roomArray != null) {
            for (Object o : roomArray) {
                JSONObject message = (JSONObject) o;
                JSONObject checkInfo = message.getJSONObject("check_info");
                if (checkInfo == null) {
                    continue;
                }
                // Unique id of the message.
                String mid = checkInfo.getString("ct");
                // Absolute timestamp of the message.
                Long timeReal = checkInfo.getLong("ts");
                if (mid == null || timeReal == null) {
                    continue;
                }
                // Set.add returns false when the id was already processed.
                if (!processedMids.add(mid)) {
                    continue;
                }
                // Timestamp relative to the crawl start, never negative.
                Long timeIndex = Math.max(0L, timeReal - startTime);
                String content = message.getString("text");
                barrageList.add(new Barrage(mid, timeReal, timeIndex, content));
            }
        }
        // Hand the result to the pipeline.
        page.putField("barrageList", barrageList);
    }
}

View File

@@ -1,14 +1,13 @@
package org.example.core.processor;
package org.example.danmaku.core.processor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.example.pojo.Barrage;
import org.example.pojo.download.assign.LoadConfig_R_Douyu;
import org.example.danmaku.pojo.Barrage;
import org.example.danmaku.pojo.download.assign.DouyuRecordLoadConfig;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.utils.HttpConstant;
import java.util.ArrayList;
import java.util.List;
@@ -18,14 +17,14 @@ import java.util.List;
* @author 燧枫
* @date 2023/4/23 18:54
*/
public class Process_R_Douyu extends AbstractProcessor {
public class DouyuRecordProcessor extends AbstractProcessor {
LoadConfig_R_Douyu loadConfig;
DouyuRecordLoadConfig loadConfig;
// 前缀url
private String urlPrefix = "https://v.douyu.com/wgapi/vod/center/getBarrageListByPage?vid=";
public Process_R_Douyu(LoadConfig_R_Douyu loadConfig, int retryTimes, int retrySleepTime, String userAgent, int sleepTime) {
public DouyuRecordProcessor(DouyuRecordLoadConfig loadConfig, int retryTimes, int retrySleepTime, String userAgent, int sleepTime) {
super(retryTimes, retrySleepTime, userAgent, sleepTime);
this.loadConfig = loadConfig;
}
@@ -36,11 +35,8 @@ public class Process_R_Douyu extends AbstractProcessor {
if (isFirst) {
init(page);
isFirst = false;
Request request = new Request();
request.putExtra("type", "singer");
return;
}
processOnePage(page);
}

View File

@@ -1,4 +1,4 @@
package org.example.pojo;
package org.example.danmaku.pojo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;

View File

@@ -1,9 +1,9 @@
package org.example.pojo.configfile;
package org.example.danmaku.pojo.configfile;
import org.example.common.ConfigFile;
import org.example.danmaku.pojo.download.LoadConfig;
import org.example.exception.FileCacheException;
import org.example.pojo.Barrage;
import org.example.pojo.download.LoadConfig;
import org.example.danmaku.pojo.Barrage;
import org.example.util.FileUtil;
import org.example.util.JsonFileUtil;
@@ -24,6 +24,7 @@ import static org.example.constpool.ConstPool.BARRAGE_ROOT;
* 弹幕保存文件配置类通过LoadConfig获取主播信息平台信息弹幕爬取时间生成弹幕文件
*/
public class BarrageSaveFile extends ConfigFile<ConcurrentLinkedQueue<Barrage>> {
private LoadConfig loadConfig;
public BarrageSaveFile(LoadConfig loadConfig, ConcurrentLinkedQueue<Barrage> data) throws FileCacheException {
@@ -48,7 +49,7 @@ public class BarrageSaveFile extends ConfigFile<ConcurrentLinkedQueue<Barrage>>
Path path = Path.of(rootPath);
//TODO 待移除 建立主播文件夹
try {
if(!Files.exists(path)){
if (!Files.exists(path)) {
Files.createDirectory(path);
}

View File

@@ -1,7 +1,7 @@
package org.example.pojo.download;
package org.example.danmaku.pojo.download;
import lombok.Data;
import org.example.utils.FormatUtil;
import org.example.danmaku.utils.FormatUtil;
/**
* 单次弹幕爬取信息配置基类

View File

@@ -1,9 +1,8 @@
package org.example.pojo.download.assign;
package org.example.danmaku.pojo.download.assign;
import lombok.Data;
import org.example.constpool.ConstPool;
import org.example.pojo.download.LoadConfig;
import org.springframework.stereotype.Component;
import org.example.danmaku.pojo.download.LoadConfig;
/**
* (B站直播)配置信息
@@ -11,12 +10,12 @@ import org.springframework.stereotype.Component;
* @date 2023/4/23 17:59
*/
@Data
public class LoadConfig_L_Bilibili extends LoadConfig {
public class BilibiliLiveLoadConfig extends LoadConfig {
// 房间号
private String roomId;
public LoadConfig_L_Bilibili(String anchorName, String roomId) {
public BilibiliLiveLoadConfig(String anchorName, String roomId) {
super(ConstPool.BILIBILI, ConstPool.ACTION_LIVE, anchorName);
this.roomId = roomId;
}

View File

@@ -1,9 +1,8 @@
package org.example.pojo.download.assign;
package org.example.danmaku.pojo.download.assign;
import lombok.Data;
import org.example.constpool.ConstPool;
import org.example.pojo.download.LoadConfig;
import org.springframework.stereotype.Component;
import org.example.danmaku.pojo.download.LoadConfig;
/**
* (斗鱼录播)配置信息
@@ -11,12 +10,12 @@ import org.springframework.stereotype.Component;
* @date 2023/4/23 16:30
*/
@Data
public class LoadConfig_R_Douyu extends LoadConfig {
public class DouyuRecordLoadConfig extends LoadConfig {
// 录播vid
private String vid;
public LoadConfig_R_Douyu(String anchorName, String vid) {
public DouyuRecordLoadConfig(String anchorName, String vid) {
super(ConstPool.DOUYU, ConstPool.ACTION_RECORD, anchorName);
this.vid = vid;
}

View File

@@ -1,4 +1,4 @@
package org.example.utils;
package org.example.danmaku.utils;
import java.io.IOException;
import java.io.InputStream;
@@ -9,14 +9,14 @@ import java.util.Properties;
* @author 燧枫
* @date 2023/4/24 11:02
*/
public class PachongConfig {
public class CreeperConfig {
private static final Properties properties;
static {
properties = new Properties();
String configFileName = "pachong.properties";
try (InputStream inputStream = PachongConfig.class.getClassLoader().getResourceAsStream(configFileName)) {
String configFileName = "creeper.properties";
try (InputStream inputStream = CreeperConfig.class.getClassLoader().getResourceAsStream(configFileName)) {
if (inputStream != null) {
properties.load(inputStream);
} else {

View File

@@ -1,4 +1,4 @@
package org.example.utils;
package org.example.danmaku.utils;
import java.text.SimpleDateFormat;
import java.util.Date;

View File

@@ -0,0 +1,86 @@
package org.example.video;
import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashMap;
import java.util.Map;
import org.example.video.core.handle.FlvHandle;
import org.example.video.core.monitor.StatusMonitor;
import org.example.video.core.parser.BilibiliFlvUrlParse;
/**
 * Manual test driver that records a Bilibili live room's FLV stream to a local file
 * while printing download statistics once per second.
 *
 * @author 燧枫
 * @date 2023/5/18 22:23
 */
public class BilibiliLiveStreamTest {

    static String videoPath = "F:\\";

    /**
     * Starts a background thread that opens {@code url} and copies the stream to {@code fileIO}.
     *
     * <p>When the thread finishes for any reason (connect failure, read error, or normal end
     * of stream) the output is closed and the monitor is marked as closed, so a caller
     * polling {@link StatusMonitor#isConnectionClosed()} always terminates.
     *
     * @param url           stream URL
     * @param statusMonitor receives progress and the closed flag
     * @param fileIO        destination; closed by this method's worker thread
     * @param headers       request headers to send, or {@code null} for none
     */
    public void startFlvStreamParse(String url, StatusMonitor statusMonitor, OutputStream fileIO, Map<String, String> headers) {
        new Thread(() -> {
            FlvHandle f = new FlvHandle();
            try {
                URLConnection conn = new URL(url).openConnection();
                if (headers != null) {
                    for (Map.Entry<String, String> entry : headers.entrySet()) {
                        conn.setRequestProperty(entry.getKey(), entry.getValue());
                    }
                }
                InputStream in = conn.getInputStream();
                f.parseStream(in, statusMonitor, fileIO);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Closing an already-closed stream is a no-op, so this is safe after parseStream.
                closeQuietly(fileIO);
                statusMonitor.setConnectionClosed(true);
            }
        }).start();
    }

    /**
     * Same as the four-argument overload, using browser-like default headers.
     */
    public void startFlvStreamParse(String url, StatusMonitor statusMonitor, OutputStream fileIO) {
        Map<String, String> defaultHeaders = new HashMap<>();
        defaultHeaders.put("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36");
        defaultHeaders.put("Origin", "https://live.bilibili.com");
        defaultHeaders.put("Referer", "https://live.bilibili.com/");
        startFlvStreamParse(url, statusMonitor, fileIO, defaultHeaders);
    }

    /** Closes the stream, reporting (but not propagating) a failure. */
    private static void closeQuietly(OutputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        String roomId = "732";
        int qn = 10000;
        StatusMonitor statusMonitor = new StatusMonitor();
        BilibiliLiveStreamTest bilibiliLiveStreamTest = new BilibiliLiveStreamTest();
        BilibiliFlvUrlParse bilibiliFlvUrlParse = new BilibiliFlvUrlParse();
        try {
            String url = bilibiliFlvUrlParse.getFlvUrl(roomId, qn);
            System.out.println(url);
            OutputStream fileIO = new FileOutputStream(videoPath + roomId + ".flv");
            bilibiliLiveStreamTest.startFlvStreamParse(url, statusMonitor, fileIO);
            while (true) {
                // isStopFlag and getVideoTagSpeed are intentionally not included here,
                // since implementing them may need more complex logic.
                if (statusMonitor.isConnectionClosed()) {
                    System.out.println("连接中断,已停止录制...");
                    break;
                }
                // Print the live download status.
                System.out.println("平均下载速度:" + statusMonitor.getDownloadSpeedAvg() + " B/s");
                System.out.println("瞬时下载速度:" + statusMonitor.getDownloadSpeed() + " B/s");
                System.out.println("已写入数据量:" + statusMonitor.getDownloadedBytes() + " bytes");
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

View File

@@ -0,0 +1,36 @@
package org.example.video.core.handle;
import org.example.video.core.monitor.StatusMonitor;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * FLV downloader: copies a stream to an output while reporting progress.
 *
 * @author 燧枫
 * @date 2023/5/19 0:22
 */
public class FlvHandle {

    /**
     * Copies {@code in} to {@code out} until end of stream or an error, then closes both.
     *
     * <p>Both streams are closed independently of each other, and the monitor is marked as
     * closed on every exit path, including a normal end of stream. Errors are printed
     * rather than propagated.
     *
     * @param in            source stream; closed by this method
     * @param statusMonitor receives the number of bytes written and the closed flag; must not be null
     * @param out           destination stream; closed by this method
     */
    public void parseStream(InputStream in, StatusMonitor statusMonitor, OutputStream out) {
        // try-with-resources closes each stream even when closing the other one fails.
        try (InputStream source = in; OutputStream sink = out) {
            byte[] buffer = new byte[4096];
            int bytesRead;
            while ((bytesRead = source.read(buffer)) != -1) {
                sink.write(buffer, 0, bytesRead);
                statusMonitor.addDownloadedBytes(bytesRead);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The transfer is over whether it failed or simply reached end of stream.
            statusMonitor.setConnectionClosed(true);
        }
    }
}

View File

@@ -0,0 +1,46 @@
package org.example.video.core.monitor;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Thread-safe progress tracker for a single download.
 *
 * <p>All methods synchronize on the instance, so one thread may record progress while
 * another reads the statistics.
 *
 * @author 燧枫
 * @date 2023/5/19 0:00
 */
public class StatusMonitor {

    /** Length of the sliding window behind {@link #getDownloadSpeed()}: one second. */
    private static final long SPEED_WINDOW_NANOS = 1_000_000_000L;

    private long downloadedBytes = 0;
    private boolean connectionClosed = false;
    private final long startTime = System.currentTimeMillis();

    // Samples inside the current window, oldest first; each entry is {timestampNanos, bytes}.
    private final Queue<long[]> recentSamples = new LinkedList<>();
    // Running sum of the byte counts held in recentSamples.
    private long windowBytes = 0;

    /**
     * Records that {@code bytes} more bytes were downloaded just now.
     *
     * @param bytes number of bytes in the chunk
     */
    public synchronized void addDownloadedBytes(int bytes) {
        downloadedBytes += bytes;
        long now = System.nanoTime();
        recentSamples.offer(new long[] {now, bytes});
        windowBytes += bytes;
        evictExpiredSamples(now);
    }

    /** @return total number of bytes recorded so far */
    public synchronized long getDownloadedBytes() {
        return downloadedBytes;
    }

    /**
     * Instantaneous download speed.
     *
     * @return bytes recorded during the last second, i.e. bytes per second
     */
    public synchronized double getDownloadSpeed() {
        evictExpiredSamples(System.nanoTime());
        return windowBytes;
    }

    /**
     * Average download speed since this monitor was created.
     *
     * @return bytes per second; the elapsed time is clamped to at least one second so the
     *         value is not inflated right after start-up
     */
    public synchronized double getDownloadSpeedAvg() {
        double elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000.0;
        return downloadedBytes / Math.max(1.0, elapsedSeconds);
    }

    public synchronized void setConnectionClosed(boolean connectionClosed) {
        this.connectionClosed = connectionClosed;
    }

    public synchronized boolean isConnectionClosed() {
        return connectionClosed;
    }

    /** Drops samples that are older than the speed window relative to {@code now}. */
    private void evictExpiredSamples(long now) {
        long[] oldest;
        while ((oldest = recentSamples.peek()) != null && now - oldest[0] >= SPEED_WINDOW_NANOS) {
            recentSamples.poll();
            windowBytes -= oldest[1];
        }
    }
}

View File

@@ -0,0 +1,55 @@
package org.example.video.core.parser;
import org.example.video.utils.HttpClientUtil;
import org.json.JSONArray;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.Map;
/**
 * Resolver for Bilibili live-room FLV stream URLs.
 *
 * @author 燧枫
 * @date 2023/5/16 20:42
 */
public class BilibiliFlvUrlParse {

    String urlFormat = "https://api.live.bilibili.com/xlive/web-room/v2/index/getRoomPlayInfo?room_id=%s&protocol=0,1&format=0,1,2&codec=0,1&qn=%d&platform=web&ptype=8";

    /**
     * Looks up the FLV stream URL of a live room.
     *
     * @param roomId room number
     * @param qn     quality code (10000 is the original quality)
     * @return the URL assembled from the first codec of the first {@code flv} format found
     *         in an {@code http_stream} entry
     * @throws Exception when the API reports a non-zero code, the room is not live, or no
     *                   FLV stream is listed
     */
    public String getFlvUrl(String roomId, int qn) throws Exception {
        String requestUrl = String.format(urlFormat, roomId, qn);
        JSONObject reply = new JSONObject(HttpClientUtil.get(requestUrl));
        if (reply.getInt("code") != 0) {
            throw new Exception(reply.getString("message"));
        }
        JSONObject data = reply.getJSONObject("data");
        if (data.getInt("live_status") != 1) {
            throw new Exception("主播未开播或已下播");
        }
        JSONArray streams = data.getJSONObject("playurl_info").getJSONObject("playurl").getJSONArray("stream");
        for (int s = 0; s < streams.length(); s++) {
            JSONObject stream = streams.getJSONObject(s);
            if (!"http_stream".equals(stream.getString("protocol_name"))) {
                continue;
            }
            JSONArray formats = stream.getJSONArray("format");
            for (int f = 0; f < formats.length(); f++) {
                JSONObject format = formats.getJSONObject(f);
                if ("flv".equals(format.getString("format_name"))) {
                    return assembleUrl(format);
                }
            }
        }
        throw new Exception("没有找到直播流地址");
    }

    /** Joins host, base path and extra query of the first codec entry of {@code format}. */
    private static String assembleUrl(JSONObject format) {
        JSONObject codec = format.getJSONArray("codec").getJSONObject(0);
        JSONObject urlInfo = codec.getJSONArray("url_info").getJSONObject(0);
        String host = urlInfo.getString("host");
        String extra = urlInfo.getString("extra");
        String baseUrl = codec.getString("base_url");
        return host + baseUrl + extra;
    }
}

View File

@@ -0,0 +1,36 @@
package org.example.video.pool;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
/**
 * Process-wide holder of a pooled HTTP client.
 *
 * @author 燧枫
 * @date 2023/5/16 19:22
 */
public class HttpClientPool {

    private static final int MAX_TOTAL_CONNECTIONS = 100;
    private static final int DEFAULT_MAX_PER_ROUTE = 20;

    private static HttpClientPool instance;

    private final CloseableHttpClient httpClient;

    private HttpClientPool() {
        this.httpClient = buildPooledClient();
    }

    /** Creates a client backed by a connection pool sized with the two limits above. */
    private static CloseableHttpClient buildPooledClient() {
        PoolingHttpClientConnectionManager pool = new PoolingHttpClientConnectionManager();
        pool.setMaxTotal(MAX_TOTAL_CONNECTIONS);
        pool.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
        return HttpClients.custom().setConnectionManager(pool).build();
    }

    /**
     * Returns the single pool instance, creating it on first use.
     *
     * @return the shared instance
     */
    public static synchronized HttpClientPool getInstance() {
        if (instance != null) {
            return instance;
        }
        instance = new HttpClientPool();
        return instance;
    }

    /** @return the shared pooled client */
    public CloseableHttpClient getHttpClient() {
        return httpClient;
    }
}

View File

@@ -0,0 +1,83 @@
package org.example.video.utils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.example.video.pool.HttpClientPool;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
 * Small helper for blocking HTTP GET and POST requests.
 *
 * <p>Every request goes through the client obtained from {@link HttpClientPool}.
 * The response body is returned as a UTF-8 decoded string; the status code is
 * not inspected. Any {@link IOException} is rethrown wrapped in a
 * {@link RuntimeException}.
 *
 * @author 燧枫
 * @date 2023/5/16 19:24
 */
public class HttpClientUtil {

    /** Client shared by all requests issued through this class. */
    private static final CloseableHttpClient httpClient = HttpClientPool.getInstance().getHttpClient();

    /**
     * Sends a GET request without extra headers.
     *
     * @param url target URL
     * @return response body as a string
     */
    public static String get(String url) {
        return get(url, null);
    }

    /**
     * Sends a POST request with a JSON body and no extra headers.
     *
     * @param url  target URL
     * @param json request body, sent with content type {@code application/json}
     * @return response body as a string
     */
    public static String post(String url, String json) {
        return post(url, json, null);
    }

    /**
     * Sends a GET request with optional headers.
     *
     * @param url     target URL
     * @param headers headers to add to the request; may be {@code null} or empty
     * @return response body as a string
     * @throws RuntimeException if the request fails with an {@link IOException}
     *                          or the response carries no entity
     */
    public static String get(String url, Map<String, String> headers) {
        HttpGet request = new HttpGet(url);
        addHeaders(request, headers);
        try (CloseableHttpResponse reply = httpClient.execute(request)) {
            return handleResponse(reply);
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /**
     * Sends a POST request with a JSON body and optional headers.
     *
     * @param url     target URL
     * @param json    request body, sent with content type {@code application/json}
     * @param headers headers to add to the request; may be {@code null} or empty
     * @return response body as a string
     * @throws RuntimeException if the request fails with an {@link IOException}
     *                          or the response carries no entity
     */
    public static String post(String url, String json, Map<String, String> headers) {
        HttpPost request = new HttpPost(url);
        StringEntity payload = new StringEntity(json, ContentType.APPLICATION_JSON);
        request.setEntity(payload);
        addHeaders(request, headers);
        try (CloseableHttpResponse reply = httpClient.execute(request)) {
            return handleResponse(reply);
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /** Adds every entry of {@code headers} to the request; does nothing for a null or empty map. */
    private static void addHeaders(HttpRequest httpRequest, Map<String, String> headers) {
        if (headers == null || headers.isEmpty()) {
            return;
        }
        for (Map.Entry<String, String> header : headers.entrySet()) {
            String name = header.getKey();
            String value = header.getValue();
            httpRequest.addHeader(name, value);
        }
    }

    /** Reads the response entity as a UTF-8 string; fails if the response has no entity. */
    private static String handleResponse(HttpResponse response) {
        HttpEntity body = response.getEntity();
        if (body == null) {
            throw new RuntimeException("Response entity is null");
        }
        try {
            return EntityUtils.toString(body, StandardCharsets.UTF_8);
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }
}

View File

@@ -7,7 +7,7 @@ dy.sleepTime=100
bi.userAgent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.48
bi.retryTimes=1
bi.retrySleepTime=10
bi.retrySleepTime=100
bi.threadCnt=1
bi.emptySleepTime=10
bi.sleepTime=10
bi.emptySleepTime=100
bi.sleepTime=100

View File

@@ -1,8 +1,8 @@
package org.example;
import org.example.exception.FileCacheException;
import org.example.pojo.configfile.BarrageSaveFile;
import org.example.pojo.download.LoadConfig;
import org.example.danmaku.pojo.configfile.BarrageSaveFile;
import org.example.danmaku.pojo.download.LoadConfig;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ConcurrentLinkedQueue;

View File

@@ -1,111 +0,0 @@
package org.example.coreTest;
import org.example.core.control.LoadTask;
import org.example.core.factory.TaskFactory;
import org.example.core.manager.LoadTaskManager;
import org.example.exception.FileCacheException;
import org.example.pojo.download.assign.LoadConfig_R_Douyu;
import org.junit.jupiter.api.Test;
/**
 * Manual smoke tests that start danmaku download tasks and print their
 * running state and cache size once per second.
 *
 * @author 燧枫
 * @date 2023/4/25 15:30
 */
public class CoreTest {

    /** Walks one task through start, two cache flushes and end, printing status throughout. */
    @Test
    public void HelloWorld() throws InterruptedException, FileCacheException {
        // Task manager that owns the download task.
        LoadTaskManager manager = new LoadTaskManager();

        // Configuration for a Douyu recording.
        LoadConfig_R_Douyu config = new LoadConfig_R_Douyu("大司马", "Kp1QM8gb4ow7k4bj");

        // Register the task; the returned key identifies it from here on.
        String taskKey = manager.addTask(config);

        // Kick off the download.
        manager.startTask(taskKey);
        pollManager(manager, 10, "(缓存中弹幕条数):", taskKey);

        // Flush the cached data out.
        System.out.println(taskKey + "(刷入数据):" + manager.flushTaskCacheAndSave(taskKey));
        pollManager(manager, 10, "(缓存中弹幕条数):", taskKey);

        // Flush the cached data out again.
        System.out.println(taskKey + "(刷入数据):" + manager.flushTaskCacheAndSave(taskKey));

        // Stop the task and keep watching its state.
        manager.endTask(taskKey);
        pollManager(manager, 10, "(缓存中弹幕条数):", taskKey);
    }

    /** Drives a single {@link LoadTask} obtained from the factory, without a manager. */
    @Test
    public void testTask() throws InterruptedException, FileCacheException {
        LoadConfig_R_Douyu config = new LoadConfig_R_Douyu("大司马", "Kp1QM8gb4ow7k4bj");
        TaskFactory factory = new TaskFactory();
        LoadTask task = factory.getLoadTask(config);

        task.start();
        pollTask(task, 10);

        task.end();
        pollTask(task, 10);
    }

    /** Runs two tasks side by side under one manager, then ends and finishes the first. */
    @Test
    public void testTaskManage() throws InterruptedException, FileCacheException {
        LoadConfig_R_Douyu firstConfig = new LoadConfig_R_Douyu("大司马", "Kp1QM8gb4ow7k4bj");
        LoadConfig_R_Douyu secondConfig = new LoadConfig_R_Douyu("孙悟空", "X3JzMaObqDYvPQro");
        LoadTaskManager manager = new LoadTaskManager();

        String firstKey = manager.addTask(firstConfig);
        String secondKey = manager.addTask(secondConfig);
        manager.startTask(firstKey);
        manager.startTask(secondKey);
        System.out.println(firstKey);
        pollManager(manager, 20, "(弹幕条数):", firstKey, secondKey);

        System.out.println(firstKey + "(刷入数据):" + manager.flushTaskCacheAndSave(firstKey));
        System.out.println(secondKey + "(刷入数据):" + manager.flushTaskCacheAndSave(secondKey));

        manager.endTask(firstKey);
        pollManager(manager, 20, "(弹幕条数):", firstKey, secondKey);

        manager.taskFinished(firstKey);
    }

    /**
     * Once per second, for {@code rounds} rounds, prints the running state and
     * the cache size of each given task, in the order the keys are passed.
     */
    private static void pollManager(LoadTaskManager manager, int rounds, String sizeLabel, String... taskKeys)
            throws InterruptedException, FileCacheException {
        for (int round = 0; round < rounds; round++) {
            Thread.sleep(1000);
            for (String key : taskKeys) {
                System.out.println(key + "(运行状态):" + manager.isTaskRunning(key));
                System.out.println(key + sizeLabel + manager.getCacheSize(key));
            }
        }
    }

    /** Once per second, for {@code rounds} rounds, prints the task's running state and cache size. */
    private static void pollTask(LoadTask task, int rounds)
            throws InterruptedException, FileCacheException {
        for (int round = 0; round < rounds; round++) {
            Thread.sleep(1000);
            System.out.println(task.isRunning());
            System.out.println(task.getCacheSize());
        }
    }
}

View File

@@ -5,8 +5,8 @@ import org.example.exception.FileCacheException;
import org.example.init.FileCacheManagerInit;
import org.example.init.InitWorld;
import org.example.init.ModuleSrcConfigFileInit;
import org.example.pojo.configfile.BarrageSaveFile;
import org.example.pojo.download.LoadConfig;
import org.example.danmaku.pojo.configfile.BarrageSaveFile;
import org.example.danmaku.pojo.download.LoadConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;