java FlinkKinesisConsumer not retrying on NoHttpResponseException?

(Apache Flink 1.8 on AWS EMR release label 5.28.x)

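Our data source is an AWS Kinesis stream (with 450 shards, in case that matters). We use FlinkKinesisConsumer to read from the stream. Occasionally (once every few days), our application crashes with a "The target server failed to respond" error. The full stack trace is at the bottom.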

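Looking further into the codebase, I found that ProvisionedThroughputExceededException is the only exception type that gets retried (see the code).
1. Why doesn't the Kinesis connector retry transient HTTP response exceptions like this one?
2. Is there a way to pass in a retry configuration so that these errors are retried?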

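As a side note, we have configured the following restart strategy:

    // Restart the job after a failure, waiting 300 seconds before each restart;
    // the job fails for good once more than 12 failures occur within 60 minutes.
    env.setRestartStrategy(RestartStrategies.failureRateRestart(12,
            org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),
            org.apache.flink.api.common.time.Time.of(300, TimeUnit.SECONDS)));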

Full stack trace of the exception:

    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2 answers

  1. # Answer 1

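    The restart strategy configured with env.setRestartStrategy() restarts the entire Flink job when a failure occurs. It has no effect on retries inside Flink's Kinesis connector.

    The Kinesis consumer has the following configuration settings (as of 1.11) for changing its retry behavior: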

        /** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
        public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
    
        /** The maximum number of getRecords attempts if we get a recoverable exception. */
        public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
    
        /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
        public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";
    
        /** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
        public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";
    
        /** The power constant for exponential backoff between each getRecords attempt. */
        public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";
    
        /** The interval between each getRecords request to a AWS Kinesis shard in milliseconds. */
        public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
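
    Below is a minimal sketch of how these keys might be set. It assumes the consumer reads them from the Properties object passed to its constructor. The keys are the string values listed above. The region key aws.region and every numeric value are illustrative assumptions, not tuned recommendations, and the backoff values are assumed to be in milliseconds. The helper backoffCeilingMillis is hypothetical. It computes min(max, base * expconst^attempt), which, to my understanding, is the upper bound from which KinesisProxy draws a random ("full jitter") sleep. These settings only apply to exceptions the proxy classifies as recoverable. On their own, they would not make a NoHttpResponseException retryable in 1.8.

    ```java
    import java.util.Properties;

    public class KinesisConsumerRetryConfig {

        // String keys copied from the ConsumerConfigConstants listing above.
        static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
        static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";
        static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";
        static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";

        /** Builds consumer properties; the numbers are illustrative, not recommendations. */
        static Properties retryProperties() {
            Properties props = new Properties();
            props.setProperty("aws.region", "us-east-1");                  // hypothetical region
            props.setProperty(SHARD_GETRECORDS_RETRIES, "10");              // retries per getRecords call
            props.setProperty(SHARD_GETRECORDS_BACKOFF_BASE, "500");        // assumed milliseconds
            props.setProperty(SHARD_GETRECORDS_BACKOFF_MAX, "5000");        // assumed milliseconds
            props.setProperty(SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5");
            return props;
        }

        /**
         * Hypothetical helper: upper bound of the backoff before a given retry attempt,
         * min(max, base * expConst^attempt). Assumption: the connector then sleeps for a
         * random duration in [0, bound).
         */
        static long backoffCeilingMillis(long base, long max, double expConst, int attempt) {
            return (long) Math.min(max, base * Math.pow(expConst, attempt));
        }

        public static void main(String[] args) {
            Properties props = retryProperties();
            long base = Long.parseLong(props.getProperty(SHARD_GETRECORDS_BACKOFF_BASE));
            long max = Long.parseLong(props.getProperty(SHARD_GETRECORDS_BACKOFF_MAX));
            double expConst = Double.parseDouble(props.getProperty(SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT));
            // Print how the backoff ceiling grows per attempt until it hits the cap.
            for (int attempt = 0; attempt < 8; attempt++) {
                System.out.println("attempt " + attempt + ": sleep up to "
                        + backoffCeilingMillis(base, max, expConst, attempt) + " ms");
            }
        }
    }
    ```

    The resulting Properties object would be passed as the third argument of new FlinkKinesisConsumer<>(streamName, deserializationSchema, props). Raising the base and max values spreads the retries of 450 shard consumers over a longer window.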
    
  2. # Answer 2

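    KinesisProxy does support retrying exceptions, and the settings in the previous answer control how it retries. However, not every exception is retried, and the default whitelist does not cover all of the transient problems that commonly occur with the Kinesis service. Over time, we customized the proxy by overriding isRecoverableSdkClientException to get a stable production setup: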

      @Override
      protected boolean isRecoverableSdkClientException(SdkClientException ex) {
        if (ex instanceof KMSThrottlingException) {
          // not handled in KinesisProxy in 1.5.x
          return true;
        } else if (ex instanceof AmazonServiceException) {
          return KinesisProxy.isRecoverableException((AmazonServiceException)ex);
        } else if (ex.getCause() instanceof SocketTimeoutException) {
          return true;
        } else if (ex.getCause() instanceof NoHttpResponseException) {
          return true;
        } else if (ex.getCause() instanceof ConnectTimeoutException) {
          return true;
        } else if (ex.getCause() instanceof java.net.UnknownHostException) {
          return true;
        } else if (ex.getCause() instanceof javax.net.ssl.SSLHandshakeException) {
          return true;
        }
        return false;
      }
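
    The override above inspects only ex.getCause(). One variation, which is my own and not part of the answer, walks the whole cause chain. It also matches Apache HttpClient exceptions by simple class name, so the check works whether or not the connector relocates (shades) HttpClient. The sketch below uses only the JDK. TransientFailureClassifier, isTransientNetworkFailure and the nested stand-in NoHttpResponseException are hypothetical names. It does not show how a customized proxy is wired into the consumer.

    ```java
    import java.net.SocketTimeoutException;
    import java.net.UnknownHostException;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import javax.net.ssl.SSLHandshakeException;

    public class TransientFailureClassifier {

        // Simple names of HttpClient exceptions, matched by name (assumption) so a
        // relocated/shaded copy of HttpClient is recognized as well.
        private static final Set<String> TRANSIENT_HTTP_CLIENT_NAMES = Collections.unmodifiableSet(
                new HashSet<>(Arrays.asList("NoHttpResponseException", "ConnectTimeoutException")));

        /** Returns true if any exception in the cause chain looks like a transient network failure. */
        static boolean isTransientNetworkFailure(Throwable ex) {
            Throwable current = ex;
            // Bounded walk down the cause chain, guarding against cyclic causes.
            for (int depth = 0; current != null && depth < 10; depth++) {
                if (current instanceof SocketTimeoutException
                        || current instanceof UnknownHostException
                        || current instanceof SSLHandshakeException
                        || TRANSIENT_HTTP_CLIENT_NAMES.contains(current.getClass().getSimpleName())) {
                    return true;
                }
                current = current.getCause();
            }
            return false;
        }

        // Hypothetical stand-in for org.apache.http.NoHttpResponseException so the demo runs without HttpClient.
        static class NoHttpResponseException extends java.io.IOException {
            NoHttpResponseException(String message) {
                super(message);
            }
        }

        public static void main(String[] args) {
            // RuntimeException stands in for the SDK's client exception wrapping the HTTP failure.
            Exception wrapped = new RuntimeException(new NoHttpResponseException("The target server failed to respond"));
            System.out.println(isTransientNetworkFailure(wrapped));                         // true
            System.out.println(isTransientNetworkFailure(new IllegalStateException("bug")));  // false
        }
    }
    ```

    Inside a KinesisProxy subclass, such a helper could be called from isRecoverableSdkClientException after the AmazonServiceException branch. That keeps the service-error classification as it is and only widens the set of retried client-side failures.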