设计账号模块:账号通道改造、视频通道改造。引入PostWorker链接两端

This commit is contained in:
welsir
2024-05-17 23:55:49 +08:00
parent 656ade3856
commit 0fc5db0834
23 changed files with 228 additions and 234 deletions

View File

@@ -8,7 +8,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @Description
* @Description 用户通道API
* @Author welsir
* @Date 2023/11/17 8:34
*/

View File

@@ -1,12 +1,10 @@
package org.example.api;
import org.example.bean.section.PackageSection;
import org.example.constpool.PluginName;
import org.example.core.guard.VideoPushGuard;
import org.example.init.InitPluginRegister;
import org.example.plugin.CommonPlugin;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.example.core.guard.VideoPushChannelGuard;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@@ -15,12 +13,13 @@ import javax.annotation.Resource;
* @Author welsir
* @Date 2023/9/5 22:36
*/
@Component
@RestController(value = "/")
public class VideoPushApi {
@Resource
VideoPushGuard videoPushGuard;
VideoPushChannelGuard videoPushGuard;
@RequestMapping(method = RequestMethod.POST,value = "/push")
public void pushVideo(PackageSection obj){
videoPushGuard.sendVideo(obj);
}

View File

@@ -17,7 +17,7 @@ import javax.annotation.Resource;
import java.util.*;
/**
* @Description 账号通道
* @Description 账号通道绑定
* @Author welsir
* @Date 2023/11/17 8:18
*/
@@ -117,13 +117,12 @@ public class AccountBindChannel extends SpringBootPlugin {
" PRIMARY KEY (id),\n" +
" UNIQUE (name)\n" +
")");
ArrayList<Channel> channels = new ArrayList<>();
Channel channel = new Channel();
channel.setRoute("*.*.*");
channel.setName("default");
channel.setId(1L);
channels.add(channel);
sqlInitHelper.initData(channels,channelMapper);
sqlInitHelper.initData(List.of(channel),channelMapper);
sqlInitHelper.initTable("account_channel","CREATE TABLE `account_channel` (\n" +
" `id` INTEGER NOT NULL,\n" +
" `channel_id` TEXT ,\n" +

View File

@@ -6,9 +6,11 @@ import lombok.extern.slf4j.Slf4j;
import org.example.api.VideoPublishApi;
import org.example.bean.section.PackageSection;
import org.example.core.channel.AccountBindChannel;
import org.example.core.guard.VideoPushChannelGuard;
import org.example.core.route.DefaultRouteRuler;
import org.example.pojo.Account;
import org.example.pojo.PacketSectionVideo;
import org.example.pojo.VideoToPublish;
import org.example.pojo.VideoToPush;
import org.springframework.scheduling.annotation.Scheduled;
@@ -21,7 +23,7 @@ import java.nio.file.Path;
import java.util.*;
/**
* @Description 视频管道交换机
* @Description 用户视频管道
* @Author welsir
* @Date 2023/9/4 22:07
*/
@@ -29,7 +31,7 @@ import java.util.*;
@Slf4j
public class Exchange {
//存储不同管道对应的切片集合
private final Map<String, List<PackageSection>> channels = new HashMap<>();
private final Map<String, List<PacketSectionVideo>> channels = new HashMap<>();
//存储管道的匹配规则
private Map<String,String> channelRoute;
@@ -39,18 +41,18 @@ public class Exchange {
AccountBindChannel channel;
@Resource
VideoPublishApi pushVideo;
@Resource
VideoPushChannelGuard videoCollections;
@Scheduled(fixedDelay = 5000)
public void publishVideo(){
public void work(){
log.info("listen video to push...");
Map<String, List<Account>> channelAccount = channel.getChannelAccount();
channels.forEach((k,v)->{
if(channels.get(k)==null||channelAccount.get(k)==null){
return;
}
log.debug("channel:"+k+" videos:"+ Arrays.toString(v.toArray()));
List<Account> accountList = channelAccount.get(k);
List<PackageSection> packageSections = channels.get(k);
List<PacketSectionVideo> packageSections = channels.get(k);
for (Account account : accountList) {
for (PackageSection packageSection : packageSections) {
VideoToPublish video = new VideoToPublish();
@@ -70,15 +72,14 @@ public class Exchange {
});
}
public void pushToQueue(String routingKey,PackageSection packageSection) {
public void pushToQueue(String routingKey,PacketSectionVideo packageSection) {
channelRoute = channel.getChannelRoute();
for (String key : channelRoute.keySet()) {
String route = channelRoute.get(key);
boolean flag = defaultRouteRuler.matchRoute(routingKey, route);
if (flag){
channels.putIfAbsent(key,new ArrayList<>());
List<PackageSection> packageSections = channels.get(key);
List<PacketSectionVideo> packageSections = channels.get(key);
packageSections.add(packageSection);
}
}

View File

@@ -0,0 +1,36 @@
package org.example.core.exchange;
import org.example.core.guard.VideoPushChannelGuard;
import org.example.plugin.SpringBootPlugin;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @Description
* @Author welsir
* @Date 2024/5/17 15:05
*/
@Component
public class PostWorkerManager extends SpringBootPlugin {
@Resource
Exchange exchange;
@Resource
VideoPushChannelGuard videoStorehouse;
@Override
public boolean init() {
return super.init();
}
//todo:想办法把exchange的每个不同类型管道提出来让一个线程处理
class PostWorker implements Runnable{
@Override
public void run() {
}
}
}

View File

@@ -0,0 +1,45 @@
package org.example.core.factory.videoPushFactory;
import org.example.bean.section.PackageSection;
import org.example.pojo.PacketSectionVideo;
import org.springframework.beans.BeanUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
/**
* @Description
* @Author welsir
* @Date 2024/5/17 12:13
*/
public class DefaultVideoPushStrategy extends StrategyFactory {
private List<String> priorityImportWord = new ArrayList<>();
@Override
public PacketSectionVideo rule(PackageSection obj) {
if(obj != null){
return wrapPacketSection(obj);
}
return null;
}
private PacketSectionVideo wrapPacketSection(PackageSection p){
PacketSectionVideo video = new PacketSectionVideo();
BeanUtils.copyProperties(p,video);
video.setId(UUID.randomUUID().toString());
return video;
}
@Override
public List<String> queryPriority(){
return priorityImportWord;
}
@Override
public void changePriority(List<String> newList){
priorityImportWord = newList;
}
}

View File

@@ -0,0 +1,18 @@
package org.example.core.factory.videoPushFactory;
/**
* @Description
* @Author welsir
* @Date 2024/5/17 12:14
*/
public abstract class StrategyFactory implements VideoStrategy{
public static StrategyFactory connect(int x){
switch (x) {
case 0:
return new DefaultVideoPushStrategy();
default:
throw new IllegalArgumentException("违法的策略选项!");
}
}
}

View File

@@ -0,0 +1,20 @@
package org.example.core.factory.videoPushFactory;
import org.example.bean.section.PackageSection;
import org.example.pojo.PacketSectionVideo;
import java.util.List;
/**
* @Description
* @Author welsir
* @Date 2024/5/17 12:24
*/
public interface VideoStrategy {
PacketSectionVideo rule(PackageSection obj);
List<String> queryPriority();
void changePriority(List<String> s);
}

View File

@@ -0,0 +1,65 @@
package org.example.core.guard;
import lombok.extern.slf4j.Slf4j;
import org.example.bean.section.PackageSection;
import org.example.core.factory.videoPushFactory.StrategyFactory;
import org.example.plugin.SpringGuardPlugin;
import org.example.pojo.PacketSectionVideo;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Description 切片推送通道 负责将切片工厂包装好的切片存放 并给切片打上额外标签
* @Author welsir
* @Date 2023/9/4 22:10
*/
@Component
@Slf4j
public class VideoPushChannelGuard extends SpringGuardPlugin {
StrategyFactory factory;
BlockingQueue<PackageSection> queue = new ArrayBlockingQueue<>(1024);
ConcurrentHashMap<String, PacketSectionVideo> videosCollection = new ConcurrentHashMap<>();
int val = 0;
@Override
public boolean init() {
factory = StrategyFactory.connect(val);
return true;
}
@Override
public void start() {
PackageSection p = queue.poll();
if(p==null){
return;
}
PacketSectionVideo packedVideo = factory.rule(p);
videosCollection.put(packedVideo.getId(),packedVideo);
}
public void sendVideo(PackageSection p) {
queue.add(p);
}
public List<String> priority(){
return factory.queryPriority();
}
public void editPriority(List<String> priority){
factory.changePriority(priority);
}
public List<PacketSectionVideo> getPacketSection(){
List<PacketSectionVideo> list = new ArrayList<>();
videosCollection.values().stream()
.filter(v -> !v.isFinish())
.forEach(list::add);
return list;
}
}

View File

@@ -1,70 +0,0 @@
package org.example.core.guard;
import lombok.extern.slf4j.Slf4j;
import org.example.bean.section.PackageSection;
import org.example.core.exchange.Exchange;
import org.example.mapper.AccountMapper;
import org.example.mapper.AccountTypeMapper;
import org.example.mapper.ChannelMapper;
import org.example.plugin.SpringGuardPlugin;
import org.example.pojo.*;
import org.example.sql.SQLInitHelper;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @Description
* @Author welsir
* @Date 2023/9/4 22:10
*/
@Component
@Slf4j
public class VideoPushGuard extends SpringGuardPlugin {
private BlockingQueue<Object> receiveVideo;
@Resource
private AccountMapper accountMapper;
@Resource
ChannelMapper channelMapper;
@Resource
SQLInitHelper sqlInitHelper;
@Resource
Exchange exchange;
@Override
public boolean init() {
receiveVideo = new ArrayBlockingQueue<>(1024);
return true;
}
public void start() {
try {
log.debug("阻塞队列监听视频...");
Object videoMsg = receiveVideo.poll(5, TimeUnit.SECONDS);
if (videoMsg instanceof PackageSection) {
PackageSection video = (PackageSection) videoMsg;
StringBuilder route = new StringBuilder();
for (String label : video.getLabels()) {
if(route.length()!=0) {
route.append(".");
}
route.append(label);
}
exchange.pushToQueue(route.toString(),video);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void sendVideo(Object msg) {
receiveVideo.offer(msg);
}
}

View File

@@ -20,10 +20,8 @@ public class DefaultRouteRuler extends AbstractRouteRuler{
String[] routes = route.split("\\.");
String[] channelRoutes = channelRoute.split("\\.");
if(routes.length!=channelRoutes.length){
return false;
}
for (int i = 0; i < 3; i++) {
//这里匹配规则换成模糊匹配。只要前两位匹配成功即可
for (int i = 0; i < 2; i++) {
// 如果 channelRoute 的这一段不是 "*",则必须与 route 的相应段相等
if (!"*".equals(channelRoutes[i]) && !channelRoutes[i].equals(routes[i])) {
return false;

View File

@@ -0,0 +1,20 @@
package org.example.pojo;
import lombok.Data;
import org.example.bean.section.PackageSection;
/**
* @Description
* @Author welsir
* @Date 2024/5/17 14:27
*/
@Data
public class PacketSectionVideo extends PackageSection {
private String id;
private boolean priority;
private boolean isRelay;
private boolean isAuto = true;
private boolean isFinish;
}

View File

@@ -1,13 +0,0 @@
package org.example.pojo;
/**
* @Description
* @Author welsir
* @Date 2023/9/4 22:09
*/
public enum VideoType {
HAPPY,
SAD;
private VideoType() {
}
}

View File

@@ -14,6 +14,7 @@ import java.util.List;
@AllArgsConstructor
@NoArgsConstructor
public class PackageSection extends VideoSection{
private String title;
private String description;
private String coverPath;

View File

@@ -1,18 +0,0 @@
{
"data":{
"barrageScoreCurve":{
"scoreStrategy":"SCORING",
"splitStrategy":"ORDER",
"basicBarrageScore":5,
"duration":5000
},
"InstantSlicing":{
"handler":"ScheduleTime",
"ScheduleTime":{
"duration":1800000
}
},
"popularRange":{}
},
"updateTime":"2023-11-20 11:19:28"
}

View File

@@ -1,28 +0,0 @@
{
"data":{
"taskCenter":{
"queueCapacity":50,
"threads":10,
"waitingTime":1000
},
"spiderConfig":{
"douyu":{
"emptySleepTime":100,
"retrySleepTime":100,
"retryTimes":3,
"sleepTime":100,
"threadCnt":5,
"userAgent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.48"
},
"bilibili":{
"emptySleepTime":100,
"retrySleepTime":100,
"retryTimes":3,
"sleepTime":100,
"threadCnt":5,
"userAgent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.48"
}
}
},
"updateTime":"2023-11-20 11:19:28"
}

View File

@@ -1,6 +0,0 @@
{
"data":{
"task":[]
},
"updateTime":"2023-11-20 11:19:28"
}

View File

@@ -1,6 +0,0 @@
{
"data":{
"task":[]
},
"updateTime":"2023-11-21 12:25:02"
}

View File

@@ -1,14 +0,0 @@
{
"data":{
"LiverFollower":{
"checkTime":60000,
"focusBarrage":1,
"focusRecord":1,
"focusLive":1
},
"HotGuard":{
"GuardNum":10
}
},
"updateTime":"2023-11-20 11:19:28"
}

View File

@@ -1,8 +0,0 @@
{
"data":{
"LiveDownload":{
"showDownloadTable":false
}
},
"updateTime":"2023-11-20 11:19:28"
}

View File

@@ -1,45 +0,0 @@
{
"data":{
"src":{
"Account":"./config/Account",
"Creeper":"./config/Creeper",
"LiveRecord":"./config/LiveRecord",
"SectionWork":"./config/SectionWork",
"Hot":"./config/Hot",
"Barrage":"./config/Barrage",
"Section":"./config/Section",
"Publish":"./config/Publish"
},
"pluginStart":{
"HotGuard":true,
"SectionWorkShop":true,
"BarrageConfig":true,
"BarrageFileListen":false,
"BarrageEvent":true,
"channel":true,
"LiveConfig":true,
"BarrageScoreCurve":true,
"EmotionAnalysis":false,
"FileCacheManager":true,
"LiverFollower":true,
"TaskMonitor":true,
"HotConfig":true,
"TaskCenter":true,
"CreeperManager":true,
"DescriptionGenerate":true,
"InstantSlicing":true,
"LiveDownLoadManager":true,
"BarragePopularRange":true,
"HotRecommendation":true,
"NoticePlugin":true,
"TitleGenerate":true,
"LabelGenerate":true,
"VideoPush":true,
"AccountManager":true,
"LabelManager":true,
"CreeperConfig":true,
"OpenAPI":true
}
},
"updateTime":"2024-05-15 21:23:47"
}

Binary file not shown.