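The core problem I'm running into is that when I run my pipeline after deploying it to Google Cloud Dataflow, I get this error:
java.lang.IllegalStateException: FirebaseApp with name [DEFAULT] doesn't exist.
If I run the same pipeline locally, everything works fine. So I suspect either an authentication problem or an environment problem.
Relevant code:
The DEPLOY and REAL variables control whether the job is pushed to the cloud (or run locally) and whether it reads from my Pub/Sub source or uses mocked data. Switching between mocked and Pub/Sub data doesn't seem to affect Firestore at all; only deploying or not does.
The part of main() where I initialize the Firestore app: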
public class BreakingDataTransactions {
// When true, this pulls from the specified Pub/Sub topic
static Boolean REAL = true;
// when set to true the job gets deployed to Cloud Dataflow
static Boolean DEPLOY = true;
public static void main(String[] args) {
// validate our env vars
if (GlobalVars.projectId == null ||
GlobalVars.pubsubTopic == null ||
GlobalVars.gcsBucket == null ||
GlobalVars.region == null) {
System.out.println("You have to set environment variables for project (BREAKING_PROJECT), pubsub topic (BREAKING_PUBSUB), region (BREAKING_REGION) and Cloud Storage bucket for staging (BREAKING_DATAFLOW_BUCKET) in order to deploy this pipeline.");
System.exit(1);
}
// Initialize our Firestore instance
try {
GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
System.out.println("*************************");
System.out.println(credentials);
FirebaseOptions firebaseOptions =
new FirebaseOptions.Builder()
.setCredentials(credentials)
.setProjectId(GlobalVars.projectId)
.build();
FirebaseApp firebaseApp = FirebaseApp.initializeApp(firebaseOptions);
} catch (IOException e) {
e.printStackTrace();
}
// Start dataflow pipeline
DataflowPipelineOptions options =
PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setProject(GlobalVars.projectId);
if (DEPLOY) {
options.setRunner(DataflowRunner.class);
options.setTempLocation(GlobalVars.gcsBucket);
options.setRegion(GlobalVars.region);
}
Pipeline p = Pipeline.create(options);
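GlobalVars itself isn't part of the snippet above. A minimal sketch of what it could look like, assuming each field is read with System.getenv from the environment variable named in the error message (the class shape and the missingVars helper are hypothetical). One point worth keeping in mind for this bug: static initializers like these run separately in every JVM that loads the class, including Dataflow worker JVMs, and those generally don't see the environment of the shell that launched the job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of GlobalVars: each field is read from an environment
// variable when the class is first loaded in a given JVM.
final class GlobalVars {
    static final String projectId = System.getenv("BREAKING_PROJECT");
    static final String pubsubTopic = System.getenv("BREAKING_PUBSUB");
    static final String region = System.getenv("BREAKING_REGION");
    static final String gcsBucket = System.getenv("BREAKING_DATAFLOW_BUCKET");

    private GlobalVars() {}

    // Returns the names of required variables that are unset or empty in env,
    // so the caller can report exactly which ones are missing.
    static List<String> missingVars(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : new String[] {
                "BREAKING_PROJECT", "BREAKING_PUBSUB",
                "BREAKING_REGION", "BREAKING_DATAFLOW_BUCKET"}) {
            String value = env.get(name);
            if (value == null || value.isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

In main(), GlobalVars.missingVars(System.getenv()) would then give a more specific message than the single combined println.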
And the part where I do the processing:
PCollection<Data> dataCollection =
jsonStrings
.apply(ParDo.of(JSONToPOJO.create(Data.class)))
.setCoder(AvroCoder.of(Data.class));
PCollection<Result> result =
dataCollection
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(WithKeys.of(x -> x.operation + "-" + x.job_id))
.setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(Data.class)))
.apply(Combine.<String, Data, Result>perKey(new DataAnalysis()))
.apply(Reify.windowsInValue())
.apply(MapElements.into(TypeDescriptor.of(Result.class))
.<KV<String, ValueInSingleWindow<Result>>>via(
x -> {
Result r = new Result();
String key = x.getKey();
r.query_action = key.substring(0, key.indexOf("-"));
r.job_id = key.substring(key.indexOf("-") + 1);
r.average_latency = x.getValue().getValue().average_latency;
r.failure_percent = x.getValue().getValue().failure_percent;
r.timestamp = x.getValue().getTimestamp().getMillis();
return r;
}));
// this node will (hopefully) actually write out to Firestore
result.apply(ParDo.of(new FireStoreOutput()));
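The WithKeys step joins operation and job_id with "-", and the MapElements lambda splits the key back apart at the first "-". That round trip only holds if operation never contains "-" (job_id may, because everything after the first "-" is kept). A small stand-alone sketch of the same parsing using split with a limit of 2; the KeyParts class and its methods are hypothetical, not part of the pipeline above.

```java
// Hypothetical helper mirroring the pipeline's key format "<operation>-<job_id>".
// split("-", 2) splits only at the first '-', so dashes inside job_id survive.
final class KeyParts {
    final String operation;
    final String jobId;

    private KeyParts(String operation, String jobId) {
        this.operation = operation;
        this.jobId = jobId;
    }

    static String compose(String operation, String jobId) {
        return operation + "-" + jobId;
    }

    static KeyParts parse(String key) {
        String[] parts = key.split("-", 2);
        if (parts.length != 2) {
            // With no '-', key.indexOf("-") returns -1 and the original
            // substring call would throw; fail with a clearer message instead.
            throw new IllegalArgumentException("Key has no '-' separator: " + key);
        }
        return new KeyParts(parts[0], parts[1]);
    }
}
```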
Finally, the FireStoreOutput class:
public static class FireStoreOutput extends DoFn<Result, String> {
Firestore db;
@ProcessElement
public void processElement(@Element Result result) {
db = FirestoreClient.getFirestore();
DocumentReference docRef = db.collection("events")
.document("next2020")
.collection("transactions")
.document(result.job_id)
.collection("transactions")
.document();
//System.out.println(docRef.getId());
// Build the document's fields as a map (document() with no argument auto-generates the ID)
Map<String, Object> data = new HashMap<>();
data.put("failure_percent", result.failure_percent);
data.put("average_latency", result.average_latency);
data.put("query_action", result.query_action);
data.put("timestamp", result.timestamp);
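// start the write; get() then blocks until Firestore acknowledges it
ApiFuture<WriteResult> writeResult = docRef.set(data);
try {
writeResult.get();
} catch (InterruptedException e) {
// restore the interrupt flag so the runner can see the thread was interrupted
Thread.currentThread().interrupt();
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}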
}
}
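The error occurs on the line: db = FirestoreClient.getFirestore();
I'm deploying the Dataflow job with the --serviceAccount flag, specifying a service account that has permission to do everything.
So unless GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); is somehow not working (but you can see the print statements there, and they do print the credentials correctly at build time), authentication shouldn't be the problem.
However, that only happens at build time... so I'm wondering whether there's a persistence issue: everything initializes fine at build time, but when the job actually runs in the cloud, the initialization is lost somewhere between deployment and processing. If that's the case, how do I fix it?
Thanks!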
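That suspicion matches how Beam runners work: on deployment, the DoFn instances in the pipeline are serialized on the launching machine and deserialized in separate worker JVMs, and Java serialization only carries instance fields. The default FirebaseApp registered by FirebaseApp.initializeApp() in main() is JVM-wide static state, so it never reaches the workers. A plain-Java sketch (no Beam or Firebase; Holder and SerializationDemo are hypothetical names) showing that a static field is not restored from the serialized bytes while an instance field is:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a DoFn: one static field (JVM-wide state, like
// FirebaseApp's registry of initialized apps) and one instance field.
class Holder implements Serializable {
    private static final long serialVersionUID = 1L;
    static String jvmWideState = "unset";
    String instanceState;

    Holder(String instanceState) {
        this.instanceState = instanceState;
    }
}

final class SerializationDemo {
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // "Launcher": set JVM-wide state, then serialize the function object.
        Holder.jvmWideState = "initialized in main()";
        byte[] shipped = serialize(new Holder("per-instance config"));

        // Simulate a fresh worker JVM by resetting the static to its default.
        Holder.jvmWideState = "unset";
        Holder onWorker = (Holder) deserialize(shipped);

        System.out.println(onWorker.instanceState); // per-instance config
        System.out.println(Holder.jvmWideState);    // unset
    }
}
```

This also explains why the local run works: with DEPLOY false, main() and the DoFns run in the same JVM, so the FirebaseApp registered in main() is still there when processElement calls FirestoreClient.getFirestore().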
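[OK, I found a solution... The biggest issue was that my DAG's PCollection splits into two parallel paths. I have two types of operations, "read" and "write", and each of those results sends a PCollection to my FireStoreOutput class, which is where I was trying to initialize the Firestore app, causing an "already initialized" error.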
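With two branches feeding FireStoreOutput, the runner ends up with several instances of the DoFn (at least one per branch, and typically more for parallelism), which can run on different threads of the same worker JVM. If each of them calls FirebaseApp.initializeApp(), the second call for [DEFAULT] fails because that name is already registered. The usual cure is once-per-JVM, thread-safe lazy initialization. A generic sketch of that pattern, with the Firebase setup replaced by a Supplier so it runs on its own (LazySingleton and LazySingletonDemo are hypothetical names); a plain synchronized accessor is used rather than double-checked locking, since the lock cost is negligible next to a Firestore write.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: builds its value at most once, on first use,
// even when many threads call get() concurrently.
final class LazySingleton<T> {
    private final Supplier<T> factory;
    private T value; // guarded by "this"

    LazySingleton(Supplier<T> factory) {
        this.factory = factory;
    }

    synchronized T get() {
        if (value == null) {
            value = factory.get();
        }
        return value;
    }
}

final class LazySingletonDemo {
    private static final AtomicInteger initCalls = new AtomicInteger();

    // Stand-in for a static db field plus a synchronized getDB().
    static final LazySingleton<Object> DB = new LazySingleton<>(() -> {
        initCalls.incrementAndGet(); // FirebaseApp.initializeApp(...) would go here
        return new Object();         // FirestoreClient.getFirestore() would go here
    });

    public static void main(String[] args) throws InterruptedException {
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(DB::get);
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println("init calls: " + initCalls.get()); // init calls: 1
    }
}
```

Beam's DoFn also supports a @Setup method, called once per DoFn instance before any elements are processed; it is a natural place to call such an accessor, but the static, synchronized guard is what prevents a second initializeApp() for [DEFAULT] within one JVM.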
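What fixed it was making my database object static and initializing it inside a synchronized getDB() method, only when it hasn't been set yet. The final updated FireStoreOutput code: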
public static class FireStoreOutput extends DoFn<Result, String> {
static Firestore db;
public static synchronized Firestore getDB() {
if (db == null) {
System.out.println("I'm being called");
// Initialize our Firestore instance
try {
GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
System.out.println("*************************");
System.out.println(credentials);
FirebaseOptions firebaseOptions =
new FirebaseOptions.Builder()
.setCredentials(credentials)
.setProjectId(GlobalVars.projectId)
.build();
FirebaseApp firebaseApp = FirebaseApp.initializeApp(firebaseOptions);
} catch (IOException e) {
e.printStackTrace();
}
db = FirestoreClient.getFirestore();
}
return db;
}
@ProcessElement
public void processElement(@Element Result result) {
DocumentReference docRef = getDB().collection("events")
.document("next2020")
.collection("transactions")
.document(result.job_id)
.collection("transactions")
.document();