我正在使用MongoDB-Hadoop连接器读取具有嵌入式文档的集合。
JSON集合:PersonaMetaData
{
"user_id" : NumberLong(2),
"persona_created" : true,
"persona_createdAt" : ISODate("2016-02-24T06:41:49.761Z"),
"persona" : [{"persona_type" : 1,
"created_using_algo" : "Name of the algo",
"version_algo" : "1.0",
"createdAt" : ISODate("2016-02-24T06:41:49.761Z"),
"persona_items": {"key1":"value1", "key2": "value2"} }]
}
我创建了以下类来表示集合中的数据
class Persona_Items implements Serializable
{
private int key1;
private String key2;
// Getter/Setter and constructor
}
class Persona implements Serializable
{
String persona_type;
String created_using_algo
String version_algo
long createdAt;
List<Persona_Items> listPersonaItems;
// Getter/setter and constructor
}
class PersonaMetaData implements Serializable
{
long user_id;
boolean persona_created;
long persona_createdAt;
List<Persona> listPersona;
// Getter/setter and constructor
}
然后将其用作
// RDD representing the complete collection
JavaPairRDD<Object, BSONObject> bsonRdd = sc.newAPIHadoopRDD(inputConfig,
com.mongodb.hadoop.MongoInputFormat.class,
Object.class, BSONObject.class);
// Get RDD of PersonaMetaData
JavaRDD<PersonaMetaData> metaDataSchemaJavaRDD =
bsonRdd.map(new Function<Tuple2<Object, BSONObject>, PersonaMetaData >() {
@Override
public PersonaMetaData call(Tuple2<Object, BSONObject> objectBSONObjectTuple2)
throws Exception { // Parse the BSON object and return a new PersonaMetaData object }
// Convert into DataFrame
dataFrame= sqlContext.createDataFrame(metaDataSchemaJavaRDD,
PersonaMetaData.class);
例外
scala.MatchError:io.abc.spark.schema.PersonaMetaData @ 31ff5060(io.abc.spark.schema.PersonaMetaData类)位于org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl(CatalyscalTypeI255(io.abc.spark.schema.PersonaMetaData类) )在org.apache.spark.sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst(102)上的org.apache.spark.sql.catalyst.CatalystTypeConverters $ StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) apache.spark.sql.catalyst.CatalystTypeConverters $ ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:169)位于org.apache.spark.sql.catalyst.CatalystTypeConverters $ ArrayConverter.toCatalystImpl(CatalystTypeConverters。 org.apache.spark上的sql.catalyst.CatalystTypeConverters $ CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)。sql.catalyst.CatalystTypeConverters $$ anonfun $ createToCatalystConverter $ 2.apply(CatalystTypeConverters.scala:401)at org.apache.spark.sql.SQLContext $$ anonfun $ 9 $$ anonfun $ apply $ 1 $$ anonfun $ apply $ 2。 .scala:500),位于org.apache.spark.sql.SQLContext $$ anonfun $ 9 $$ anonfun $ apply $ 1 $$ anonfun $ apply $ 2.apply(SQLContext.scala:500),位于scala.collection.TraversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:244)在scala.collection.TraversableLike $$ anonfun $ map $ 1.apply(TraversableLike.scala:244)在scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33)在scala.collection.mutable.ArrayOps $ ofRef.map(ArrayOps。)上的scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:108)在scala.collection.TraversableLike $ class.map(TraversableLike.scala:244)在scala.collection.mutable.ArrayOps $ ofRef.map(ArrayOps。 scala:108),位于org.apache.spark.sql.SQLContext $$ anonfun $ 9 $$ anonfun $ apply $ 1。在org.apache.spark.sql.SQLContext上应用(SQLContext.scala:500)$$ anonfun $ 9 $$ anonfun $ apply $ 1.apply(SQLContext.scala:498)
类中没有任何列表,运行不会有任何问题。
正如在《Spark SQL的使用反射推断架构》一节中明确指出的那样,《 DataFrames and Datasets Guide》
Spark SQL不支持包含嵌套或包含复杂类型(例如列表或数组)的JavaBean。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句