1. Why converting between RDDs and DataFrames matters
2. Hands-on: RDD to DataFrame conversion in Java
3. Hands-on: RDD to DataFrame conversion in Scala
Once an RDD can be hooked up to databases and file systems, the possibilities are wide open; this greatly accelerates and simplifies big-data development.
Inferring the schema via reflection
A case class (Scala) or a JavaBean (Java) is the right tool when you already know the RDD's metadata (its schema) up front.
When the metadata is not known until runtime, obtain it dynamically and specify the schema programmatically instead (see the sketch below).
Note that a JavaBean may not be nested and may not contain complex data structures such as List.
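For the dynamic case, the Spark 1.x SQLContext API lets you build a StructType at runtime and pair it with an RDD of generic Row objects. Below is a minimal Java sketch of that approach, reusing the same input file as the examples later in this post; the method name ByProgrammaticSchema and the three-column layout are my own assumptions for illustration:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static void ByProgrammaticSchema() {
    SparkConf conf = new SparkConf().setAppName("RDD2DataFrameBySchema").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    JavaRDD<String> lines = sc.textFile("F:\\sparkData\\personBean.txt");
    // Build the schema at runtime instead of relying on a bean class.
    List<StructField> fields = Arrays.asList(
        DataTypes.createStructField("id", DataTypes.IntegerType, true),
        DataTypes.createStructField("name", DataTypes.StringType, true),
        DataTypes.createStructField("age", DataTypes.IntegerType, true));
    StructType schema = DataTypes.createStructType(fields);
    // Turn each text line into a generic Row that matches the schema.
    JavaRDD<Row> rows = lines.map(new Function<String, Row>() {
        public Row call(String line) throws Exception {
            String[] arr = line.split(",");
            return RowFactory.create(Integer.valueOf(arr[0].trim()), arr[1].trim(), Integer.valueOf(arr[2].trim()));
        }
    });
    DataFrame df = sqlContext.createDataFrame(rows, schema);
    df.show();
}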
Back to the reflection route: when Person.class is passed to createDataFrame, Spark reflects over the bean's fields and builds the DataFrame from them.
The first run hits an error:
java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class com.tom.spark.SparkApps.sql.Person with modifiers "public"
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
    at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
    at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
    at java.lang.reflect.Method.invoke(Method.java:490)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1358)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1356)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/09/07 12:42:35 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalAccessException (same stack trace repeated)
The fix is to move the Person class into its own source file and declare it public, so that Spark's reflection can access its members.
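For reference, a minimal sketch of that standalone bean. The package name comes from the stack trace and the fields from the code below; Serializable and the toString override are my additions (bean instances travel inside an RDD, and the final loop prints each Person):

package com.tom.spark.SparkApps.sql;

import java.io.Serializable;

// Must be public and in its own file so Spark can reach its getters via
// reflection; Serializable because instances are shipped inside an RDD.
public class Person implements Serializable {
    private static final long serialVersionUID = 1L;
    private int id;
    private String name;
    private int age;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";
    }
}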
With that fixed, a second error appears:
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at org.apache.spark.sql.Row$class.getString(Row.scala:250)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:192)
    at com.tom.spark.SparkApps.sql.RDD2DataFrameByReflection$2.call(RDD2DataFrameByReflection.java:57)
    at com.tom.spark.SparkApps.sql.RDD2DataFrameByReflection$2.call(RDD2DataFrameByReflection.java:1)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
The cause: when a DataFrame is created from a JavaBean, the columns are sorted alphabetically by field name (age, id, name), so they no longer match the declaration order, and positional getters such as row.getString(1) land on the wrong column. Accessing fields by name avoids this, as sketched below.
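A more robust alternative is to look columns up by name, which the alphabetical reordering cannot break. A sketch against the row RDD from the Java implementation below, assuming the Row.getAs(fieldName) accessor (available since Spark 1.4):

JavaRDD<Person> result = row.map(new Function<Row, Person>() {
    public Person call(Row r) throws Exception {
        Person p = new Person();
        // Look columns up by name instead of by position.
        p.setId(r.<Integer>getAs("id"));
        p.setName(r.<String>getAs("name"));
        p.setAge(r.<Integer>getAs("age"));
        return p;
    }
});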
Java implementation
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public static void ByReflection() {
    SparkConf conf = new SparkConf().setAppName("RDD2DataFrameByReflection").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    JavaRDD<String> lines = sc.textFile("F:\\sparkData\\personBean.txt");
    JavaRDD<Person> persons = lines.map(new Function<String, Person>() {
        public Person call(String line) throws Exception {
            String[] arr = line.split(",");
            Person p = new Person();
            p.setId(Integer.valueOf(arr[0].trim()));
            p.setName(arr[1].trim());
            p.setAge(Integer.valueOf(arr[2].trim()));
            return p;
        }
    });
    // Under the hood, Spark reflects over Person's fields and combines them
    // with the RDD itself to produce a DataFrame.
    DataFrame df = sqlContext.createDataFrame(persons, Person.class);
    // Register a temporary table so plain SQL can be run against it.
    df.registerTempTable("persons");
    DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");
    bigDatas.show();
    JavaRDD<Row> row = bigDatas.javaRDD();
    JavaRDD<Person> result = row.map(new Function<Row, Person>() {
        public Person call(Row row) throws Exception {
            // Bean-derived columns are sorted alphabetically: age(0), id(1), name(2).
            Person p = new Person();
            p.setId(row.getInt(1));
            p.setName(row.getString(2));
            p.setAge(row.getInt(0));
            return p;
        }
    });
    List<Person> personlist = result.collect();
    for (Person p : personlist) {
        System.out.println(p);
    }
}
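For reference, both implementations read F:\sparkData\personBean.txt and expect comma-separated id, name, age records. Hypothetical contents along these lines (the last record would be filtered out by age >= 6):

1, Spark, 7
2, Hadoop, 11
3, Flink, 5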
Scala implementation
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// The case class must live at top level (not inside a method) so that Spark
// can derive the schema from it by reflection; field order is preserved.
case class Person(id: Int, name: String, age: Int)

def ByReflection(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val lines = sc.textFile("F:\\sparkData\\personBean.txt")
  val persons = lines.map(line => {
    val arr = line.split(",")
    // trim tolerates spaces after commas, as in the Java version
    Person(arr(0).trim.toInt, arr(1).trim, arr(2).trim.toInt)
  })
  // With a case class, the columns keep the declaration order: id, name, age.
  val df = sqlContext.createDataFrame(persons)
  df.registerTempTable("persons")
  df.printSchema()
  val bigDatas = sqlContext.sql("select * from persons where age >= 6")
  bigDatas.show()
  val result = bigDatas.rdd.map(row => {
    Person(row.getInt(0), row.getString(1), row.getInt(2))
  })
  result.collect().foreach(println)
}