Hazelcast: service in mapUsingService is not serializable


My two questions

  1. How do I make an HTTP request inside a Hazelcast pipeline? (My attempt is below.)
  2. Why is my Foo class not serializable?

I am using embedded Hazelcast (5.4.0) in Java.


The problematic pipeline:

Pipeline pipeline = Pipeline.create();
StreamStage<Bar> prepared = pipeline.readFrom(KafkaSources. ...)
                                    .withTimestamps(...)
                                    .map(Map.Entry::getValue);
ServiceFactory<?, Foo> service = ServiceFactories.sharedService(ctx -> new Foo(url))
                                                 .toNonCooperative();
prepared.mapUsingService(service, (s, bar) -> new EnhancedBar(bar, s.getDetails(bar.getId())))
        .writeTo(Sinks.logger());

The error:

java.lang.IllegalArgumentException: "createContextFn" must be serializable
        at com.hazelcast.internal.serialization.impl.SerializationUtil.checkSerializable(SerializationUtil.java:83)

The problematic class:

@Getter
@Setter
public class Foo implements Serializable {

    private String url;
    private transient OkHttpClient client;

    public Foo() {
        this.url = "unknown";
        client = new OkHttpClient();
    }

    public Foo(String url) {
        this.url = url;
        client = new OkHttpClient();
    }

    public String getDetails() {
        Request request = new Request.Builder().url(url).build();

        Call call = client.newCall(request);
        try {
            Response response = call.execute();
            return response.body().string();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Serial
    private void writeObject(ObjectOutputStream oos) throws IOException {
        oos.writeObject(url);
    }

    @Serial
    private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
        String url = (String) ois.readObject();
        this.setUrl(url);
        this.setClient(new OkHttpClient());
    }
}

I guess the problem stems from OkHttpClient not implementing Serializable.

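What I tried:

  1. When I removed the client and simply returned "constant detail", the job was submitted successfully.
  2. When I removed the two @Serial methods and used a CompactSerializer instead, I got the same error.

In that last case, the read/write methods of my CompactSerializer were: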

@NotNull @Override
public Foo read(@NotNull CompactReader reader) {
    String url = reader.readString("url");
    return new Foo(url);
}

@Override
public void write(@NotNull CompactWriter writer, @NotNull Foo object) {
    writer.writeString("url", object.getUrl());
}

Tags: java, hazelcast, hazelcast-jet

1 answer

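If you are using OkHttpClient, you also need to upload the following to the Hazelcast (HZ) members as part of the job:

  1. The OkHttp jar file.
  2. The parent class of the service, if the service is an inner class. If the service is not an inner class, this step is not necessary.

Note: in my opinion, OkHttpClient adds an unnecessary burden here. Just use the HttpClient that ships with the JDK; the job configuration will be simpler.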
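The JDK-client suggestion could look roughly like the sketch below. The class name JdkHttpService and the timeout values are assumptions made for illustration and are not part of the original answer; the sketch relies only on java.net.http.HttpClient (JDK 11 or later).

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical service class; the name and timeouts are illustrative only.
// It is meant to be constructed inside the service factory lambda on the
// member, so the class itself does not have to be Serializable.
class JdkHttpService {

    private final HttpClient client = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(5))
            .build();

    // Blocking GET: returns the response body, or a short error description.
    public String get(String url) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(10))
                .GET()
                .build();
        try {
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.body();
        } catch (IOException e) {
            return "ERROR: " + e;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can react to it.
            Thread.currentThread().interrupt();
            return "ERROR: " + e;
        }
    }
}
```

Such a class could presumably be wired in with ServiceFactories.sharedService(ctx -> new JdkHttpService()).toNonCooperative() and mapUsingService(service, JdkHttpService::get). No third-party jar would have to be attached to the job, although the service class itself still has to be available to the members.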

Here is an example that uses OkHttpClient:

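import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetService;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.TimeUnit;

// This example creates a Jet job that downloads the given URL.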
@Slf4j
class OkHttpClientJob {

  public static void main(String[] args) throws InterruptedException {
    log.info("Starting Test");
    // Start client
    HazelcastInstance hazelcastClient = getHazelcastClient();
    submitJob(hazelcastClient);

    TimeUnit.SECONDS.sleep(10);

    hazelcastClient.shutdown();

    log.info("Test completed");
  }

  private static HazelcastInstance getHazelcastClient() {
    ClientConfig clientConfig = new ClientConfig();
    return HazelcastClient.newHazelcastClient(clientConfig);
  }

  private static void submitJob(HazelcastInstance hazelcastInstance) {
    // Create a service that will be used by Jet Job
    ServiceFactory<?, HttpClientService> service = 
      ServiceFactories.sharedService(ctx -> new HttpClientService())
        .toNonCooperative();

    Pipeline pipeline = Pipeline.create();
    List<String> input = List.of("https://www.google.com");
    BatchStage<String> prepared = pipeline.readFrom(TestSources.items(input));

    prepared.mapUsingService(service, HttpClientService::get)
        .writeTo(Sinks.logger());

    // Path to okhttp jar file
    File file = new File("src/main/resources/okhttp-5.0.0-alpha.14.jar");
    JobConfig jobConfig = new JobConfig()
      // Add the jar to the Jet job (addJar takes a single jar;
      // addJarsInZip expects a ZIP archive that contains jars)
      .addJar(file)
      // Add parent class that contains the service class
      .addClass(OkHttpClientJob.class);
         
    JetService jetService = hazelcastInstance.getJet();
    Job job = jetService.newJob(pipeline, jobConfig);
    job.join();
  }


  private static class HttpClientService implements Serializable {
    private transient OkHttpClient client;

    public HttpClientService() {
      client = new OkHttpClient();
    }

    public String get(String url) {
      Request request = new Request.Builder()
        .url(url)
        .build();

      String result;
      try {
        Call call = client.newCall(request);
        try (Response response = call.execute()) {
          try (ResponseBody responseBody = response.body()) {
            result = responseBody.string();
          }
        }
      } catch (IOException exception) {
        result = exception.getMessage();
      }
      return result;
    }
  }
}
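
One more thing that may be worth checking, although the question does not confirm it: the exception names "createContextFn", which suggests that the serializability check applies to the lambda passed to sharedService rather than to the Foo class. If url in ctx -> new Foo(url) is a field of a non-serializable enclosing class, the lambda captures this and cannot be serialized. The sketch below reproduces that effect with plain Java serialization; SerFunction and JobBuilder are hypothetical stand-ins, not Hazelcast types.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Function;

class LambdaCaptureDemo {

    // Hypothetical stand-in for a serializable functional interface.
    interface SerFunction<T, R> extends Function<T, R>, Serializable { }

    // Hypothetical stand-in for the class that builds the pipeline.
    // Note that it is NOT Serializable.
    static class JobBuilder {
        private final String url;

        JobBuilder(String url) {
            this.url = url;
        }

        // Reads the field directly, so the lambda captures `this`.
        SerFunction<Object, String> capturesThis() {
            return ctx -> "service for " + url;
        }

        // Copies the field into a local first, so only the String is captured.
        SerFunction<Object, String> capturesLocal() {
            String localUrl = url;
            return ctx -> "service for " + localUrl;
        }
    }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        JobBuilder builder = new JobBuilder("http://example.invalid");
        System.out.println("captures this:  " + isSerializable(builder.capturesThis()));
        System.out.println("captures local: " + isSerializable(builder.capturesLocal()));
    }
}
```

Running main should report false for the lambda that captures this and true for the one that captures only the local String. If the same situation applies to the pipeline in the question, copying url into a local variable before creating the ServiceFactory would be the analogous change.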