chunjun是一款基于flink的开源数据同步工具,官方文档,其提供了很多flink官方未提供的插件供大家来使用,特别是达梦插件在国产化环境中很方便!
本次介绍的是chunjun中的一款http插件,通过该插件可以实现基于http请求的流处理,但是目前官方提供的http插件在以SQL模式运行的时候是有一些问题的,所以我花了些时间将问题排查修复下,并且添加了一个分页的新功能。下面是具体的过程。
按照官方文档使用http插件运行的时候,会报下面的错误
java.lang.RuntimeException: request data error,msg is prevResponse value is exception java.lang.RuntimeException: key data.id on {msg=请求成功, total=0, code=0000, data=[{name=第0臭桑, id=0}, {name=第1臭桑, id=1}], timestamp=2023-02-12 16:39:12} is not a json
at com.dtstack.chunjun.util.MapUtil.getValueByKey(MapUtil.java:161)
at com.dtstack.chunjun.connector.http.client.ResponseParse.buildResponseByKey(ResponseParse.java:63)
at com.dtstack.chunjun.connector.http.client.JsonResponseParse.next(JsonResponseParse.java:95)
at com.dtstack.chunjun.connector.http.client.HttpClient.doExecute(HttpClient.java:272)
at com.dtstack.chunjun.connector.http.client.HttpClient.execute(HttpClient.java:184)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
at com.dtstack.chunjun.connector.http.inputformat.HttpInputFormat.nextRecordInternal(HttpInputFormat.java:118)
at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:198)
at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:68)
at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:133)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
HttpOptions 添加两个配置// 第一个是配置是数据主体,一般http请求都是标准的统一返回值,有状态码 状态信息 数据主体,我们需要的数据都在数据主体里面的
public static final ConfigOption<String> DATA_SUBJECT =
ConfigOptions.key("dataSubject")
.stringType()
.defaultValue("${data}")
.withDescription("response data subject");
// 这个配置是发送http请求的周期,如果设置2的话 就会重复请求两次的 如果是-1则会一直重复请求
public static final ConfigOption<Long> CYCLES =
ConfigOptions.key("cycles")
.longType()
.defaultValue(1L)
.withDescription("request cycle");
HttpDynamicTableFactory @Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(HttpOptions.DECODE);
options.add(HttpOptions.METHOD);
options.add(HttpOptions.HEADER);
options.add(HttpOptions.BODY);
options.add(HttpOptions.PARAMS);
options.add(HttpOptions.INTERVALTIME);
options.add(HttpOptions.COLUMN);
options.add(HttpOptions.DELAY);
// 下面这俩是对应了with参数
options.add(HttpOptions.DATA_SUBJECT);
options.add(HttpOptions.CYCLES);
return options;
}
private HttpRestConfig getRestapiConf(ReadableConfig config) {
Gson gson = GsonUtil.setTypeAdapter(new Gson());
HttpRestConfig httpRestConfig = new HttpRestConfig();
httpRestConfig.setIntervalTime(config.get(HttpOptions.INTERVALTIME));
httpRestConfig.setUrl(config.get(HttpOptions.URL));
httpRestConfig.setDecode(config.get(HttpOptions.DECODE));
httpRestConfig.setRequestMode(config.get(HttpOptions.METHOD));
// 将上面配置的参数信息封装到http请求配置里面
httpRestConfig.setDataSubject(config.get(HttpOptions.DATA_SUBJECT));
httpRestConfig.setCycles(config.get(HttpOptions.CYCLES));
httpRestConfig.setParam(
gson.fromJson(
config.get(HttpOptions.PARAMS),
new TypeToken<List<MetaParam>>() {}.getType()));
httpRestConfig.setHeader(
gson.fromJson(
config.get(HttpOptions.HEADER),
new TypeToken<List<MetaParam>>() {}.getType()));
httpRestConfig.setBody(
gson.fromJson(
config.get(HttpOptions.BODY),
new TypeToken<List<MetaParam>>() {}.getType()));
httpRestConfig.setColumn(
gson.fromJson(
config.get(HttpOptions.COLUMN),
new TypeToken<List<FieldConf>>() {}.getType()));
return httpRestConfig;
}
HttpRowConverter// 修改类的泛型 原来是 String 现在需要修改成Map<String,Object>
public class HttpRowConverter
extends AbstractRowConverter<Map<String, Object>, RowData, RowData, LogicalType>
// 上面修改了泛型后 这里重写的方法参数类型也会是map类型,在别的地方调用这个方法的时候,传递的就是map类型数据
// 但是源码里面用String接收的,这样会导致调用方法的时候就出错,而且单步调试的时候就是进不到这个方法的,只能进入到类上
// 前面传递过来的就是map类型数据了,源码里面,这个方法里的前两行是将字符串转成map的,那也就是说这两行代码不需要了,删除即可
@Override
public RowData toInternal(Map<String, Object> result) throws Exception {
GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
List<String> columns = rowType.getFieldNames();
for (int pos = 0; pos < columns.size(); pos++) {
Object value =
MapUtil.getValueByKey(
result, columns.get(pos), httpRestConfig.getFieldDelimiter());
if (value instanceof LinkedTreeMap) {
value = value.toString();
}
genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(value));
}
return genericRowData;
}
经过上面的修改之后,可以在with参数里面指定数据主体和请求周期,直接在localTest类运行即可成功!下面是示例的sql
CREATE TABLE source
(
id int,
name varchar
) WITH (
'connector' = 'http-x'
,'url' = 'http://127.0.0.1:8090/test/test'
,'intervalTime' = '3000'
,'method' = 'get'
,'cycles' = '5',
,'dataSubject' = '${data}'
,'decode' = 'json'
,'paging' = 'true'
,'pagingParam' = 'pageNumber'
,'params' = '[{"key": "pageNumber","value":1,"type":"int"},{"key": "pageSize","value":100,"type":"int"}]'
,'column' = '[
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "String"
}
]'
);
CREATE TABLE sink
(
id int,
name varchar
) WITH (
'connector' = 'stream-x'
);
insert into sink
select *
from source u;
目前在上面的基础上,我又加了分页查询的功能,后面有时间会编辑此博客加上分页的源码修改
转载请注明来处
在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/
我对最新版本的Rails有疑问。我创建了一个新应用程序(railsnewMyProject),但我没有脚本/生成,只有脚本/rails,当我输入ruby./script/railsgeneratepluginmy_plugin"Couldnotfindgeneratorplugin.".你知道如何生成插件模板吗?没有这个命令可以创建插件吗?PS:我正在使用Rails3.2.1和ruby1.8.7[universal-darwin11.0] 最佳答案 随着Rails3.2.0的发布,插件生成器已经被移除。查看变更日志here.现在
是的,我知道最好使用webmock,但我想知道如何在RSpec中模拟此方法:defmethod_to_testurl=URI.parseurireq=Net::HTTP::Post.newurl.pathres=Net::HTTP.start(url.host,url.port)do|http|http.requestreq,foo:1endresend这是RSpec:let(:uri){'http://example.com'}specify'HTTPcall'dohttp=mock:httpNet::HTTP.stub!(:start).and_yieldhttphttp.shou
我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur
1.错误信息:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:requestcanceledwhilewaitingforconnection(Client.Timeoutexceededwhileawaitingheaders)或者:Errorresponsefromdaemon:Gethttps://registry-1.docker.io/v2/:net/http:TLShandshaketimeout2.报错原因:docker使用的镜像网址默认为国外,下载容易超时,需要修改成国内镜像地址(首先阿里
Rails中有没有一种方法可以提取与路由关联的HTTP动词?例如,给定这样的路线:将“users”匹配到:“users#show”,通过:[:get,:post]我能实现这样的目标吗?users_path.respond_to?(:get)(显然#respond_to不是正确的方法)我最接近的是通过执行以下操作,但它似乎并不令人满意。Rails.application.routes.routes.named_routes["users"].constraints[:request_method]#=>/^GET$/对于上下文,我有一个设置cookie然后执行redirect_to:ba
您认为可以作为插件很好地存在于您的Rails应用程序中必须实现的哪些行为?您过去曾搜索过哪些插件功能但找不到?哪些现有的Rails插件可以改进或扩展,如何改进或扩展? 最佳答案 我希望在管理界面中看到一个引擎插件,它提供了应用程序中所有模型的仪表板摘要,以及可配置的事件图表。 关于ruby-on-rails-您希望看到哪些Rails插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questio
我正在使用Heroku(heroku.com)来部署我的Rails应用程序,并且正在构建一个iPhone客户端来与之交互。我的目的是将手机的唯一设备标识符作为HTTPheader传递给应用程序以进行身份验证。当我在本地测试时,我的header通过得很好,但在Heroku上它似乎去掉了我的自定义header。我用ruby脚本验证:url=URI.parse('http://#{myapp}.heroku.com/')#url=URI.parse('http://localhost:3000/')req=Net::HTTP::Post.new(url.path)#boguspara
我试图在我的网站上实现使用Facebook登录功能,但在尝试从Facebook取回访问token时遇到障碍。这是我的代码:ifparams[:error_reason]=="user_denied"thenflash[:error]="TologinwithFacebook,youmustclick'Allow'toletthesiteaccessyourinformation"redirect_to:loginelsifparams[:code]thentoken_uri=URI.parse("https://graph.facebook.com/oauth/access_token
我是Ruby的新手。我试过查看在线文档,但没有找到任何有效的方法。我想在以下HTTP请求botget_response()和get()中包含一个用户代理。有人可以指出我正确的方向吗?#PreliminarycheckthatProggitisupcheck=Net::HTTP.get_response(URI.parse(proggit_url))ifcheck.code!="200"puts"ErrorcontactingProggit"returnend#Attempttogetthejsonresponse=Net::HTTP.get(URI.parse(proggit_url)