jjzjj

SeaTunnel安装及测试

小小大数据 2024-01-29 原文

一. 简介

架构于Spark和Flink之上的分布式的支持海量数据实时同步的高性能分布式数据集成平台

官网:http://seatunnel.incubator.apache.org/

Gitee:https://gitee.com/seatunnel/incubator-seatunnel

日常海量数据同步中常见问题:

1.缺乏统一的数据集成平台:
在类似SeaTunnel数据集成平台出来之前,开发者使用的是 Datax+Azkaban 分别作为数据采集的组件和调度执行的组件,使用 Git 作为代码管理;一个熟手配置一个数据集成任务大约通过 7 步:编辑,Commit,Push,打包,上传,页面操作,数据校验等, 一个任务的开发最起码需要 30~60 分钟,还必须保证这中间不能被打扰,没有出现异常情况,十分麻烦。

2.数据孤岛现象(eg:ClickHouse):

3.数据丢失与重复
4.任务堆积与延迟
5.吞吐量低
6.应用到生产环境周期长
7.缺少应用运行状态监控

特点:

低代码开发,易用性高,易维护
实时流式处理
离线多源数据分析
高性能、海量数据处理能力
模块化和插件化,易于扩展

支持的插件:

Input:File,Hdfs,Kafka,S3,Socket及自行开发的plugin

Filter:SQL,K-V,Json,Add,Drop,Split,Table及自行开发的plugin

Output:ES,File,Hdfs,Kafka,Mysql,S3,JDBC,Stdout及自行开发的plugin

插件支持情况:

Spark插件


Flink插件:


SeaTunnel官网暂不支持Doris,StarRocks作为数据Source端,若有具体业务需求可能需要具体开发;
Doris,StarRocks作为Sink端,在数据格式/质量上基本没有问题,只是不同的插件时间上有差异

日常使用:

会被用来做出仓入仓工具;

编辑配置文件,然后SeaTunnel将之转换为具体的Spark或Flink任务

竞品比较:

参考文章
SeaTunnel在oppo特征平台的集成实践

升级OLAP平台,SeaTunnel在唯品会的实践

二. 基本原理

两个启动脚本:

提交spark任务用 start-seatunnel-spark.sh
提交flink任务则用 start-seatunnel-flink.sh

SeaTunnel配置文件应包含四部分配置组件:
env{} → source{} → transform{} → sink{}

source,transform,sink这三部分可以看做一个pipeline。

在Source和Sink数据同构时,如果业务上也不需要对数据进行转换,那么transform中的内容可以为空,反之transform具体需根据业务情况来定。

env块:

env块中可以直接写spark或flink支持的配置项。比如引擎选择,并行度,检查点间隔时间,检查点hdfs路径等。在SeaTunnel源码的ConfigKeyName类中,声明了env块中所有可用的key。

source块:

用来声明数据源,可以声明多个连接器

source {

      hdfs{ ... }

      elasticsearch { .... }

      jdbc { .... }

}

transform块:

可能根据具体业务进一步处理数据,因此提供转换模块;当然也可以直接从Source到Sink。

可以同时声明多个转换插件:添加校验、转换、日期、删除、Grok、Json、KV、大/小写、删除、重命名、重分区、替换、样本、拆分、Sql、表、截断、Unid,自主开发的过滤器插件等

sink块:

定义如何以及在何处写入数据,将数据从一个位置同步到另一个位置

小结:

三.SeaTunnel本地环境搭建

基础:

安装:

#下载解压

sudo wget  "apache-seatunnel-incubating-2.1.3-bin.tar.gz" 

sudo tar -zxvf  apache-seatunnel-incubating-2.1.3-bin.tar.gz


#配置环境变量

sudo vim  /etc/profile

export SEATUNNEL_HOME=/usr/local/apache-seatunnel-incubating-2.1.3

export PATH=$PATH:${SEATUNNEL_HOME}/bin


#环境变量生效

source  /etc/profile

测试:

从Hive中抽数插入到CK中的配置,数据源是Hive的一张表,通过SeaTunnel插件根据id字段进行分片插入CK集群不同分片


#创建测试文件
vim  /usr/local/apache-seatunnel-incubating-2.1.3/config/hive-console.conf


#配置Spark参数
spark {  spark.sql.catalogImplementation = "hive"
  spark.app.name = "hive2clickhouse"
  spark.executor.instances = 30
  spark.executor.cores = 1 
  spark.executor.memory = "2g"
  spark.ui.port = 13000
}


input {
    hive {
        pre_sql = "select id,name,create_time from table"
        table_name = "table_tmp"
    }
}


filter {
    convert {
        source_field = "data_source"
        new_type = "UInt8"
    }

    org.interestinglab.waterdrop.filter.Slice {
        source_table_name = "table_tmp"
        source_field = "id"
        slice_num = 2
        slice_code = 0
        result_table_name = "table_8123"
    }
    org.interestinglab.waterdrop.filter.Slice {
        source_table_name = "table_tmp"
        source_field = "id"
        slice_num = 2
        slice_code = 1
        result_table_name = "table_8124"
    }
}


output {
    clickhouse {
        source_table_name="table_8123"
        host = "ip1:8123"
        database = "db_name"
        username="username"
        password="pwd"
        table = "model_score_local"
        fields = ["id","name","create_time"]
            clickhouse.socket_timeout = 50000
            retry_codes = [209, 210]
            retry = 3
            bulk_size = 500000
    }
    clickhouse {
        source_table_name="table_8124"
        host = "ip2:8123"
        database = "db_name"
        username="username"
        password="pwd"
        table = "model_score_local"
        fields = ["id","name","create_time"]
            clickhouse.socket_timeout = 50000
            retry_codes = [209, 210]
            retry = 3
            bulk_size = 500000
    }
}


#启动测试
/bin/start-waterdrop.sh --master local --deploy-mode client --config/hive-console.conf

四. 应用案例

Hive数据导入StarRocks

#创建文件任务
vim config/text01.conf

#配置内容
env {
   spark.app.name = "ads_product_sale_d"
   spark.executor.instances = 1
   spark.dynamicAllocation.maxExecutors = 20
   spark.executor.cores = 1
   spark.executor.memory = "8g"
   spark.sql.catalogImplementation = "hive"
   spark.sql.hive.verifyPartitionPath = "true"
  }

source  {
       hive {
        pre_sql = "select platform_id,cat_id from dws.dws_prd_product_detail_d where  day_id ='2022-12- 14‘ "
        result_table_name = "ads_product_sale_d"
       }
  }

transform {
   sql {
    sql = "select platform_id,cat_id from ads_product_sale_d",
    table_name = "ads_product_sale_d"
   } 

}

  sink {
      Doris {
          fenodes="10.4.102.46:8030"
          database="example_db"
          table="ads_product_sale_d"
          user="root"
          password=""
          batch_size=10000
          doris.column_separator="\t"
          doris.columns="platform_id,cat_id"
      }          
  }


#启动任务
bin/start-seatunnel-flink.sh --config config/text01.conf  -i age=18

五. 发展前景——企业级服务化之路

基于 SeaTunnel 实现了可视化的数据集成服务,我相信服务化必然,一定,100%是SeaTunnel未来的不可缺少的一部分。

核心目标:
脚本管控:让用户通过 WebUI,以参数的形式配置任务信息而非脚本的方式来表达自己的业务需求,这样无论是对于非专业人员还是开发人员都会节省很大精力
作业及实例管理:任务触发,查看日志记录,重跑,Kill等
整体架构设计:

**管控:**对数据源、用户、权限、脚本、作业、实例的管控,任何在 WebUi 上看到的内容都会被管控
管理能力:针对于数据源的增删改查以及连接性测试,数据源的映射、数据探查等的能力;此外,页面上每个能看见的页面、菜单、按钮、数据,资源管理、自定义 connector、trasnform 管理、项目空间等都应该纳入管控

开发能力:基本上就是针对的脚本的增删改查(保存、执行、停止、测试、发布、基本参数展示、调度参数配置、告警参数调整、脚本内容、数据源、transform、并发等)

运维能力:任务的不同时期所要求的内容也不一样,作业运维包括手动触发,暂停等;实例运维则是重跑,Kill,查看日志记录等

调度:
根据配置的不同,负责将任务丢至不同的调度系统中进行调度与执行;上层的作业和实例的管控也依赖于具体的调度系统;

crontabl-local:我们的 SeaTunnel 自成体系,自身就提供了简单的定时调度能力,用户只需要修改下配置,即可快速上手,完成定时数据集成任务的配置与发布

执行(task-wrapper):
1.考虑到仅靠 SeaTunnel 原生的能力是不够的,所以分为两部分: pre-task 和 post-task,与 SeaTunnel 的执行引擎进行组装,变成真正的执行内容

2.完整独立的执行脚本

SeaTunnel WebUi整体项目开发进度
参考文章:
SeaTunnel企业化服务之路

有关SeaTunnel安装及测试的更多相关文章

  1. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  2. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

  3. ruby - 完全离线安装RVM - 2

    我打算为ruby​​脚本创建一个安装程序,但我希望能够确保机器安装了RVM。有没有一种方法可以完全离线安装RVM并且不引人注目(通过不引人注目,就像创建一个可以做所有事情的脚本而不是要求用户向他们的bash_profile或bashrc添加一些东西)我不是要脚本本身,只是一个关于如何走这条路的快速指针(如果可能的话)。我们还研究了这个很有帮助的问题:RVM-isthereawayforsimpleofflineinstall?但有点误导,因为答案只向我们展示了如何离线在RVM中安装ruby。我们需要能够离线安装RVM本身,并查看脚本https://raw.github.com/wayn

  4. ruby - 使用 C 扩展开发 ruby​​gem 时,如何使用 Rspec 在本地进行测试? - 2

    我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当

  5. ruby-on-rails - rails 目前在重启后没有安装 - 2

    我有一个奇怪的问题:我在rvm上安装了ruby​​onrails。一切正常,我可以创建项目。但是在我输入“railsnew”时重新启动后,我有“程序'rails'当前未安装。”。SystemUbuntu12.04ruby-v"1.9.3p194"gemlistactionmailer(3.2.5)actionpack(3.2.5)activemodel(3.2.5)activerecord(3.2.5)activeresource(3.2.5)activesupport(3.2.5)arel(3.0.2)builder(3.0.0)bundler(1.1.4)coffee-rails(

  6. ruby - 如何为 emacs 安装 ruby​​-mode - 2

    我刚刚为fedora安装了emacs。我想用emacs编写ruby。为ruby​​提供代码提示、代码完成类型功能所需的工具、扩展是什么? 最佳答案 ruby-mode已经包含在Emacs23之后的版本中。不过,它也可以通过ELPA获得。您可能感兴趣的其他一些事情是集成RVM、feature-mode(Cucumber)、rspec-mode、ruby-electric、inf-ruby、rinari(用于Rails)等。这是我当前用于Ruby开发的Emacs配置:https://github.com/citizen428/emacs

  7. ruby - Ruby 的 Hash 在比较键时使用哪种相等性测试? - 2

    我有一个围绕一些对象的包装类,我想将这些对象用作散列中的键。包装对象和解包装对象应映射到相同的键。一个简单的例子是这样的:classAattr_reader:xdefinitialize(inner)@inner=innerenddefx;@inner.x;enddef==(other)@inner.x==other.xendenda=A.new(o)#oisjustanyobjectthatallowso.xb=A.new(o)h={a=>5}ph[a]#5ph[b]#nil,shouldbe5ph[o]#nil,shouldbe5我试过==、===、eq?并散列所有无济于事。

  8. ruby - RSpec - 使用测试替身作为 block 参数 - 2

    我有一些Ruby代码,如下所示:Something.createdo|x|x.foo=barend我想编写一个测试,它使用double代替block参数x,这样我就可以调用:x_double.should_receive(:foo).with("whatever").这可能吗? 最佳答案 specify'something'dox=doublex.should_receive(:foo=).with("whatever")Something.should_receive(:create).and_yield(x)#callthere

  9. ruby-on-rails - 无法在centos上安装therubyracer(V8和GCC出错) - 2

    我正在尝试在我的centos服务器上安装therubyracer,但遇到了麻烦。$geminstalltherubyracerBuildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingtherubyracer:ERROR:Failedtobuildgemnativeextension./usr/local/rvm/rubies/ruby-1.9.3-p125/bin/rubyextconf.rbcheckingformain()in-lpthread...yescheckingforv8.h...no***e

  10. ruby - 通过 RVM (OSX Mountain Lion) 安装 Ruby 2.0.0-p247 时遇到问题 - 2

    我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search

随机推荐