代码合并问题与前端BUG修正

This commit is contained in:
inrgihc
2026-05-14 21:36:41 +08:00
parent 9dc565038d
commit dd59a8a559
29 changed files with 43 additions and 499 deletions

3
.gitignore vendored
View File

@@ -33,6 +33,7 @@ logs/
### AI Coding ###
.workbuddy/
.codebuddy/
.opencode/
.cloudecode/
.claude/
.trae/

View File

@@ -547,7 +547,7 @@
</el-tag>
</div>
<div class="field-tips" v-if="!canPreviewDdl">
请先选择源端数据源源端模式名目的端数据源目的端模式名并完成表名配置方可使用此功能
请先选择源端数据源源端模式名目的端数据源目的端模式名配置方式方可使用此功能
</div>
<div class="field-tips" v-else>
查看和编辑系统为每张目标表自动生成的 CREATE TABLE 建表语句适用于需要调整字段类型添加表属性等场景
@@ -886,7 +886,7 @@ export default {
&& this.dataform.sourceSchema
&& this.dataform.targetConnectionId > 0
&& this.dataform.targetSchema
&& (this.dataform.sourceTables.length > 0 || this.dataform.includeOrExclude === 'INCLUDE')
&& !!this.dataform.includeOrExclude
}
},
methods: {
@@ -1232,23 +1232,31 @@ export default {
handlePreviewDdl: function () {
if (!this.canPreviewDdl) return;
var tablesToPreview = [];
if (this.dataform.includeOrExclude === 'EXCLUDE' && this.dataform.sourceTables.length > 0) {
var includeOrExclude = this.dataform.includeOrExclude;
if (includeOrExclude === 'INCLUDE') {
// 包含模式:选了表则用选中的,否则用全部表
tablesToPreview = this.dataform.sourceTables && this.dataform.sourceTables.length > 0
? this.dataform.sourceTables.slice()
: JSON.parse(JSON.stringify(this.sourceSchemaTables));
} else if (includeOrExclude === 'EXCLUDE') {
// 排除模式:全部表去掉排除的
tablesToPreview = JSON.parse(JSON.stringify(this.sourceSchemaTables));
for (var i = 0; i < this.dataform.sourceTables.length; ++i) {
var one = this.dataform.sourceTables[i];
tablesToPreview.some(function (item, index) {
if (item === one) { tablesToPreview.splice(index, 1); return true; }
if (this.dataform.sourceTables && this.dataform.sourceTables.length > 0) {
var excludeSet = this.dataform.sourceTables;
tablesToPreview = tablesToPreview.filter(function (t) {
return excludeSet.indexOf(t) < 0;
});
}
} else if (this.dataform.includeOrExclude === 'INCLUDE') {
tablesToPreview = this.dataform.sourceTables.length > 0
? this.dataform.sourceTables
: JSON.parse(JSON.stringify(this.sourceSchemaTables));
} else {
// 未选配置方式:使用全部表
tablesToPreview = JSON.parse(JSON.stringify(this.sourceSchemaTables));
}
if (tablesToPreview.length === 0) {
this.$message.warning('没有可预览的表,请先配置表名');
this.$message.warning('当前没有可预览的表,请检查源端模式下的表列表或调整表名配置');
return;
}
// 前端已将 EXCLUDE 模式计算为最终净表列表,统一以 isInclude=true 传给后端
var isIncludeForApi = true;
var self = this;
var tableNameMapper = this.dataform.tableNameMapper || [];
var tableNameCase = this.dataform.tableNameCase || 'NONE';
@@ -1259,7 +1267,7 @@ export default {
data: JSON.stringify({
id: this.dataform.sourceConnectionId,
schemaName: this.dataform.sourceSchema,
isInclude: this.dataform.includeOrExclude === 'INCLUDE',
isInclude: isIncludeForApi,
tableNames: tablesToPreview,
nameMapper: tableNameMapper,
tableNameCase: tableNameCase

View File

@@ -32,9 +32,6 @@ import springfox.documentation.swagger2.annotations.EnableSwagger2;
@EnableSwagger2
public class SwaggerConfig {
@Value("${swagger.enable}")
private boolean isEnableSwagger;
public static final String API_PREFIX = "/dbswitch/admin/api";
public static final String API_V1 = API_PREFIX + "/v1";
private static final String API_DEFAULT_PACKAGE = "org.dromara.dbswitch.admin.controller.privateapi";
@@ -60,7 +57,6 @@ public class SwaggerConfig {
pars.add(ticketPar.build());
return new Docket(DocumentationType.SWAGGER_2)
.enable(isEnableSwagger)
.groupName("需要认证的接口")
.apiInfo(createApiInfo())
.select()
@@ -74,7 +70,6 @@ public class SwaggerConfig {
@Bean(value = "publicApi")
public Docket publicApi() {
return new Docket(DocumentationType.SWAGGER_2)
.enable(isEnableSwagger)
.groupName("无需认证的接口")
.apiInfo(createApiInfo())
.select()

View File

@@ -26,18 +26,14 @@ dbswitch:
configuration:
drivers-base-path: ${APP_DRIVERS_PATH}
# Swagger 两种方式都可以关闭
swagger:
enable: false
# Swagger在线接口开关
springfox:
documentation:
# 关键:全局禁用开关
enabled: false
auto-startup: false
swagger.v2:
enabled: false
openApi:
enabled: false
swagger-ui:
enabled: false
swagger:
enabled: false
v2:
enabled: false
auto-startup: false
enabled: false

View File

@@ -1 +1 @@
<!DOCTYPE html><html><head><meta charset=utf-8><meta name=viewport content="width=device-width,initial-scale=1"><title>异构数据迁移工具</title><link href=/static/css/app.ab8c389732d782a30e517c00e6a3fcb4.css rel=stylesheet></head><body><div id=app></div><script type=text/javascript src=/static/js/manifest.f7865f1f37621bebd5d4.js></script><script type=text/javascript src=/static/js/vendor.8200341f98478c8f7552.js></script><script type=text/javascript src=/static/js/app.94027213e498a69e0bf6.js></script></body></html>
<!DOCTYPE html><html><head><meta charset=utf-8><meta name=viewport content="width=device-width,initial-scale=1"><title>异构数据迁移工具</title><link href=/static/css/app.d7e7a5c6581a03b1bcf1e52d06ecb8c0.css rel=stylesheet></head><body><div id=app></div><script type=text/javascript src=/static/js/manifest.0c8e102e3af4f84797f4.js></script><script type=text/javascript src=/static/js/vendor.8200341f98478c8f7552.js></script><script type=text/javascript src=/static/js/app.94027213e498a69e0bf6.js></script></body></html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,2 @@
!function(e){var c=window.webpackJsonp;window.webpackJsonp=function(n,a,f){for(var o,d,i,u=0,b=[];u<n.length;u++)d=n[u],r[d]&&b.push(r[d][0]),r[d]=0;for(o in a)Object.prototype.hasOwnProperty.call(a,o)&&(e[o]=a[o]);for(c&&c(n,a,f);b.length;)b.shift()();if(f)for(u=0;u<f.length;u++)i=t(t.s=f[u]);return i};var n={},r={25:0};function t(c){if(n[c])return n[c].exports;var r=n[c]={i:c,l:!1,exports:{}};return e[c].call(r.exports,r,r.exports,t),r.l=!0,r.exports}t.e=function(e){var c=r[e];if(0===c)return new Promise(function(e){e()});if(c)return c[2];var n=new Promise(function(n,t){c=r[e]=[n,t]});c[2]=n;var a=document.getElementsByTagName("head")[0],f=document.createElement("script");f.type="text/javascript",f.charset="utf-8",f.async=!0,f.timeout=12e4,t.nc&&f.setAttribute("nonce",t.nc),f.src=t.p+"static/js/"+e+"."+{0:"01b50a1b04adf9f0bb05",1:"fed755dff73c923d7f16",2:"22e821ec8c9909429f09",3:"fe1c5c7d88360f5f98ae",4:"08d31a8e3923090c336f",5:"2ba81da755c2838a1bc0",6:"3d1dbfbc1cadc39a24c4",7:"ff59d5e8c3f5f6990b25",8:"fcf46f880cde60008573",9:"7482dae9c15b18627401",10:"80e7801109b57f1a3d72",11:"1b5e1420657152e07dc2",12:"ddd8e2a887b327f4ab2d",13:"a1fc0f5e975ae62c9069",14:"572f60f49a9868fcd0fe",15:"78a79600adcd06361937",16:"66f213fc135ae6a5d988",17:"bfc722f22acf2bd97223",18:"f7e4b41780ec8b255f7a",19:"852c4c5cdec46703f5a8",20:"a476a38191cde522905a",21:"e2737863c3879ad32e68",22:"ac430228b1464a487c04"}[e]+".js";var o=setTimeout(d,12e4);function d(){f.onerror=f.onload=null,clearTimeout(o);var c=r[e];0!==c&&(c&&c[1](new Error("Loading chunk "+e+" failed.")),r[e]=void 0)}return f.onerror=f.onload=d,a.appendChild(f),n},t.m=e,t.c=n,t.d=function(e,c,n){t.o(e,c)||Object.defineProperty(e,c,{configurable:!1,enumerable:!0,get:n})},t.n=function(e){var c=e&&e.__esModule?function(){return e.default}:function(){return e};return t.d(c,"a",c),c},t.o=function(e,c){return Object.prototype.hasOwnProperty.call(e,c)},t.p="/",t.oe=function(e){throw console.error(e),e}}([]);
//# sourceMappingURL=manifest.0c8e102e3af4f84797f4.js.map

View File

@@ -1,2 +0,0 @@
!function(e){var n=window.webpackJsonp;window.webpackJsonp=function(r,a,o){for(var f,d,i,u=0,b=[];u<r.length;u++)d=r[u],t[d]&&b.push(t[d][0]),t[d]=0;for(f in a)Object.prototype.hasOwnProperty.call(a,f)&&(e[f]=a[f]);for(n&&n(r,a,o);b.length;)b.shift()();if(o)for(u=0;u<o.length;u++)i=c(c.s=o[u]);return i};var r={},t={25:0};function c(n){if(r[n])return r[n].exports;var t=r[n]={i:n,l:!1,exports:{}};return e[n].call(t.exports,t,t.exports,c),t.l=!0,t.exports}c.e=function(e){var n=t[e];if(0===n)return new Promise(function(e){e()});if(n)return n[2];var r=new Promise(function(r,c){n=t[e]=[r,c]});n[2]=r;var a=document.getElementsByTagName("head")[0],o=document.createElement("script");o.type="text/javascript",o.charset="utf-8",o.async=!0,o.timeout=12e4,c.nc&&o.setAttribute("nonce",c.nc),o.src=c.p+"static/js/"+e+"."+{0:"01b50a1b04adf9f0bb05",1:"fed755dff73c923d7f16",2:"22e821ec8c9909429f09",3:"fe1c5c7d88360f5f98ae",4:"08d31a8e3923090c336f",5:"2ba81da755c2838a1bc0",6:"5d5ddd62bcebfdd45c40",7:"5832297a8db38b5a6b9a",8:"fcf46f880cde60008573",9:"7482dae9c15b18627401",10:"80e7801109b57f1a3d72",11:"1b5e1420657152e07dc2",12:"ddd8e2a887b327f4ab2d",13:"a1fc0f5e975ae62c9069",14:"572f60f49a9868fcd0fe",15:"78a79600adcd06361937",16:"66f213fc135ae6a5d988",17:"bfc722f22acf2bd97223",18:"f7e4b41780ec8b255f7a",19:"852c4c5cdec46703f5a8",20:"a476a38191cde522905a",21:"e2737863c3879ad32e68",22:"ac430228b1464a487c04"}[e]+".js";var f=setTimeout(d,12e4);function d(){o.onerror=o.onload=null,clearTimeout(f);var n=t[e];0!==n&&(n&&n[1](new Error("Loading chunk "+e+" failed.")),t[e]=void 0)}return o.onerror=o.onload=d,a.appendChild(o),r},c.m=e,c.c=r,c.d=function(e,n,r){c.o(e,n)||Object.defineProperty(e,n,{configurable:!1,enumerable:!0,get:r})},c.n=function(e){var n=e&&e.__esModule?function(){return e.default}:function(){return e};return c.d(n,"a",n),n},c.o=function(e,n){return Object.prototype.hasOwnProperty.call(e,n)},c.p="/",c.oe=function(e){throw console.error(e),e}}([]);
//# sourceMappingURL=manifest.f7865f1f37621bebd5d4.js.map

View File

@@ -241,15 +241,6 @@ public enum ProductTypeEnum {
"jdbc:uxdb://",
new String[]{"jdbc:uxdb://{host}[:{port}]/[{database}][\\?{params}]"},
"jdbc:uxdb://127.0.0.1:52025/uxdb"),
/**
* KAFKA数据库类型
*/
KAFKA(25, "\"", "Kafka", "org.apache.kafka.connect.jdbc.JdbcSinkConnector", 9092,
"SELECT 1",
"jdbc:kafka://",
new String[]{"jdbc:kafka://{host}[:{port}]/[{database}][\\?{params}]"},
"jdbc:kafka://127.0.0.1:9092/test"),
;

View File

@@ -1,28 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.dromara.dbswitch</groupId>
<artifactId>dbswitch-product</artifactId>
<version>2.0.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dbswitch-product-kafka</artifactId>
<dependencies>
<dependency>
<groupId>org.dromara.dbswitch</groupId>
<artifactId>dbswitch-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.dromara.dbswitch</groupId>
<artifactId>dbswitch-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -1,67 +0,0 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package org.dromara.dbswitch.product.kafka;
import org.dromara.dbswitch.core.annotation.Product;
import org.dromara.dbswitch.common.type.ProductTypeEnum;
import org.dromara.dbswitch.core.features.DefaultProductFeatures;
import org.dromara.dbswitch.core.features.ProductFeatures;
import org.dromara.dbswitch.core.provider.AbstractFactoryProvider;
import org.dromara.dbswitch.core.provider.meta.MetadataProvider;
import org.dromara.dbswitch.core.provider.manage.TableManageProvider;
import org.dromara.dbswitch.core.provider.query.TableDataQueryProvider;
import org.dromara.dbswitch.core.provider.sync.TableDataSynchronizeProvider;
import org.dromara.dbswitch.core.provider.write.TableDataWriteProvider;
import javax.sql.DataSource;
/**
* Kafka工厂提供者
* Kafka作为消息队列系统不支持传统的关系型数据库操作
* 主要用于数据写入场景
*/
@Product(ProductTypeEnum.KAFKA)
public class KafkaFactoryProvider extends AbstractFactoryProvider {
public KafkaFactoryProvider(DataSource dataSource) {
super(dataSource);
}
@Override
public ProductFeatures getProductFeatures() {
return new DefaultProductFeatures();
}
@Override
public MetadataProvider createMetadataQueryProvider() {
return new KafkaMetadataQueryProvider(this);
}
@Override
public TableDataQueryProvider createTableDataQueryProvider() {
// Kafka不支持数据查询返回null或使用默认实现
return super.createTableDataQueryProvider();
}
@Override
public TableDataWriteProvider createTableDataWriteProvider(boolean useInsert) {
return new KafkaTableDataWriteProvider(this);
}
@Override
public TableManageProvider createTableManageProvider() {
return new KafkaTableManageProvider(this);
}
@Override
public TableDataSynchronizeProvider createTableDataSynchronizeProvider() {
return new KafkaTableDataSynchronizer(this);
}
}

View File

@@ -1,132 +0,0 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package org.dromara.dbswitch.product.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.dromara.dbswitch.core.provider.ProductFactoryProvider;
import org.dromara.dbswitch.core.provider.meta.AbstractMetadataProvider;
import org.dromara.dbswitch.core.schema.ColumnDescription;
import org.dromara.dbswitch.core.schema.ColumnMetaData;
import org.dromara.dbswitch.core.schema.SourceProperties;
import org.dromara.dbswitch.core.schema.TableDescription;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Kafka元数据查询提供者
* Kafka作为消息队列其"表"概念对应Topic
*/
@Slf4j
public class KafkaMetadataQueryProvider extends AbstractMetadataProvider {
public KafkaMetadataQueryProvider(ProductFactoryProvider factoryProvider) {
super(factoryProvider);
}
@Override
public List<String> querySchemaList(Connection connection) {
// Kafka没有schema概念返回空列表或默认值
List<String> schemas = new ArrayList<>();
schemas.add("default");
return schemas;
}
@Override
public List<TableDescription> queryTableList(Connection connection, String schemaName) {
// Kafka的"表"对应Topic这里需要通过JDBC驱动获取Topic列表
// 由于Kafka JDBC驱动的特殊性返回空列表或需要从配置中获取
log.warn("Kafka does not support traditional table listing. Topics should be configured externally.");
return new ArrayList<>();
}
@Override
public TableDescription queryTableMeta(Connection connection, String schemaName, String tableName) {
// Kafka的"表"对应Topic
TableDescription td = new TableDescription();
td.setSchemaName(schemaName);
td.setTableName(tableName);
td.setTableType("TOPIC");
td.setRemarks("Kafka Topic: " + tableName);
return td;
}
@Override
public List<String> queryTableColumnName(Connection connection, String schemaName, String tableName) {
// Kafka消息没有固定的列结构返回空列表
log.warn("Kafka topics do not have fixed column structures.");
return new ArrayList<>();
}
@Override
public List<ColumnDescription> queryTableColumnMeta(Connection connection, String schemaName, String tableName) {
// Kafka消息没有固定的列结构
log.warn("Kafka topics do not have fixed column metadata.");
return new ArrayList<>();
}
@Override
public List<String> queryTablePrimaryKeys(Connection connection, String schemaName, String tableName) {
// Kafka不支持主键概念
return new ArrayList<>();
}
@Override
public String getTableDDL(Connection connection, String schemaName, String tableName) {
// Kafka不支持传统DDL
return "-- Kafka Topic: " + tableName + "\n-- DDL not applicable for Kafka";
}
@Override
public String getViewDDL(Connection connection, String schemaName, String tableName) {
// Kafka不支持视图
return null;
}
@Override
public List<ColumnDescription> querySelectSqlColumnMeta(Connection connection, String sql) {
// Kafka不支持SQL查询
log.warn("Kafka does not support SQL queries for metadata extraction.");
return new ArrayList<>();
}
@Override
public void testQuerySQL(Connection connection, String sql) {
// Kafka不支持SQL查询测试
log.warn("Kafka does not support SQL query testing.");
}
@Override
public String getFieldDefinition(ColumnMetaData v, List<String> pks, boolean useAutoInc,
boolean addCr, boolean withRemarks) {
// Kafka不需要字段定义
return "-- Field definition not applicable for Kafka";
}
@Override
public List<String> getTableColumnCommentDefinition(TableDescription td, List<ColumnDescription> cds) {
return Collections.emptyList();
}
@Override
public void preAppendCreateTableSql(StringBuilder builder) {
// Kafka不需要建表前缀
}
@Override
public void postAppendCreateTableSql(StringBuilder builder, String tblComment, List<String> primaryKeys,
SourceProperties tblProperties) {
// Kafka不需要建表后缀
builder.append("-- Kafka topic creation is managed externally");
}
}

View File

@@ -1,89 +0,0 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package org.dromara.dbswitch.product.kafka;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dbswitch.core.provider.ProductFactoryProvider;
import org.dromara.dbswitch.core.provider.sync.DefaultTableDataSynchronizeProvider;
import java.util.Collections;
import java.util.List;
/**
* Kafka表数据同步提供者
* Kafka作为消息队列主要支持数据插入操作发送消息
* 不支持传统的update和delete操作
*/
@Slf4j
public class KafkaTableDataSynchronizer extends DefaultTableDataSynchronizeProvider {
public KafkaTableDataSynchronizer(ProductFactoryProvider factoryProvider) {
super(factoryProvider);
}
@Override
public void prepare(String schemaName, String tableName, List<String> fieldNames, List<String> pks) {
// Kafka不需要主键概念但为了兼容性仍然调用父类方法
log.info("Preparing Kafka data synchronizer for topic: {}", tableName);
// 初始化基本字段,但不依赖传统数据库的元数据
this.columnType = Collections.emptyMap();
this.fieldOrders = fieldNames;
this.pksOrders = pks != null ? pks : Collections.emptyList();
// 为Kafka构建INSERT语句模板
this.insertStatementSql = getInsertPrepareStatementSql(schemaName, tableName, fieldNames);
// Kafka不支持UPDATE和DELETE设置为null或空
this.updateStatementSql = null;
this.deleteStatementSql = null;
// 设置参数类型数组
this.insertArgsType = new int[fieldNames.size()];
// 对于Kafka所有字段都视为字符串类型
for (int i = 0; i < fieldNames.size(); i++) {
insertArgsType[i] = java.sql.Types.VARCHAR;
}
this.updateArgsType = new int[0];
this.deleteArgsType = new int[0];
}
@Override
public long executeInsert(List<Object[]> records) {
if (records == null || records.isEmpty()) {
return 0;
}
log.info("Executing insert of {} records to Kafka topic", records.size());
// 使用父类的插入逻辑通过JDBC驱动向Kafka发送消息
try {
return super.executeInsert(records);
} catch (Exception e) {
log.error("Failed to insert records to Kafka", e);
throw new RuntimeException("Failed to send messages to Kafka topic", e);
}
}
@Override
public long executeUpdate(List<Object[]> records) {
// Kafka不支持更新操作
log.warn("Kafka does not support update operations. Records will be ignored.");
return 0;
}
@Override
public long executeDelete(List<Object[]> records) {
// Kafka不支持删除操作
log.warn("Kafka does not support delete operations. Records will be ignored.");
return 0;
}
}

View File

@@ -1,86 +0,0 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package org.dromara.dbswitch.product.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.dromara.dbswitch.core.provider.ProductFactoryProvider;
import org.dromara.dbswitch.core.provider.write.DefaultTableDataWriteProvider;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
/**
* Kafka表数据写入提供者
* Kafka作为消息队列数据写入对应于向Topic发送消息
*/
@Slf4j
public class KafkaTableDataWriteProvider extends DefaultTableDataWriteProvider {
public KafkaTableDataWriteProvider(ProductFactoryProvider factoryProvider) {
super(factoryProvider);
}
@Override
public void prepareWrite(String schemaName, String tableName, List<String> fieldNames) {
// Kafka不需要预处理但需要设置schema和tableName
this.columnType = Collections.emptyMap();
this.schemaName = schemaName;
this.tableName = tableName;
log.info("Preparing to write to Kafka topic: {}", tableName);
}
@Override
public long write(List<String> fieldNames, List<Object[]> recordValues) {
if (CollectionUtils.isEmpty(fieldNames) || CollectionUtils.isEmpty(recordValues)) {
return 0L;
}
log.info("Writing {} records to Kafka topic: {}", recordValues.size(), tableName);
// 对于Kafka我们需要将记录转换为消息并发送到Topic
// 这里使用JDBC驱动提供的机制来发送数据
try (Connection connection = getDataSource().getConnection()) {
for (Object[] record : recordValues) {
// 构建INSERT语句用于Kafka JDBC连接器
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ").append(tableName).append(" (");
for (int i = 0; i < fieldNames.size(); i++) {
if (i > 0) sb.append(", ");
sb.append(fieldNames.get(i));
}
sb.append(") VALUES (");
for (int i = 0; i < record.length; i++) {
if (i > 0) sb.append(", ");
if (record[i] == null) {
sb.append("NULL");
} else {
sb.append("'").append(record[i].toString().replace("'", "''")).append("'");
}
}
sb.append(")");
try (Statement stmt = connection.createStatement()) {
stmt.executeUpdate(sb.toString());
}
}
} catch (SQLException e) {
throw new RuntimeException("Failed to write data to Kafka topic: " + tableName, e);
}
return recordValues.size();
}
}

View File

@@ -1,38 +0,0 @@
// Copyright tang. All rights reserved.
// https://gitee.com/inrgihc/dbswitch
//
// Use of this source code is governed by a BSD-style license
//
// Author: tang (inrgihc@126.com)
// Date : 2020/1/2
// Location: beijing , china
/////////////////////////////////////////////////////////////
package org.dromara.dbswitch.product.kafka;
import lombok.extern.slf4j.Slf4j;
import org.dromara.dbswitch.core.provider.ProductFactoryProvider;
import org.dromara.dbswitch.core.provider.manage.DefaultTableManageProvider;
/**
* Kafka表管理提供者
* Kafka作为消息队列不支持传统的表操作如truncate和drop
*/
@Slf4j
public class KafkaTableManageProvider extends DefaultTableManageProvider {
public KafkaTableManageProvider(ProductFactoryProvider factoryProvider) {
super(factoryProvider);
}
@Override
public void truncateTableData(String schemaName, String tableName) {
// Kafka不支持truncate操作Topic中的数据通过保留策略自动清理
log.warn("Kafka does not support truncate operation. Topic data is managed by retention policy.");
}
@Override
public void dropTable(String schemaName, String tableName) {
// Kafka不支持drop操作Topic需要通过Kafka管理工具删除
log.warn("Kafka does not support drop operation. Topics should be managed through Kafka admin tools.");
}
}

View File

@@ -1 +0,0 @@
org.dromara.dbswitch.product.kafka.KafkaFactoryProvider

View File

@@ -133,11 +133,6 @@
<artifactId>dbswitch-product-tdengine</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.dromara.dbswitch</groupId>
<artifactId>dbswitch-product-kafka</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -37,7 +37,6 @@
<module>dbswitch-product-doris</module>
<module>dbswitch-product-oceanbase</module>
<module>dbswitch-product-tdengine</module>
<module>dbswitch-product-kafka</module>
</modules>
</project>