How do I use a PathFilter in Apache Spark?


I have a simple file filter that basically selects files from a specific date. In Hadoop, I would set the PathFilter class on the InputFormat via setInputPathFilter. How can I do this in Spark?

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class FilesFilter extends Configured implements PathFilter {

    @Override
    public boolean accept(Path path) {

        FileSystem fs;
        try {
            // Obtain the FileSystem from the configuration supplied by Configured;
            // Hadoop injects the configuration when it instantiates the filter.
            fs = path.getFileSystem(getConf());
            // Always accept directories so listing can descend into them.
            if (fs.isDirectory(path))
                return true;
        } catch (IOException e1) {
            e1.printStackTrace();
            return false;
        }

        String file_date = "01.30.2015";
        SimpleDateFormat sdf = new SimpleDateFormat("MM.dd.yyyy");
        Date date;

        try {
            date = sdf.parse(file_date);
        } catch (ParseException e1) {
            e1.printStackTrace();
            return false; // previously fell through and dereferenced a null date
        }

        // Truncate both timestamps to whole days before comparing.
        long dt = date.getTime() / (1000 * 3600 * 24);

        try {
            FileStatus file = fs.getFileStatus(path);
            long time = file.getModificationTime() / (1000 * 3600 * 24);
            return time == dt;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }

    }
}
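
For reference, in plain MapReduce this filter would be attached to the job's input format roughly as follows (a sketch, assuming the new mapreduce API; the input path is a placeholder):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

val job = Job.getInstance()
// Register the filter class; FileInputFormat consults it when listing inputs.
FileInputFormat.setInputPathFilter(job, classOf[FilesFilter])
FileInputFormat.addInputPath(job, new Path("/data/input")) // placeholder path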
java scala hadoop apache-spark
1 Answer

Use this:

sc.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[TmpFileFilter], classOf[PathFilter])

Here is my TmpFileFilter.scala code, which skips .tmp files:

import org.apache.hadoop.fs.{Path, PathFilter}

class TmpFileFilter extends PathFilter {
  override def accept(path: Path): Boolean = !path.getName.endsWith(".tmp")
}
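
Once the filter class is on the classpath, registration and reading fit together like this (a minimal sketch; the app name and input path are placeholders):

import org.apache.hadoop.fs.PathFilter
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("path-filter-demo"))

// Register the filter before creating the RDD; FileInputFormat-based reads
// such as sc.textFile consult this key when listing input files.
sc.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[TmpFileFilter],
  classOf[PathFilter])

// Files ending in .tmp under the placeholder directory are now skipped.
val lines = sc.textFile("/data/input")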

You can define your own PathFilter.
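
For instance, the date-based filter from the question could be written in the same style (a sketch; DateFileFilter is a hypothetical name, and extending Configured works because Hadoop injects the configuration when it instantiates the filter):

import java.text.SimpleDateFormat

import org.apache.hadoop.conf.Configured
import org.apache.hadoop.fs.{Path, PathFilter}

class DateFileFilter extends Configured with PathFilter {
  // Target day from the question, truncated to whole days since the epoch.
  private val targetDay: Long =
    new SimpleDateFormat("MM.dd.yyyy").parse("01.30.2015").getTime / (1000L * 3600 * 24)

  override def accept(path: Path): Boolean = {
    val status = path.getFileSystem(getConf).getFileStatus(path)
    // Accept directories so listing can descend; otherwise compare days.
    status.isDirectory || status.getModificationTime / (1000L * 3600 * 24) == targetDay
  }
}

Register it the same way, passing classOf[DateFileFilter] in place of classOf[TmpFileFilter].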
