有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何在Apache Spark中确定偏移量?

我正在搜索一些数据文件(~20GB)。我想在数据中找到一些特定的术语,并标记匹配的偏移量。有没有办法让Spark识别我正在操作的数据块的偏移量

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

import java.util.regex.*;

public class Grep {
        public static void main( String args[] ) {
            SparkConf        conf       = new SparkConf().setMaster( "spark://ourip:7077" );
            JavaSparkContext jsc        = new JavaSparkContext( conf );
            JavaRDD<String>  data       = jsc.textFile( "hdfs://ourip/test/testdata.txt" ); // load the data from HDFS
            JavaRDD<String>  filterData = data.filter( new Function<String, Boolean>() {
                    // I'd like to do something here to get the offset in the original file of the string "babe ruth"
                    public Boolean call( String s ) { return s.toLowerCase().contains( "babe ruth" ); } // case insens matching

            });

            long matches = filterData.count();  // count the hits

            // execute the RDD filter
            System.out.println( "Lines with search terms: " + matches );
 );
        } //  end main
} // end class Grep

我想在“filter”操作中计算原始文件中“baberuth”的偏移量。我可以得到当前行中“babe ruth”的偏移量,但是文件中告诉我行偏移量的过程或函数是什么


共 (2) 个答案

  1. # 1 楼答案

    在Spark common中,可以使用Hadoop输入格式。要从文件读取字节偏移量,可以使用Hadoop(org.apache.Hadoop.mapreduce.lib.input)中的类TextInputFormat。它已经与Spark捆绑在一起了

    它将文件读取为键(字节偏移量)和值(文本行):

    An InputFormat for plain text files. Files are broken into lines. Either linefeed or carriage-return are used to signal end of line. Keys are the position in the file, and values are the line of text.

    在Spark中,可以通过调用newAPIHadoopFile()来使用它

    SparkConf conf = new SparkConf().setMaster("");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    
    // read the content of the file using Hadoop format
    JavaPairRDD<LongWritable, Text> data = jsc.newAPIHadoopFile(
            "file_path", // input path
            TextInputFormat.class, // used input format class
            LongWritable.class, // class of the value
            Text.class, // class of the value
            new Configuration());    
    
    JavaRDD<String> mapped = data.map(new Function<Tuple2<LongWritable, Text>, String>() {
        @Override
        public String call(Tuple2<LongWritable, Text> tuple) throws Exception {
            // you will get each line from as a tuple (offset, text)    
            long pos = tuple._1().get(); // extract offset
            String line = tuple._2().toString(); // extract text
    
            return pos + " " + line;
        }
    });
    
  2. # 2 楼答案

    您可以使用来自JavaSparkContextwholeTextFiles(String path, int minPartitions)方法返回一个JavaPairRDD<String,String>,其中键是filename,值是一个包含文件全部内容的字符串(因此,此RDD中的每个记录表示一个文件)。从这里开始,只需运行一个map(),它将对每个值调用indexOf(String searchString)。这将返回每个文件中的第一个索引以及相关字符串的出现

    (编辑:)

    因此,以分布式方式查找一个文件的偏移量(根据注释中下面的用例)是可能的。下面是一个在Scala中工作的示例

    val searchString = *search string*
    val rdd1 = sc.textFile(*input file*, *num partitions*)
    
    // Zip RDD lines with their indices
    val zrdd1 = rdd1.zipWithIndex()
    
    // Find the first RDD line that contains the string in question
    val firstFind = zrdd1.filter { case (line, index) => line.contains(searchString) }.first()
    
    // Grab all lines before the line containing the search string and sum up all of their lengths (and then add the inline offset)
    val filterLines = zrdd1.filter { case (line, index) => index < firstFind._2 }
    val offset = filterLines.map { case (line, index) => line.length }.reduce(_ + _) + firstFind._1.indexOf(searchString)
    

    请注意,由于不考虑任何新行字符(输入格式使用新行作为记录之间的界限),因此您还需要在其上手动添加任何新行字符。新行数只是包含搜索字符串的行之前的行数,因此这是很容易添加的

    不幸的是,我并不完全熟悉Java API,而且它也不太容易测试,因此我不确定下面的代码是否有效,但我已经做到了(我也使用了Java 1.7,但1.8用lambda表达式压缩了很多代码):

    String searchString = *search string*;
    JavaRDD<String> data = jsc.textFile("hdfs://ourip/test/testdata.txt");
    
    JavaRDD<Tuple2<String, Long>> zrdd1 = data.zipWithIndex();
    
    Tuple2<String, Long> firstFind = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() {
          public Boolean call(Tuple2<String, Long> input) { return input.productElement(0).contains(searchString); }
      }).first();
    
    JavaRDD<Tuple2<String, Long>> filterLines = zrdd1.filter(new Function<Tuple2<String, Long>, Boolean>() {
          public Boolean call(Tuple2<String, Long> input) { return input.productElement(1) < firstFind.productElement(1); }
      });
    
    Long offset = filterLines.map(new Function<Tuple2<String, Long>, Int>() {
          public Int call(Tuple2<String, Long> input) { return input.productElement(0).length(); }
      }).reduce(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) { return a + b; }
      }) + firstFind.productElement(0).indexOf(searchString);
    

    只有当您的输入是一个文件时(否则,zipWithIndex()就不能保证文件中的偏移量),才可以执行此操作,但此方法适用于任意数量的分区的RDD,因此可以将文件划分为任意数量的块