java如何在Apache Spark中确定偏移量?
我正在搜索一些数据文件(~20GB)。我想在数据中找到一些特定的术语,并标记匹配的偏移量。有没有办法让Spark识别我正在操作的数据块的偏移量
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import java.util.regex.*;
public class Grep {
public static void main( String args[] ) {
SparkConf conf = new SparkConf().setMaster( "spark://ourip:7077" );
JavaSparkContext jsc = new JavaSparkContext( conf );
JavaRDD<String> data = jsc.textFile( "hdfs://ourip/test/testdata.txt" ); // load the data from HDFS
JavaRDD<String> filterData = data.filter( new Function<String, Boolean>() {
// I'd like to do something here to get the offset in the original file of the string "babe ruth"
public Boolean call( String s ) { return s.toLowerCase().contains( "babe ruth" ); } // case insens matching
});
long matches = filterData.count(); // count the hits
// execute the RDD filter
System.out.println( "Lines with search terms: " + matches );
);
} // end main
} // end class Grep
我想在“filter”操作中计算原始文件中“baberuth”的偏移量。我可以得到当前行中“babe ruth”的偏移量,但是文件中告诉我行偏移量的过程或函数是什么
# 1 楼答案
在Spark common中,可以使用Hadoop输入格式。要从文件读取字节偏移量,可以使用Hadoop(org.apache.Hadoop.mapreduce.lib.input)中的类TextInputFormat。它已经与Spark捆绑在一起了
它将文件读取为键(字节偏移量)和值(文本行):
在Spark中,可以通过调用
newAPIHadoopFile()
来使用它# 2 楼答案
您可以使用来自
JavaSparkContext
的wholeTextFiles(String path, int minPartitions)
方法返回一个JavaPairRDD<String,String>
,其中键是filename,值是一个包含文件全部内容的字符串(因此,此RDD中的每个记录表示一个文件)。从这里开始,只需运行一个map()
,它将对每个值调用indexOf(String searchString)
。这将返回每个文件中的第一个索引以及相关字符串的出现(编辑:)
因此,以分布式方式查找一个文件的偏移量(根据注释中下面的用例)是可能的。下面是一个在Scala中工作的示例
请注意,由于不考虑任何新行字符(输入格式使用新行作为记录之间的界限),因此您还需要在其上手动添加任何新行字符。新行数只是包含搜索字符串的行之前的行数,因此这是很容易添加的
不幸的是,我并不完全熟悉Java API,而且它也不太容易测试,因此我不确定下面的代码是否有效,但我已经做到了(我也使用了Java 1.7,但1.8用lambda表达式压缩了很多代码):
只有当您的输入是一个文件时(否则,
zipWithIndex()
就不能保证文件中的偏移量),才可以执行此操作,但此方法适用于任意数量的分区的RDD,因此可以将文件划分为任意数量的块