在Apache Spark上为每个工作者创建一个单例

问题描述 投票:1回答:1

假设我想使用创建某种程度昂贵的对象来映射RDD。我希望每个worker / thread拥有一个这个对象,并且必须在处理每个worker上的RDD分区的项之前创建它。

我的解决方案是:

    final Function0<ModelEvaluator> f = () -> {

        if (ModelEvaluator.getInstance() == null) {
            ModelEvaluator m = new ModelEvaluator(script);
            ModelEvaluator.setInstance(m);
        }

        return ModelEvaluator.getInstance();
    };

    JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
            (t) -> {
                try {
                    double val = f.call().evaluateModel(t);
                    return new Tuple2<>(val, t);
                } catch (Exception ex) {
                    return null;
                }
            }
    );



public class ModelEvaluator {

  private static ModelEvaluator instance;

  public static void setInstance(ModelEvaluator instance) {
    ModelEvaluator.instance = instance;
  }

  public static ModelEvaluator getInstance() {
      return instance;
  } 
...

在这种情况下,“ModelEvaluator”对象解析脚本,然后使用“服务”对象列表来配置模型参数,以便计算该参数配置的关联响应度量。但是每次处理RDD行时我都不想解析脚本。

我还配置了我的集群为每个集群创建一个进程,每个进程只生成一个worker,因为在同一进程中同一进程中多个worker访问一个具有可变状态的单例实例会有问题。

对我的问题有更优雅的解决方案吗?

java apache-spark singleton
1个回答
3
投票

这可以使用Broadcast变量来完成。这将允许您在驱动程序上创建一个对象,并且每个工作程序将根据需要发送一个对象。

final Broadcast<ModelEvaluator> model = jsc.broadcast(new ModelEvaluator(script));

JavaPairRDD<Double, List<Service>> results = cartesian.mapToPair(
        (t) -> {
            try {
                double val = model.value().evaluateModel(t);
                return new Tuple2<>(val, t);
            } catch (Exception ex) {
                return null;
            }
        }
);
© www.soinside.com 2019 - 2024. All rights reserved.