在快速发展的数据驱动业务环境中,确保数据在各个系统间高效、准确地同步至关重要。为了进一步的数据处理和分析,经常需要将这些数据同步到其他数据处理系统。Apache SeaTunnel 提供了一个强大而灵活的数据集成框架,使得从 SQL Server 到其他系统的数据同步变得简单且高效。
本文档将指导您如何配置 Apache SeaTunnel,使用 JDBC SQL Server Source Connector 来实现数据的有效同步。
SQL Server
JDBC SQL Server Source Connector
支持 SQL Server 版本
- 服务器:2008(或更高版本,仅供信息参考)
支持以下引擎
Spark
Flink
Seatunnel Zeta
主要特点
- [x] 批处理
- [ ] 流处理
- [x] 精准一次性
- [x] 列投影
- [x] 并行处理
- [x] 支持用户定义拆分
支持查询 SQL 并能够实现投影效果。
描述
通过 JDBC 读取外部数据源数据。
支持的数据源信息
数据源 | 支持的版本 | 驱动 | URL | Maven |
---|---|---|---|---|
SQL Server | 支持版本 >= 2008 | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | 下载 |
数据库依赖
请下载与 ‘Maven’ 对应的支持列表,并将其复制到 ‘$SEATNUNNEL_HOME/plugins/jdbc/lib/’ 工作目录
例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/
数据类型映射
SQL Server 数据类型 | Seatunnel 数据类型 |
---|---|
BIT | BOOLEAN |
TINYINT SMALLINT |
SHORT |
INTEGER | INT |
BIGINT | LONG |
DECIMAL NUMERIC MONEY SMALLMONEY |
DECIMAL((指定列的指定列大小)+1, (获取指定列的小数点右边的数字的数量。))) |
REAL | FLOAT |
FLOAT | DOUBLE |
CHAR NCHAR VARCHAR NTEXT NVARCHAR TEXT |
STRING |
DATE | LOCAL_DATE |
TIME | LOCAL_TIME |
DATETIME DATETIME2 SMALLDATETIME DATETIMEOFFSET |
LOCAL_DATE_TIME |
TIMESTAMP BINARY VARBINARY IMAGE UNKNOWN |
尚不支持 |
源选项
名称 | 类型 | 必需 | 默认值 | 描述 |
---|---|---|---|---|
url | 字符串 | 是 | – | JDBC 连接的 URL。例如:jdbc:sqlserver://127.0.0.1:1434;database=TestDB |
driver | 字符串 | 是 | – | 用于连接到远程数据源的 JDBC 类名,如果使用 SQL Server,则值为 com.microsoft.sqlserver.jdbc.SQLServerDriver 。 |
user | 字符串 | 否 | – | 连接实例的用户名 |
password | 字符串 | 否 | – | 连接实例的密码 |
query | 字符串 | 是 | – | 查询语句 |
connection_check_timeout_sec | 整数 | 否 | 30 | 等待用于验证连接的数据库操作完成的秒数 |
partition_column | 字符串 | 否 | – | 并行处理的分区列,仅支持数值类型。 |
partition_lower_bound | 长整数 | 否 | – | 用于扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。 |
partition_upper_bound | 长整数 | 否 | – | 用于扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。 |
partition_num | 整数 | 否 | 作业并行度 | 分区计数的数量,仅支持正整数。默认值为作业并行度。 |
fetch_size | 整数 | 否 | 0 | 对返回大量对象的查询,您可以配置查询中使用的行抓取大小,以减少满足选择条件所需的数据库命中次数,从而提高性能。 零表示使用 JDBC 默认值。 |
common-options | 否 | – | 源插件的常见参数,请参阅 源常用选项 以获取详细信息。 |
提示
如果未设置 partition_column,则将以单一并发运行;如果设置了 partition_column,则将根据任务的并发度进行并行执行。
任务示例
简单:
简单的单一任务以读取数据表
# 定义运行时环境
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 1
job.mode = "BATCH"
}
source{
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
query = "select * from full_types_jdbc"
}
}
transform { # 如果您想要获取有关如何配置 seatunnel 和查看变换插件的完整列表的更多信息, # 请转到 [seatunnel.apache.org/docs/transform-v2/sql](https://seatunnel.apache.org/docs/transform-v2/sql)
}
sink {
Console {}
}
并行:
使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
# 根据需要定义查询逻辑
query = "select * from full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
}
transform {
# 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
# 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}
并行:
使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
# 根据需要定义查询逻辑
query = "select * from full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
}
transform {
# 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
# 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}
并行:
使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
# 根据需要定义查询逻辑
query = "select * from full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
}
transform {
# 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
# 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}
并行:
使用您配置的分片字段和分片数据并行读取您的查询表,如果您希望读取整个表,可以这样做:
env {
# 您可以在此处设置 Flink 配置
execution.parallelism = 10
job.mode = "BATCH"
}
source {
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
# 根据需要定义查询逻辑
query = "select * from full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
}
transform {
# 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
# 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}
分段并行读取示例:
这是一个快速并行读取数据的分片示例
env {
# 您可以在此处设置引擎配置
execution.parallelism = 10
}
source {
# 这是一个示例源插件,仅用于测试和展示源插件的功能
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
query = "select * from column_type_test.dbo.full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 片段数量
partition_num = 10
}
# 如果您想要获取有关如何配置 Seatunnel 和查看源插件的完整列表的更多信息,
# 请转到 https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
}
transform {
# 如果您想要获取有关如何配置 Seatunnel 和查看转换插件的完整列表的更多信息,
# 请转到 https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
# 如果您想要获取有关如何配置 Seatunnel 和查看接收插件的完整列表的更多信息,
# 请转到 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}
本文由 白鲸开源科技 提供发布支持!