Big Data IMF Legendary Action Secret Course, Lesson 59: Hands-On RDD and DataFrame Conversion in the IDE with Java and Scala

1. Why converting between RDDs and DataFrames matters
2. Hands-on: RDD-to-DataFrame conversion in Java
3. Hands-on: RDD-to-DataFrame conversion in Scala
Once an RDD can be converted to a DataFrame, it can be wired up to databases and file systems alike. That opens up almost unlimited possibilities and greatly accelerates and simplifies big data development.

Inferring the schema by reflection:
A case class (Scala) or JavaBean (Java) suits the case where the RDD's metadata (its schema) is known up front; when the metadata is not known, it must be obtained dynamically at runtime instead (a sketch of that alternative follows these notes).
JavaBeans do not support nesting, nor may they contain complex data structures such as List.
When Person.class is passed to createDataFrame, Spark creates the DataFrame by reflection.
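As a quick illustration of that dynamic route, Spark 1.x also offers a programmatic-schema API that builds the schema at runtime from StructFields. This is only a hedged sketch, not part of this lesson's code: it reuses the lines RDD and sqlContext from the implementations below, and the two-column schema is purely illustrative.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // Build the schema at runtime instead of inferring it from a class.
    List<StructField> fields = new ArrayList<StructField>();
    fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
    fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
    StructType schema = DataTypes.createStructType(fields);

    // Map each text line to a generic Row that matches the schema above.
    JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() {
        public Row call(String line) throws Exception {
            String[] arr = line.split(",");
            return RowFactory.create(Integer.valueOf(arr[0].trim()), arr[1].trim());
        }
    });
    DataFrame df = sqlContext.createDataFrame(rowRDD, schema);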

The first error encountered:

java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class com.tom.spark.SparkApps.sql.Person with modifiers "public"
    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
    at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
    at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
    at java.lang.reflect.Method.invoke(Method.java:490)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1358)
    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1356)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/09/07 12:42:35 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class com.tom.spark.SparkApps.sql.Person with modifiers "public"
    ... (same stack trace as above)

The fix is to put the Person class in its own source file and declare it public.
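A minimal sketch of what that class might look like, assuming the id/name/age fields used by the implementations below; the package name is taken from the stack trace, and implements Serializable is added because Person instances travel inside an RDD:

    // Person.java -- standalone public JavaBean for the reflection-based conversion.
    package com.tom.spark.SparkApps.sql;

    import java.io.Serializable;

    public class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        private int id;
        private String name;
        private int age;

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }

        @Override
        public String toString() {
            return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";
        }
    }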

Then another error came up:

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at org.apache.spark.sql.Row$class.getString(Row.scala:250)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:192)
    at com.tom.spark.SparkApps.sql.RDD2DataFrameByReflection$2.call(RDD2DataFrameByReflection.java:57)
    at com.tom.spark.SparkApps.sql.RDD2DataFrameByReflection$2.call(RDD2DataFrameByReflection.java:1)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The cause: the DataFrame's column order no longer matches the bean's field order. When a DataFrame is created from a JavaBean, the columns are sorted alphabetically (here: age, id, name), so positional getters must follow that sorted order.
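One way to avoid depending on positional order altogether is to resolve columns by name. A hedged sketch of the Row-to-Person mapping rewritten with Row.fieldIndex (rows obtained from a DataFrame carry their schema, so the lookup works regardless of how the columns were ordered):

    // Inside the Row -> Person mapping: resolve positions by column name
    // instead of hard-coding indexes.
    Person p = new Person();
    p.setId(row.getInt(row.fieldIndex("id")));
    p.setName(row.getString(row.fieldIndex("name")));
    p.setAge(row.getInt(row.fieldIndex("age")));
    return p;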

Java implementation

    // Imports required by this example:
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;

    public static void ByReflection() {
        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameByReflection").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> lines = sc.textFile("F:\\sparkData\\personBean.txt");
        JavaRDD<Person> persons = lines.map(new Function<String, Person>() {
            public Person call(String line) throws Exception {
                String[] arr = line.split(",");
                Person p = new Person();
                p.setId(Integer.valueOf(arr[0].trim()));
                p.setName(arr[1].trim());
                p.setAge(Integer.valueOf(arr[2].trim()));
                return p;
            }
        });
        // Under the hood, all of Person's fields are obtained via reflection;
        // combined with the RDD itself, this produces the DataFrame.
        DataFrame df = sqlContext.createDataFrame(persons, Person.class);
        // Register a temporary table so SQL can be written against it.
        df.registerTempTable("persons");
        DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");
        bigDatas.show();
        JavaRDD<Row> rows = bigDatas.javaRDD();
        JavaRDD<Person> result = rows.map(new Function<Row, Person>() {
            public Person call(Row row) throws Exception {
                // The bean-derived columns are sorted alphabetically (age, id, name),
                // so id sits at index 1, name at index 2, and age at index 0.
                Person p = new Person();
                p.setId(row.getInt(1));
                p.setName(row.getString(2));
                p.setAge(row.getInt(0));
                return p;
            }
        });
        List<Person> personList = result.collect();
        for (Person p : personList) {
            System.out.println(p);
        }
    }
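For reference, the code assumes personBean.txt holds comma-separated id,name,age records. A purely hypothetical sample (values invented for illustration; with these, the age >= 6 filter keeps the first two rows):

    1,Spark,7
    2,Hadoop,11
    3,Flink,4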

Scala implementation

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  // The case class must live at the top level (outside any method) so that
  // Spark can infer the schema from it by reflection.
  case class Person(id: Int, name: String, age: Int)

  def ByReflection(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lines = sc.textFile("F:\\sparkData\\personBean.txt")
    val persons = lines.map(line => {
      val arr = line.split(",")
      Person(arr(0).toInt, arr(1), arr(2).toInt)
    })
    val df = sqlContext.createDataFrame(persons)
    df.registerTempTable("persons")
    df.printSchema()
    val bigDatas = sqlContext.sql("select * from persons where age >= 6")
    bigDatas.show()
    val result = bigDatas.rdd.map(row => {
      Person(row.getInt(0), row.getString(1), row.getInt(2))
    })
    result.collect().foreach(println)
  }
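Note the contrast with the Java version: a DataFrame derived from a Scala case class keeps its columns in declaration order (id, name, age), so the positional getters getInt(0), getString(1), getInt(2) line up with the original fields and no index juggling is needed.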