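I have a Python Dataflow pipeline that receives notifications from Pub/Sub, reads files from a storage bucket, transforms them, and uploads them to BigQuery.
I had to run a backfill, which greatly increased the volume going through the pipeline and, as expected, increased the number of workers handling it. Afterwards, once the volume had dropped low enough for a single worker to handle, the pipeline did not scale back down automatically. I found that I was getting many errors about long-running steps, like this: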
Error message from worker: Operation ongoing in step s03 for at least 04h30m00s without
outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) at
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(
RegisterAndProcessBundleOperation.java:332) at
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) at
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125) at
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350) at
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152) at
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at
java.lang.Thread.run(Thread.java:748)
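Is there a way to stop or abort these long-running tasks? For example, can I set a time limit within which a step must complete?
I believe this is what is preventing my pipeline from scaling down, so keeping the pipeline always on, as I would like to, is costing me a lot.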
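
To make the kind of time limit I have in mind concrete, here is a minimal sketch in plain Python. It is not a Dataflow or Beam feature: `run_with_timeout` and `StepTimeout` are hypothetical names I made up, and the idea is only that the body of a step (for example the work done inside a DoFn's `process` method) could be wrapped this way. Note the limitation: the stuck call is abandoned on a daemon thread rather than actually killed, and how the runner treats the resulting exception is outside this sketch.

```python
import threading


class StepTimeout(Exception):
    """Hypothetical error raised when a wrapped call exceeds its time limit."""


def run_with_timeout(fn, timeout_s, *args, **kwargs):
    """Run fn(*args, **kwargs) and raise StepTimeout if it takes too long.

    Assumption: abandoning the call is acceptable. The worker thread is a
    daemon thread that keeps running in the background after a timeout;
    Python has no safe way to force-stop it.
    """
    outcome = {}

    def target():
        # Capture either the return value or the exception for the caller.
        try:
            outcome["value"] = fn(*args, **kwargs)
        except BaseException as exc:
            outcome["error"] = exc

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout_s)  # wait at most timeout_s seconds

    if worker.is_alive():
        raise StepTimeout(f"call did not finish within {timeout_s} seconds")
    if "error" in outcome:
        raise outcome["error"]  # re-raise the original failure
    return outcome["value"]
```

With something like this, a call such as `run_with_timeout(transform_file, 600, path)` (where `transform_file` and `path` are placeholders for my own step logic) would raise after ten minutes instead of blocking for hours.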