有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

带Quartz调度器的java Camel SFTP:防止并发执行,如果忙则拒绝任务

  • 骆驼3.11
  • 弹簧靴2.5
  • 爪哇11

我有一个从SFTP服务器获取文件的动态驼峰路由。每个SFTP用户有一条路由。路由由cron作业安排

sftp://localhost:55443/reports?
binary=true
&bridgeErrorHandler=true
&connectTimeout=10000
&delete=true
&disconnect=true
&fastExistsCheck=true
&hash=200f9d2c
&jschLoggingLevel=WARN
&maximumReconnectAttempts=0
&passiveMode=true
&privateKey=[PRIVATEKEY]
&readLockLoggingLevel=WARN
&runLoggingLevel=WARN
&scheduler=quartz
&scheduler.cron=*+*+*+%3F+*+*+*
&scheduler.stateful=true
&soTimeout=10000
&sortBy=file%3Amodified
&throwExceptionOnConnectFailed=true
&timeout=10000
&username=sftpuser

(在这里,为了测试目的,我增加了cron速率,以导致错误。)

检索到的文件将被处理、过滤、充实并发送到消息队列

有时,这些文件的处理时间比cron间隔长。我假设骆驼应该有某种队列来防止并发的SFTP运行,但这不起作用。似乎队列允许多个作业为同一任务排队,并且如果SFTP服务器的状态发生变化,则不会清除队列

当作业重叠时,会发生异常。有时会多次加载文件。有时会出现一个错误,说明文件无法删除。最终结果是消息队列中的重复数据

错误

org.apache.camel.component.file.GenericFileOperationFailedException: Cannot change directory
...
Caused by: java.io.IOException: Pipe closed
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot delete file
...
Caused by: com.jcraft.jsch.SftpException: No such file

我想要的

  • 每个用户创建1条SFTP路由(完成)
  • 对于每个SFTP路线,
    • 每次至少运行一次(完成)
    • 一次只允许一个实例(?)
    • 根本不排队执行(?)

尝试的修复

SFTP路由属性

设置/启用这些属性时没有更改:

  • useFixedDelay
  • idempotent
  • readLock
  • fastExistsCheck
ScheduledExecutorService

如果我为所有SFTP路由创建一个单线程执行器服务,它就没有帮助了

private final ScheduledExecutorService sftpExecutor = Executors.newSingleThreadScheduledExecutor();
...
<create route>
.scheduledExecutorService(sftpExecutor)

弹簧靴

这里似乎没有任何相关的配置——尽管使用如此简洁的文档很难判断

schedulerProperties

我想也许schedulerProperties会有帮助,但我设置的任何东西都不起作用。没有这方面的文档或示例。对于为什么用它设置任何东西都不起作用

.schedulerProperties("stateful", true)
.schedulerProperties("scheduler.stateful", true
.schedulerProperties("threadCount", "1")

结果:Failed to create Consumer for endpoint: Reason: There are 1 scheduler parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{stateful=true}]


旁白(抱怨和抱怨):Camel的文档通常都很糟糕——它非常简洁、模糊,并且依赖于Camel组件的模糊网络。该网站的配置表中有非常窄的行,这些行强制包装了所有配置属性,因此很难阅读。

我找不到任何人可以告诉我找到它有多困难,所以我想发泄一下。


共 (0) 个答案