java使用ApacheCamel同步FTP远程和FS(或HDFS)
我的要求如下
有时,我想手动运行一个工具,将FTP remote与我的FS(至少最终是-HDFS)remote同步。 应下载每个文件,并附加较新的文件
我使用了Apache Camel FTP2示例: https://github.com/apache/camel/tree/master/examples/camel-example-ftp 启动这个项目。在其他来源的帮助下,这是一个简单的解决方案
object Main{
def main(args: Array[String]): Unit = {
val context = new DefaultCamelContext
context.addRoutes(FtpRoute())
context.start
Thread.sleep(100000)
context.stop
}
}
case class FtpRoute() extends RouteBuilder {
def configure(): Unit = { // configure properties component
val pc = getContext.getComponent("properties", classOf[PropertiesComponent])
pc.setLocation("classpath:ftp.properties")
val ftpSource = getContext.resolvePropertyPlaceholders(
s"ftp://{{ftp.serv}}{{ftp.path}}?username={{ftp.user}}&password={{ftp.pass}}&passiveMode=true")
from(ftpSource)
.to("file:/tmp/ftp")
.log("Downloaded file ${file:name} complete.")
}
}
我可以说这很有效,但是。。。绝对不是我想要的那样
- 如何跟踪文件进度,以及(可能)在下载所有文件时停止
Camel
上下文?我必须分开处理吗李> - 来自
FTP
的文件正在被反复下载。不停。他为什么一直在重新下载文件李> - 正在创建大量FTP连接
disconnect=true
在这里没有帮助。你有这方面的经验吗李>
非常感谢
# 1 楼答案
ftp组件正在轮询消费者
这样的组件每隔延迟秒轮询一个URI
每次轮询,组件都会从头到尾获取所有新文件。 这就是为什么你会看到不断的文件下载。下载所有文件后,组件将停止并等待计时器或时间表发出的新事件
我的ftp选项:
和幂等消费者:
数据源:
更新
根据Camel文件:
选项:幂等元=true
选项使用幂等消费者EIP模式,让Camel跳过已处理的文件。默认情况下,将使用基于内存的LRUCache,该LRUCache可容纳1000个条目。如果noop=true,则还将启用幂等项,以避免反复使用相同的文件
选项:幂等存储库
一个可插入的存储库组织。阿帕奇。骆驼spi。幂等存储库,默认情况下,如果未指定且幂等为真,则使用MemorymessagedRepository
因此,默认情况下,ftp使用内存解决方案