当前位置:   article > 正文

【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程_seatunnel web database type

seatunnel web database type

Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章

file

本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前版本不兼容,所以需要对 SeaTunnel-Web的源码进行修改适配。

源码修改编译

克隆SeaYunnel-Web源码到本地

  git  clone https://github.com/apache/seatunnel-web.git

    在idea中打开项目

    升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖

      <seatunnel-framework.version>2.3.3</seatunnel-framework.version>
      改为
      <seatunnel-framework.version>2.3.4</seatunnel-framework.version>
    • 1
    • 2

    因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题,所以本篇文章重点来了:我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改,修改完之后,我们就能完全适配2.3.4最新版本。

    社区推出了2.3.X及Web系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。

    org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType

    public static class SeaTunnelDataTypeConvertor
            implements DataTypeConvertor<SeaTunnelDataType<?>> {
    
        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {
            return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();
        }
    
        @Override
        public SeaTunnelDataType<?> toSeaTunnelType(
                SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
                throws DataTypeConvertException {
            return seaTunnelDataType;
        }
    
        @Override
        public SeaTunnelDataType<?> toConnectorType(
                SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
                throws DataTypeConvertException {
            return seaTunnelDataType;
        }
    
        @Override
        public String getIdentity() {
            return "EngineDataTypeConvertor";
        }
    }
    // 改为
    public static class SeaTunnelDataTypeConvertor
                implements DataTypeConvertor<SeaTunnelDataType<?>> {
    
            @Override
            public SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {
                return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();
            }
    
            @Override
            public SeaTunnelDataType<?> toSeaTunnelType(
                    String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
                return seaTunnelDataType;
            }
    
            @Override
            public SeaTunnelDataType<?> toConnectorType(
                    String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
                return seaTunnelDataType;
            }
    
            @Override
            public String getIdentity() {
                return "EngineDataTypeConvertor";
            }
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

    public TableSchemaServiceImpl() throws IOException {
        Common.setStarter(true);
        Set<PluginIdentifier> pluginIdentifiers =
                SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
        ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
        pluginIdentifiersList.addAll(pluginIdentifiers);
        List<URL> pluginJarPaths =
                new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
        //        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
        if (!pluginJarPaths.isEmpty()) {
            //            List<URL> files = FileUtils.searchJarFiles(path);
            pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
            factory =
                    new DataTypeConvertorFactory(
                            new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
        } else {
            factory = new DataTypeConvertorFactory();
        }
    }
    // 改为
        public TableSchemaServiceImpl() throws IOException {
            Common.setStarter(true);
            Set<PluginIdentifier> pluginIdentifiers =
                    SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
            ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
            pluginIdentifiersList.addAll(pluginIdentifiers);
            List<URL> pluginJarPaths =
                    new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
            //        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
            if (!pluginJarPaths.isEmpty()) {
                //            List<URL> files = FileUtils.searchJarFiles(path);
                pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
                factory =
                        new DataTypeConvertorFactory(
                                new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
            } else {
                factory = new DataTypeConvertorFactory();
            }
        }
    
    SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
    // 改为
    SeaTunnelDataType<?> dataType =
                        convertor.toSeaTunnelType(field.getName(), field.getType());
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

     public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
            Common.setDeployMode(DeployMode.CLIENT);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName(jobInstanceId + "_job");
            try {
                SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
                SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
                ClientJobExecutionEnvironment jobExecutionEnv =
                        seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
                    final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
                JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
                jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
                jobInstanceDao.update(jobInstance);
    
                CompletableFuture.runAsync(
                        () -> {
                            waitJobFinish(
                                    clientJobProxy,
                                    userId,
                                    jobInstanceId,
                                    Long.toString(clientJobProxy.getJobId()),
                                    seaTunnelClient);
                        });
    
            } catch (ExecutionException | InterruptedException e) {
                ExceptionUtils.getMessage(e);
                throw new RuntimeException(e);
            }
            return jobInstanceId;
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

    else if (statusList.contains("CANCELLING")) {
                jobStatus = JobStatus.CANCELLING.name();
    // 改为
    else if (statusList.contains("CANCELING")) {
                jobStatus = JobStatus.CANCELING.name();
    • 1
    • 2
    • 3
    • 4

    org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

    TableFactoryContext context =
            new TableFactoryContext(
                    Collections.singletonList(table),
                    ReadonlyConfig.fromMap(config),
                    Thread.currentThread().getContextClassLoader());
    // 改为
    TableTransformFactoryContext context =
                    new TableTransformFactoryContext(
                            Collections.singletonList(table),
                            ReadonlyConfig.fromMap(config),
                            Thread.currentThread().getContextClassLoader());
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

    public void restoreJob(
                @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
            SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName(jobInstanceId + "_job");
            try {
                seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
    }
    // 改为
    public void restoreJob(
            @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
            SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName(jobInstanceId + "_job");
            SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
            try {
                seaTunnelClient
                    .restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
                    .execute();
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

    public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
            PluginType pluginType) throws IOException {
        Common.setStarter(true);
        if (!pluginType.equals(PluginType.SOURCE)) {
            throw new UnsupportedOperationException("ONLY support plugin type source");
        }
        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
        List<Factory> factories;
        if (path.toFile().exists()) {
            List<URL> files = FileUtils.searchJarFiles(path);
            factories =
                    FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
        } else {
            factories =
                    FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
        }
        Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
        factories.forEach(
                plugin -> {
                    if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                        TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                        PluginIdentifier info =
                                PluginIdentifier.of(
                                        "seatunnel",
                                        PluginType.SOURCE.getType(),
                                        plugin.factoryIdentifier());
                        featureMap.put(
                                info,
                                new ConnectorFeature(
                                        SupportColumnProjection.class.isAssignableFrom(
                                                tableSourceFactory.getSourceClass())));
                    }
                });
        return featureMap;
    }
    // 改为
    
        public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
                PluginType pluginType) {
            Common.setStarter(true);
            if (!pluginType.equals(PluginType.SOURCE)) {
                throw new UnsupportedOperationException("ONLY support plugin type source");
            }
    
            ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();
            pluginIdentifiers.addAll(
                    SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());
            List<URL> pluginJarPaths =
                    new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);
    
            List<Factory> factories;
            if (!pluginJarPaths.isEmpty()) {
                factories =
                        FactoryUtil.discoverFactories(
                                new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
            } else {
                factories =
                        FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
            }
            Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
            factories.forEach(
                    plugin -> {
                        if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                            TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                            PluginIdentifier info =
                                    PluginIdentifier.of(
                                            "seatunnel",
                                            PluginType.SOURCE.getType(),
                                            plugin.factoryIdentifier());
                            featureMap.put(
                                    info,
                                    new ConnectorFeature(
                                            SupportColumnProjection.class.isAssignableFrom(
                                                    tableSourceFactory.getSourceClass())));
                        }
                    });
            return featureMap;
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    代码格式化

    mvn spotless:apply

      编译打包

      mvn clean package -DskipTests

        至此,seatunnel web 适配 seatunnel2.3.4版本完成,对应的安装包会在 seatunnel-web-dist/target目录下生成

        Linux部署测试

        这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南

        重要的配置项

        1、seatunnel-web数据库相关配置(application.yml) 
        用来web服务中的数据持久化
        
        2、SEATUNNEL_HOME(环境变量)
        seatunnel-web调用seaunnel的插件获取的API,扫描connector相关的连接器
        
        3、ST_WEB_HOME(环境变量)
        seatunnel-web会加载seatunnel-web/datasource下的插件包,这里决定了seatunnel-web支持哪些数据源的任务定义
        
        4、重要的配置文件:
        connector-datasource-mapper.yaml 
        该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持cdc等)
        hazelcast-client.yaml 
        seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互,该配置文件配置了集群节点等相关信息
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13

        感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!

        本文由 白鲸开源科技 提供发布支持!

        声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/530127
        推荐阅读
        相关标签
          

        闽ICP备14008679号