有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java使用ApacheCamel同步FTP远程和FS(或HDFS)

我的要求如下

有时,我想手动运行一个工具,将FTP remote与我的FS(至少最终是-HDFS)remote同步。 应下载每个文件,并附加较新的文件

我使用了Apache Camel FTP2示例: https://github.com/apache/camel/tree/master/examples/camel-example-ftp 启动这个项目。在其他来源的帮助下,这是一个简单的解决方案

object Main{

  def main(args: Array[String]): Unit = {
    val context = new DefaultCamelContext
    context.addRoutes(FtpRoute())

    context.start
    Thread.sleep(100000)
    context.stop
  }    
}

case class FtpRoute() extends RouteBuilder {

  def configure(): Unit = { // configure properties component
    val pc = getContext.getComponent("properties", classOf[PropertiesComponent])
    pc.setLocation("classpath:ftp.properties")

    val ftpSource = getContext.resolvePropertyPlaceholders(
      s"ftp://{{ftp.serv}}{{ftp.path}}?username={{ftp.user}}&password={{ftp.pass}}&passiveMode=true")

    from(ftpSource)
      .to("file:/tmp/ftp")
      .log("Downloaded file ${file:name} complete.")
  }
}

我可以说这很有效,但是。。。绝对不是我想要的那样

  1. 如何跟踪文件进度,以及(可能)在下载所有文件时停止Camel上下文?我必须分开处理吗
  2. 来自FTP的文件正在被反复下载。不停。他为什么一直在重新下载文件
  3. 正在创建大量FTP连接disconnect=true在这里没有帮助。你有这方面的经验吗

非常感谢


共 (1) 个答案

  1. # 1 楼答案

    ftp组件正在轮询消费者

    这样的组件每隔延迟秒轮询一个URI

    每次轮询,组件都会从头到尾获取所有新文件。 这就是为什么你会看到不断的文件下载。下载所有文件后,组件将停止并等待计时器或时间表发出的新事件

    1. 您必须使用幂等消费EIP(http://camel.apache.org/idempotent-consumer.html)。 我建议您使用JDBC幂等消费程序。SQLite是我的最爱

    我的ftp选项:

    disconnect=true&
    connectTimeout=300000&
    ftpClient.dataTimeout=300000&
    timeout=300000&soTimeout=300000&
    delay=3600000&reconnectDelay=10000&
    backoffErrorThreshold=1&
    maximumReconnectAttempts=3&
    backoffMultiplier=2&
    passiveMode=true&
    noop=true&
    idempotent=true&
    idempotentRepository=#jdbcDocFileIdempotentRepository&
    download=true&
    recursive=true&
    stepwise=true&
    antInclude=**/currMonth/*.zip&antExclude=**/prevMonth/*.zip&
    idempotentKey=${file:onlyname}-${file:size}
    

    和幂等消费者:

    <bean id="jdbcDocFileIdempotentRepository"
          class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
        <argument ref="embeddedDataSource"/>
        <argument value="DocFileRepo"/>
        <property name="tableExistsString" value="SELECT 1 FROM MESSAGE_REPOSITORY WHERE 1 = 0"/>
        <property name="createString" value="CREATE TABLE MESSAGE_REPOSITORY (processorName TEXT, messageId TEXT, createdAt TIMESTAMP, result TEXT)"/>
        <property name="queryString" value="SELECT COUNT(*) FROM MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?"/>
        <property name="insertString" value="INSERT INTO MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?, ?, ?)"/>
        <property name="deleteString" value="DELETE FROM MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?"/>
    </bean>
    

    数据源:

    <bean id="embeddedDataSource" class="org.sqlite.javax.SQLiteConnectionPoolDataSource">
       <property name="url" value="jdbc:sqlite:${embedded_db_config_path}" />
       <property name="encoding" value="UTF-8"/>
       <property name="journalMode" value="WAL"/>
    </bean>
    
    1. 您需要设置一个合适的延迟,它需要比默认值大得多
    2. 如果延迟足够大,则连接将通过超时关闭。当然,你可以调整超时时间

    更新

    根据Camel文件:

    选项:幂等元=true

    选项使用幂等消费者EIP模式,让Camel跳过已处理的文件。默认情况下,将使用基于内存的LRUCache,该LRUCache可容纳1000个条目。如果noop=true,则还将启用幂等项,以避免反复使用相同的文件

    选项:幂等存储库

    一个可插入的存储库组织。阿帕奇。骆驼spi。幂等存储库,默认情况下,如果未指定且幂等为真,则使用MemorymessagedRepository

    因此,默认情况下,ftp使用内存解决方案