Java: FlinkKinesisConsumer does not retry on NoHttpResponseException?
(Apache Flink 1.8 on AWS EMR release label 5.28.x)
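Our data source is an AWS Kinesis stream (with 450 shards, if that matters). We use FlinkKinesisConsumer to read the Kinesis stream. Our application occasionally (once every few days) crashes with a "Target server failed to respond" error. The full stack trace is at the bottom.
Looking further into the codebase, I found that ProvisionedThroughputExceededException is the only exception type that gets retried.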
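1. Why doesn't the Kinesis connector retry on transient HTTP response exceptions?
2. Is there a way to pass in a retry configuration so that these errors are retried?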
As a side note, we have set the following restart strategy:
env.setRestartStrategy(RestartStrategies.failureRateRestart(12,
org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),
org.apache.flink.api.common.time.Time.of(300, TimeUnit.SECONDS)));
Full stack trace of the exception:
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
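# Answer 1
The restart strategy configured with
env.setRestartStrategy()
restarts the entire Flink job when a failure occurs. It has no effect on the Kinesis connector inside Flink. The Kinesis consumer has its own configuration settings (as of 1.11) for changing the retry behavior.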
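As a rough illustration of what such consumer-level settings can look like, here is a minimal sketch that builds the Properties object passed to the consumer. The key strings are, to my understanding, the values behind the getRecords retry and backoff constants in ConsumerConfigConstants; verify them against your Flink version. The class name, the region, and all numeric values are placeholders chosen for this example.

```java
import java.util.Properties;

public class KinesisRetryConfigSketch {

    /** Builds consumer properties with more generous getRecords retry settings. */
    public static Properties consumerConfig() {
        Properties props = new Properties();
        // Placeholder region; use the region of your stream.
        props.setProperty("aws.region", "us-east-1");
        // Assumed key names (check ConsumerConfigConstants in your version).
        // Maximum number of getRecords attempts before the consumer gives up.
        props.setProperty("flink.shard.getrecords.maxretries", "10");
        // Base and maximum backoff between attempts, in milliseconds.
        props.setProperty("flink.shard.getrecords.backoff.base", "500");
        props.setProperty("flink.shard.getrecords.backoff.max", "5000");
        // Multiplier used for the exponential backoff.
        props.setProperty("flink.shard.getrecords.backoff.expconst", "1.5");
        return props;
    }
}
```

The resulting object would be handed to the FlinkKinesisConsumer constructor together with the stream name and the deserialization schema. Note that these settings only govern exceptions the connector already treats as recoverable, so on their own they may not help with the error in the question.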
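# Answer 2
KinesisProxy does support retrying on exceptions, and the retry behavior can be controlled through the settings mentioned in the previous answer. However, not every exception is retried, and the default whitelist does not cover all of the transient problems that commonly occur with the Kinesis service. Over time, we customized the proxy as follows to get a stable production setup: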
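The answer's actual customization is not reproduced here. What follows is only a self-contained sketch of the general idea, not the connector's code: widen the "recoverable" check so that it walks the cause chain and also accepts transient HTTP failures, then retry with exponential backoff. All names in it (TransientRetrySketch, isRecoverable, callWithRetry, and the stand-in NoHttpResponseException) are hypothetical. In a real job the equivalent check would presumably go into a KinesisProxy subclass; I believe the proxy exposes an overridable method for deciding whether a client exception is recoverable, but confirm that against the source of your Flink version.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

public class TransientRetrySketch {

    /** Hypothetical stand-in for the HTTP client's NoHttpResponseException. */
    public static class NoHttpResponseException extends IOException {
        public NoHttpResponseException(String message) {
            super(message);
        }
    }

    /** Returns true if any exception in the cause chain looks transient. */
    public static boolean isRecoverable(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            // Matching by simple name also catches a shaded/relocated class.
            if (t instanceof SocketTimeoutException
                    || t.getClass().getSimpleName().equals("NoHttpResponseException")) {
                return true;
            }
        }
        return false;
    }

    /** Runs the call, retrying recoverable failures with exponential backoff. */
    public static <T> T callWithRetry(Callable<T> call, int maxRetries, long baseBackoffMillis)
            throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return call.call();
            } catch (Exception e) {
                // Give up on non-transient errors or when retries are exhausted.
                if (attempt >= maxRetries || !isRecoverable(e)) {
                    throw e;
                }
                Thread.sleep(baseBackoffMillis << attempt); // doubles each attempt
                attempt++;
            }
        }
    }
}
```

Walking the whole cause chain matters because the SDK usually wraps the low-level HTTP exception in its own client exception, so checking only the outermost type would miss it.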