我正在使用spark-sql 2.4.1,spark-cassandra-connector_2.11-2.4.1.jar和java8。
我有这样的卡桑德拉表:
create company(company_id int, start_date date, company_name text , PRIMARY_KEY(company_id ,start_date )
)WITH CLUSTERING ORDER BY ( start_date DESC );
开始日期在这里是派生字段,它是在业务逻辑中计算的。
我有spark-sql流式代码,其中我在mapFunction下面调用了。
public static MapFunction<Company, CompanyTransformed> mapFunInsertCompany = ( record ) ->{
CompanyTransformed rec = new CompanyTransformed();
rec.setCompany_id(record.getCompanyId());
rec.setCompany_name(record.getCompanyName());
if(record.getChangeFlag().equalsIgnoreCase("I") && record.getCreateDate() != null )
rec.setStart_date(record.getCreateDate());
if(record.getChangeFlag().equalsIgnoreCase("U"))
rec.setStart_date(new Date(CommonUtils.today().getTime() + 86400000));
return rec;
};
启动我的消费者时,kafka主题中没有记录。对于空记录流,继续调用上述map函数。
因为有record.getCreateDate()= null起始日期设置为null。
但是主键的start_date部分因此插入失败并不确定地等待,可以将数据恢复并将其保存到C *表中。
所以应该怎么做才能解决?任何线索请