I've been searching Google all day but couldn't find an answer, so I'm finally posting a question here.
I have a file containing line-delimited JSON objects:
{"device_id": "103b", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}
{"device_id": "103b", "timestamp": 1435677490, "rooms": ["Office", "Lab"]}
{"device_id": "103b", "timestamp": 1436673850, "rooms": ["Office", "Foyer"]}
My goal is to parse this file with Apache Spark in Java. I followed How to Parsing CSV or JSON File with Apache Spark, and so far I can successfully parse each line of JSON into a JavaRDD using Gson.
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> data = sc.textFile("fileName");
JavaRDD<JsonObject> records = data.map(new Function<String, JsonObject>() {
    public JsonObject call(String line) throws Exception {
        // parse one line of the file into a Gson JsonObject
        Gson gson = new Gson();
        JsonObject json = gson.fromJson(line, JsonObject.class);
        return json;
    }
});
Where I'm really stuck is that I want to deserialize the "rooms" array so that it fits my class Event.
public class Event implements Serializable {
    public static final long serialVersionUID = 42L;
    private String deviceId;
    private int timestamp;
    private String room;
    // constructor, getters and setters
}
In other words, from this line:
{"device_id": "103b", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}
I want to create two Event objects in Spark:
obj1: deviceId = "103b", timestamp = 1436941050, room = "Office"
obj2: deviceId = "103b", timestamp = 1436941050, room = "Foyer"
I did a little searching and tried flatMapValue, but no luck... it gives me an error...
JavaRDD<Event> events = records.flatMapValue(new Function<JsonObject, Iterable<Event>>() {
    public Iterable<Event> call(JsonObject json) throws Exception {
        JsonArray rooms = json.get("rooms").getAsJsonArray();
        List<Event> data = new LinkedList<Event>();
        for (JsonElement room : rooms) {
            data.add(new Event(json.get("device_id").getAsString(), json.get("timestamp").getAsInt(), room.toString()));
        }
        return data;
    }
});
I'm new to Spark and Map/Reduce. I would really appreciate any help. Thanks in advance!
If you load your json data into a DataFrame:
DataFrame df = sqlContext.read().json("/path/to/json");
you can easily do this via explode:
df.select(
    df.col("device_id"),
    df.col("timestamp"),
    org.apache.spark.sql.functions.explode(df.col("rooms")).as("room")
);
Input:
{"device_id": "1", "timestamp": 1436941050, "rooms": ["Office", "Foyer"]}
{"device_id": "2", "timestamp": 1435677490, "rooms": ["Office", "Lab"]}
{"device_id": "3", "timestamp": 1436673850, "rooms": ["Office", "Foyer"]}
you will get:
+---------+------+----------+
|device_id| room| timestamp|
+---------+------+----------+
| 1|Office|1436941050|
| 1| Foyer|1436941050|
| 2|Office|1435677490|
| 2| Lab|1435677490|
| 3|Office|1436673850|
| 3| Foyer|1436673850|
+---------+------+----------+
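If you still need Event objects rather than Rows, here is a minimal sketch of the remaining step. It assumes Spark 1.4+ (for Row.getAs by field name) and an Event(String deviceId, int timestamp, String room) constructor as in the question; the variable name flat is just for illustration:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

DataFrame flat = df.select(
    df.col("device_id"),
    df.col("timestamp"),
    org.apache.spark.sql.functions.explode(df.col("rooms")).as("room"));

JavaRDD<Event> events = flat.javaRDD().map(new Function<Row, Event>() {
    public Event call(Row row) throws Exception {
        // the JSON reader infers whole numbers as Long, so narrow to int for the Event field
        return new Event(
            row.<String>getAs("device_id"),
            ((Number) row.getAs("timestamp")).intValue(),
            row.<String>getAs("room"));
    }
});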
val formatrecord = records.map(fromJson[mapClass](_))
mapClass should be a case class that maps the objects inside the records json.
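Back in the Java API from the question: a plain JavaRDD has no flatMapValue (flatMapValues exists only on JavaPairRDD); the method to flatten one record into several is flatMap. A rough sketch under that assumption, for Spark 1.x where FlatMapFunction.call returns an Iterable:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.util.LinkedList;
import java.util.List;

JavaRDD<Event> events = records.flatMap(new FlatMapFunction<JsonObject, Event>() {
    public Iterable<Event> call(JsonObject json) throws Exception {
        // emit one Event per entry of the "rooms" array
        JsonArray rooms = json.get("rooms").getAsJsonArray();
        List<Event> result = new LinkedList<Event>();
        for (JsonElement room : rooms) {
            // getAsString() strips the JSON quotes that room.toString() would keep
            result.add(new Event(
                json.get("device_id").getAsString(),
                json.get("timestamp").getAsInt(),
                room.getAsString()));
        }
        return result;
    }
});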