How to stop long-running steps in Google Dataflow

Posted 2024-10-05 10:02:01

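I have a Python Dataflow pipeline that receives notifications from Pub/Sub, reads files from a storage bucket, transforms them, and then uploads them to BigQuery.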
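A minimal sketch of the first stage described above, assuming the bucket publishes standard Cloud Storage Pub/Sub notifications (with `eventType`, `bucketId` and `objectId` attributes and a `JSON_API_V1` payload) and that the pipeline reads messages together with their attributes (for example via `ReadFromPubSub(with_attributes=True)`). The function name `gcs_path_from_notification` is hypothetical; it turns one notification into the `gs://` path the next step would open, and skips events other than `OBJECT_FINALIZE`.

```python
import json
from typing import Mapping, Optional


def gcs_path_from_notification(data: bytes, attributes: Mapping[str, str]) -> Optional[str]:
    """Return gs://bucket/object for a newly finalized object, else None.

    Hypothetical helper; assumes Cloud Storage Pub/Sub notification format.
    """
    if attributes.get("eventType") != "OBJECT_FINALIZE":
        # Ignore deletes, metadata updates, archive events, etc.
        return None
    bucket = attributes.get("bucketId")
    name = attributes.get("objectId")
    if not bucket or not name:
        # Fall back to the JSON_API_V1 payload, which is the object resource.
        payload = json.loads(data.decode("utf-8"))
        bucket = payload["bucket"]
        name = payload["name"]
    return f"gs://{bucket}/{name}"
```

In a Beam pipeline this would typically sit in a `Map` or `FlatMap` right after the Pub/Sub read, with `None` results filtered out before the file-reading step.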

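I had to run a backfill, which greatly increased the volume flowing through the pipeline and, as expected, increased the number of workers handling it. Afterwards, once the volume dropped low enough for a single worker to handle, the pipeline did not scale back down automatically. I found that I was getting many errors about long-running steps, like the following: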

Error message from worker: Operation ongoing in step s03 for at least 04h30m00s without outputting or completing in state finish
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
    at org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:332)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
    at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Is there a way to stop or abort these long-running tasks, or to set a time limit within which a step must complete?

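I believe this is exactly what is preventing my pipeline from scaling down, so keeping the pipeline running continuously, as I intend to, is costing me a lot.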
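One possible workaround, sketched here under the assumption that the slow part is a loop you control inside your own transform (for example, iterating over the lines of one file), is a cooperative time budget. The names `process_with_deadline` and `StepTimeout` are hypothetical, not part of Beam or Dataflow. The budget is checked between records, so it cannot interrupt a single call that blocks indefinitely (such as a hung network read); the Java frames in the trace only show the worker waiting in `CompletableFuture.get` for the bundle to finish, not which Python call is slow.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class StepTimeout(Exception):
    """Hypothetical error raised when one input exceeds its time budget."""


def process_with_deadline(
    records: Iterable[T],
    transform: Callable[[T], R],
    max_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[R]:
    """Yield transform(record) for each record, stopping once max_seconds elapse.

    The deadline is checked before each record, so a single transform call
    that never returns is NOT interrupted; this only bounds loops we control.
    """
    deadline = clock() + max_seconds
    for record in records:
        if clock() > deadline:
            raise StepTimeout(f"time budget of {max_seconds}s exceeded")
        yield transform(record)
```

Inside a `DoFn`, one could wrap the per-file loop with this generator and catch `StepTimeout` to log the file or route it to a separate output, instead of letting the bundle run for hours. The injectable `clock` parameter exists only to make the function easy to test.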

