1. TopN sorting (Scala implementation)
Input data
1
4
2
5
7
9
11
23
1
2
5
6
7
8
TopN (top 5) sort result
23
11
9
8
7
Result when the keys are compared as strings via an implicit conversion: lexicographically "9" > "8" > "7" > "6", while "23" and "11" sort below them because their first character is '2' or '1', so 23 and 11 drop out of the top 5 (see the sketch after this result listing).
9
8
7
7
6
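To see concretely why string ordering drops 23 and 11, here is a minimal, self-contained sketch (the class name StringOrderTopN is illustrative, not from the original program) that sorts the same input by the decimal string form of each number:

import java.util.Arrays;

public class StringOrderTopN {
    public static void main(String[] args) {
        Integer[] data = {1, 4, 2, 5, 7, 9, 11, 23, 1, 2, 5, 6, 7, 8};
        // Sort descending by the decimal string form of each number.
        Arrays.sort(data, (a, b) -> b.toString().compareTo(a.toString()));
        // String order: "9" > "8" > "7" > "6" > ... > "23" > "2" > "11" > "1",
        // so the top 5 printed here are 9, 8, 7, 7, 6.
        for (int i = 0; i < 5; i++) {
            System.out.println(data[i]);
        }
    }
}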
2. Grouped TopN sorting (Java implementation)
Input file
Spark 100
Hadoop 62
Flink 77
Kafka 91
Hadoop 93
Spark 78
Hadoop 69
Spark 98
Hadoop 62
Spark 99
Hadoop 61
Spark 70
Hadoop 75
Spark 88
Hadoop 68
Spark 90
Hadoop 61
Run result
Group key :Flink
77
null
null
null
null
**********************
Group key :Spark
100
99
98
90
88
**********************
Group key :Kafka
91
null
null
null
null
**********************
Group key :Hadoop
93
75
69
68
62
**********************
A few notes on the sorting implementation:
3. Binary search: placing a key into its corresponding partition
Before learning binary search, the most common approach is to traverse the array and compare against every element, which takes O(n) time. Binary search does better, with a lookup time of O(log n). For example, to find the element 6 in the array {1, 2, 3, 4, 5, 6, 7, 8, 9}, binary search proceeds as follows:
1. Look at the middle element, 5. Since 5 < 6, the value 6 must lie to the right of 5, so continue searching in {6, 7, 8, 9}.
2. Take the middle of {6, 7, 8, 9}, which is 7. Since 7 > 6, the value 6 must lie to the left of 7, which leaves only 6: the element is found.
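Spark's RangePartitioner applies the same idea when deciding which partition a key belongs to: the upper range bounds form a sorted array, and a binary search locates the first bound that is not smaller than the key. The following is a simplified, hypothetical sketch of such a lookup (RangeLookup and getPartition are illustrative names, not the actual Spark source):

public class RangeLookup {
    /**
     * Returns the index of the partition that should hold the key,
     * given sorted upper range bounds (one bound per partition except the last).
     */
    static int getPartition(int[] rangeBounds, int key) {
        int lo = 0, hi = rangeBounds.length - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;      // same as (lo + hi) / 2, overflow-safe
            if (key <= rangeBounds[mid]) {
                hi = mid - 1;               // key fits in this or an earlier partition
            } else {
                lo = mid + 1;               // key belongs to a later partition
            }
        }
        return lo;                          // may equal rangeBounds.length (the last partition)
    }

    public static void main(String[] args) {
        int[] bounds = {10, 20, 30};        // 4 partitions: (-inf,10], (10,20], (20,30], (30,+inf)
        System.out.println(getPartition(bounds, 6));   // 0
        System.out.println(getPartition(bounds, 23));  // 2
        System.out.println(getPartition(bounds, 99));  // 3
    }
}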
4. Reservoir sampling: the reason for the factor of 3 below
The factor of 3 accounts for possible data skew across the RDD's partitions. sampleSize is the desired total sample size, but some partitions may hold fewer than sampleSize / numPartitions records; multiplying by 3 lets the other partitions over-sample, so that the total number of sampled records still reaches or exceeds sampleSize. (A worked example follows the snippet below.)
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
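As a worked example (the partition count of 4 below is hypothetical, not taken from the programs in this post), the two formulas give sampleSize = min(20.0 * 4, 1e6) = 80 and sampleSizePerPartition = ceil(3.0 * 80 / 4) = 60, so up to 4 * 60 = 240 records may be sampled even though only about 80 are strictly needed:

public class SampleSizeDemo {
    public static void main(String[] args) {
        int partitions = 4;  // hypothetical partition count
        // Same formulas as the Spark snippet above.
        double sampleSize = Math.min(20.0 * partitions, 1e6);
        int sampleSizePerPartition = (int) Math.ceil(3.0 * sampleSize / partitions);
        System.out.println("sampleSize = " + sampleSize);                         // 80.0
        System.out.println("sampleSizePerPartition = " + sampleSizePerPartition); // 60
    }
}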
5. Grouped TopN sorting, step by step
1. Read each line of input: JavaRDD<String> lines
2. Build (K, V) pairs: JavaPairRDD<String, Integer> pairs
Input: one line of text
Output: the key is the name, the value is the score
3. Group by name with groupByKey: JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();
4. Sort within each group
Input: groupedData, where the key is the group name and the value is the collection of scores
Output: the key is the group name, the value is the top 5 scores in descending order (groups with fewer than 5 scores keep trailing null entries, as seen in the run result above)
JavaPairRDD<String, Iterable<Integer>> top5 = groupedPairs.mapToPair(new
PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
5. Implementation of the call method:
- declare the top5 array
- get the group name from groupedData._1
- get the score iterator from groupedData._2.iterator()
- loop over the iterator:
while (groupedValue.hasNext()) {
    Integer value = groupedValue.next();
    for (int i = 0; i < 5; i++) {
        if (top5[i] == null) {
            top5[i] = value;
            break;
        } else if (value > top5[i]) {
            for (int j = 4; j > i; j--) {
                top5[j] = top5[j - 1];
            }
            top5[i] = value;
            break;
        }
    }
}
6. Print the top-5 values (a standalone sketch of this insertion logic follows this list).
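Below is a minimal, Spark-free sketch of that insertion step; the class and method names (Top5Insert, insert) are illustrative, not from the original program. It maintains a fixed-size array in descending order exactly as the call method does:

import java.util.Arrays;

public class Top5Insert {
    /** Inserts value into the descending top-5 array, shifting smaller entries to the right. */
    static void insert(Integer[] top5, Integer value) {
        for (int i = 0; i < top5.length; i++) {
            if (top5[i] == null) {          // empty slot: place the value and stop
                top5[i] = value;
                break;
            } else if (value > top5[i]) {   // larger than this slot: shift the tail right
                for (int j = top5.length - 1; j > i; j--) {
                    top5[j] = top5[j - 1];
                }
                top5[i] = value;
                break;
            }
        }
    }

    public static void main(String[] args) {
        Integer[] top5 = new Integer[5];
        // The Spark group's scores from the input file, in file order.
        for (int score : new int[]{100, 78, 98, 99, 70, 88, 90}) {
            insert(top5, score);
        }
        System.out.println(Arrays.toString(top5));  // [100, 99, 98, 90, 88]
    }
}

Feeding it the Spark group's scores reproduces the 100, 99, 98, 90, 88 seen in the run result above.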
Basic TopN source code (Scala)
package com.dt.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object TopNBasic {
  def main(args: Array[String]) {
    val conf = new SparkConf() // create the SparkConf object
    conf.setAppName("Wow,My TopN App!") // set the application name, shown in the monitoring UI while the program runs
    conf.setMaster("local") // run locally; no Spark cluster installation is needed
    val sc = new SparkContext(conf) // create the SparkContext, passing in the SparkConf instance to customize Spark's runtime parameters and configuration
    val lines = sc.textFile("G://IMFBigDataSpark2016//tesdata//basicTopN.txt", 1) // read the local file as a single partition
    val pairs = lines.map { line => (line.toInt, line) }
    implicit val sortIntegersByString = new Ordering[Int] {
      override def compare(a: Int, b: Int) =
        a.toString.compare(b.toString)
    } // implicit conversion: keys are compared by their string form
    val sortedPairs = pairs.sortByKey(false)
    val sortedData = sortedPairs.map(pair => pair._2)
    val top5 = sortedData.take(5)
    top5.foreach(println)
  }
}
Grouped TopN source code (Java)
package com.dt.spark.SparkApps.cores;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class TopNGroup {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TopNGroup").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf); // under the hood this wraps Scala's SparkContext
        JavaRDD<String> lines = sc.textFile("G://IMFBigDataSpark2016//tesdata//topNGroup.txt");
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] splitedLine = line.split(" ");
                System.out.println(splitedLine[0]);
                return new Tuple2<String, Integer>(splitedLine[0], Integer.valueOf(splitedLine[1]));
            }
        });
        JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();
        JavaPairRDD<String, Iterable<Integer>> top5 = groupedPairs.mapToPair(
                new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedData)
                    throws Exception {
                Integer[] top5 = new Integer[5];
                String groupedKey = groupedData._1;
                Iterator<Integer> groupedValue = groupedData._2.iterator();
                while (groupedValue.hasNext()) {
                    Integer value = groupedValue.next();
                    for (int i = 0; i < 5; i++) {
                        if (top5[i] == null) {
                            top5[i] = value;
                            break;
                        } else if (value > top5[i]) {
                            for (int j = 4; j > i; j--) {
                                top5[j] = top5[j - 1];
                            }
                            top5[i] = value;
                            break;
                        }
                    }
                }
                return new Tuple2<String, Iterable<Integer>>(groupedKey, Arrays.asList(top5));
            }
        });
        top5.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> topped) throws Exception {
                System.out.println("Group key :" + topped._1);
                Iterator<Integer> toppedValue = topped._2.iterator();
                while (toppedValue.hasNext()) {
                    Integer value = toppedValue.next();
                    System.out.println(value);
                }
                System.out.println("**********************");
            }
        });
    }
}
Appendix: binary search (reference)
1. Prerequisite: binary search requires the array being searched to be sorted; the implementation here assumes ascending order.
2. Principle: split the array into three parts: the elements before the middle value (the value at the middle position), the middle value itself, and the elements after it. Compare the target value with the middle value: if it is smaller, search before the middle; if it is larger, search after the middle; if they are equal, return immediately. The process then repeats recursively, splitting the chosen half into three parts again. The implementation below comes in a recursive and an iterative variant; the code makes the algorithm easy to follow.
http://blog.csdn.net/cyxlzzs/article/details/7418337#
package org.cyxl.algorithm.search;

/**
 * Binary search
 * @author cyxl
 */
public class BinarySearch {
    private int rCount = 0;
    private int lCount = 0;

    /**
     * Get the number of recursive calls
     * @return
     */
    public int getrCount() {
        return rCount;
    }

    /**
     * Get the number of loop iterations
     * @return
     */
    public int getlCount() {
        return lCount;
    }

    /**
     * Recursive binary search; returns the position where the value is found
     * @param sortedData the sorted array
     * @param start start position
     * @param end end position
     * @param findValue the value to find
     * @return the position of the value in the array, starting from 0; -1 if not found
     */
    public int searchRecursive(int[] sortedData, int start, int end, int findValue) {
        rCount++;
        if (start <= end) {
            // middle position
            int middle = (start + end) >> 1; // equivalent to (start + end) / 2
            // middle value
            int middleValue = sortedData[middle];

            if (findValue == middleValue) {
                // equal to the middle value: return directly
                return middle;
            } else if (findValue < middleValue) {
                // smaller than the middle value: search before the middle
                return searchRecursive(sortedData, start, middle - 1, findValue);
            } else {
                // larger than the middle value: search after the middle
                return searchRecursive(sortedData, middle + 1, end, findValue);
            }
        } else {
            // not found
            return -1;
        }
    }

    /**
     * Iterative binary search; returns the position where the value is found
     * @param sortedData the sorted array
     * @param findValue the value to find
     * @return the position of the value in the array, starting from 0; -1 if not found
     */
    public int searchLoop(int[] sortedData, int findValue) {
        int start = 0;
        int end = sortedData.length - 1;

        while (start <= end) {
            lCount++;
            // middle position
            int middle = (start + end) >> 1; // equivalent to (start + end) / 2
            // middle value
            int middleValue = sortedData[middle];

            if (findValue == middleValue) {
                // equal to the middle value: return directly
                return middle;
            } else if (findValue < middleValue) {
                // smaller than the middle value: search before the middle
                end = middle - 1;
            } else {
                // larger than the middle value: search after the middle
                start = middle + 1;
            }
        }
        // not found
        return -1;
    }
}
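A small, hypothetical driver for the class above (BinarySearchDemo is not part of the referenced post; it assumes it sits in the same package as BinarySearch) shows how both variants are called and that the target 6 from the walkthrough in section 3 is found at index 5:

package org.cyxl.algorithm.search;

public class BinarySearchDemo {
    public static void main(String[] args) {
        int[] sortedData = {1, 2, 3, 4, 5, 6, 7, 8, 9};
        BinarySearch bs = new BinarySearch();
        // Recursive variant: search the whole range [0, length - 1] for the value 6.
        int posRecursive = bs.searchRecursive(sortedData, 0, sortedData.length - 1, 6);
        // Iterative variant.
        int posLoop = bs.searchLoop(sortedData, 6);
        // Both find 6 at index 5 after three comparisons.
        System.out.println("recursive: index " + posRecursive + ", " + bs.getrCount() + " calls");
        System.out.println("loop: index " + posLoop + ", " + bs.getlCount() + " iterations");
    }
}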
Grouped TopN source code with debug output (Java)
package com.dt.spark.SparkApps.cores;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class TopNGroup {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TopNGroup").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf); // under the hood this wraps Scala's SparkContext
        JavaRDD<String> lines = sc.textFile("G://IMFBigDataSpark2016//tesdata//topNGroup.txt");
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] splitedLine = line.split(" ");
                System.out.println(splitedLine[0]);
                return new Tuple2<String, Integer>(splitedLine[0], Integer.valueOf(splitedLine[1]));
            }
        });
        JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();
        JavaPairRDD<String, Iterable<Integer>> top5 = groupedPairs.mapToPair(
                new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedData)
                    throws Exception {
                Integer[] top5 = new Integer[5];
                String groupedKey = groupedData._1;
                Iterator<Integer> groupedValue = groupedData._2.iterator();
                while (groupedValue.hasNext()) {
                    Integer value = groupedValue.next();
                    System.out.println(" 讀入一個 value : " + value);
                    for (int i = 0; i < 5; i++) {
                        System.out.println(" ************************************ [i] : " + i + " *************************************");
                        if (top5[i] == null) {
                            top5[i] = value;
                            System.out.println("top5 i : " + i + " + value =====: " + top5[i]);
                            break;
                        } else if (value > top5[i]) {
                            for (int j = 4; j > i; j--) {
                                System.out.println(" #### [j] : " + j + " ####");
                                top5[j] = top5[j - 1];
                                System.out.println("top5[j] : " + j + " " + top5[j]);
                                System.out.println("top5[j-1] : " + " " + top5[j - 1]);
                            }
                            top5[i] = value;
                            System.out.println("top5 i : " + i + " + value =====: " + top5[i]);
                            break;
                        }
                    }
                }
                return new Tuple2<String, Iterable<Integer>>(groupedKey, Arrays.asList(top5));
            }
        });
        top5.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> topped) throws Exception {
                System.out.println(" ========================分組topn結(jié)果=====================");
                System.out.println(" Group key :" + topped._1);
                Iterator<Integer> toppedValue = topped._2.iterator();
                while (toppedValue.hasNext()) {
                    Integer value = toppedValue.next();
                    System.out.println(" " + value);
                }
                System.out.println(" ========================分組topn結(jié)果=====================");
            }
        });
    }
}
Printed log output
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/03/06 10:45:10 INFO SparkContext: Running Spark version 1.6.0
16/03/06 10:45:12 INFO SecurityManager: Changing view acls to: admin
16/03/06 10:45:12 INFO SecurityManager: Changing modify acls to: admin
16/03/06 10:45:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with
view permissions: Set(admin); users with modify permissions: Set(admin)
16/03/06 10:45:13 INFO Utils: Successfully started service 'sparkDriver' on port 61015.
16/03/06 10:45:14 INFO Slf4jLogger: Slf4jLogger started
16/03/06 10:45:14 INFO Remoting: Starting remoting
16/03/06 10:45:14 INFO Remoting: Remoting started; listening on addresses :
[akka.tcp://sparkDriverActorSystem@192.168.3.6:61028]
16/03/06 10:45:14 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 61028.
16/03/06 10:45:14 INFO SparkEnv: Registering MapOutputTracker
16/03/06 10:45:14 INFO SparkEnv: Registering BlockManagerMaster
16/03/06 10:45:14 INFO DiskBlockManager: Created local directory at C:\Users\admin\AppData\Local\Temp\blockmgr-
5d7ea022-663b-4ef4-a2af-6acbb2388af9
16/03/06 10:45:14 INFO MemoryStore: MemoryStore started with capacity 146.2 MB
16/03/06 10:45:14 INFO SparkEnv: Registering OutputCommitCoordinator
16/03/06 10:45:14 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/03/06 10:45:14 INFO SparkUI: Started SparkUI at http://192.168.3.6:4040
16/03/06 10:45:14 INFO Executor: Starting executor ID driver on host localhost
16/03/06 10:45:14 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 61035.
16/03/06 10:45:14 INFO NettyBlockTransferService: Server created on 61035
16/03/06 10:45:14 INFO BlockManagerMaster: Trying to register BlockManager
16/03/06 10:45:14 INFO BlockManagerMasterEndpoint: Registering block manager localhost:61035 with 146.2 MB RAM,
BlockManagerId(driver, localhost, 61035)
16/03/06 10:45:14 INFO BlockManagerMaster: Registered BlockManager
16/03/06 10:45:15 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
16/03/06 10:45:15 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 114.9 KB, free
114.9 KB)
16/03/06 10:45:15 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB,
free 128.8 KB)
16/03/06 10:45:15 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:61035 (size: 13.9 KB,
free: 146.2 MB)
16/03/06 10:45:15 INFO SparkContext: Created broadcast 0 from textFile at TopNGroup.java:23
16/03/06 10:45:17 WARN : Your hostname, pc resolves to a loopback/non-reachable address:
fe80:0:0:0:bdb2:979:df5e:7337%eth18, but we couldn't find any external IP address!
16/03/06 10:45:19 INFO FileInputFormat: Total input paths to process : 1
16/03/06 10:45:19 INFO SparkContext: Starting job: foreach at TopNGroup.java:87
16/03/06 10:45:19 INFO DAGScheduler: Registering RDD 2 (mapToPair at TopNGroup.java:26)
16/03/06 10:45:19 INFO DAGScheduler: Got job 0 (foreach at TopNGroup.java:87) with 1 output partitions
16/03/06 10:45:19 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at TopNGroup.java:87)
16/03/06 10:45:19 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/03/06 10:45:19 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/03/06 10:45:19 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at mapToPair at
TopNGroup.java:26), which has no missing parents
16/03/06 10:45:19 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.0 KB, free
133.9 KB)
16/03/06 10:45:19 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.7 KB,
free 136.6 KB)
16/03/06 10:45:19 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:61035 (size: 2.7 KB,
free: 146.2 MB)
16/03/06 10:45:19 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/03/06 10:45:19 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at
mapToPair at TopNGroup.java:26)
16/03/06 10:45:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/03/06 10:45:19 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition
0,PROCESS_LOCAL, 2141 bytes)
16/03/06 10:45:19 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/03/06 10:45:19 INFO HadoopRDD: Input split: file:/G:/IMFBigDataSpark2016/tesdata/topNGroup.txt:0+177
16/03/06 10:45:20 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/03/06 10:45:20 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/03/06 10:45:20 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/03/06 10:45:20 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/03/06 10:45:20 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
Spark
Hadoop
Flink
Kafka
Hadoop
Spark
Hadoop
Spark
Hadoop
Spark
Hadoop
Spark
Hadoop
Spark
Hadoop
Spark
Hadoop
16/03/06 10:45:20 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver
16/03/06 10:45:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 133 ms on localhost (1/1)
16/03/06 10:45:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/03/06 10:45:20 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at TopNGroup.java:26) finished in 0.149 s
16/03/06 10:45:20 INFO DAGScheduler: looking for newly runnable stages
16/03/06 10:45:20 INFO DAGScheduler: running: Set()
16/03/06 10:45:20 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/03/06 10:45:20 INFO DAGScheduler: failed: Set()
16/03/06 10:45:20 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at mapToPair at
TopNGroup.java:39), which has no missing parents
16/03/06 10:45:20 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 6.3 KB, free
142.9 KB)
16/03/06 10:45:20 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.2 KB,
free 146.0 KB)
16/03/06 10:45:20 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:61035 (size: 3.2 KB,
free: 146.2 MB)
16/03/06 10:45:20 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/03/06 10:45:20 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at
mapToPair at TopNGroup.java:39)
16/03/06 10:45:20 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/03/06 10:45:20 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL,
1894 bytes)
16/03/06 10:45:20 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/03/06 10:45:20 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/03/06 10:45:20 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 4 ms
讀入一個 value : 77
************************************ [i] : 0 *************************************
top5 i : 0 + value =====: 77
========================分組topn結(jié)果=====================
Group key :Flink
77
null
null
null
null
========================分組topn結(jié)果=====================
讀入一個 value : 100
************************************ [i] : 0 *************************************
top5 i : 0 + value =====: 100
讀入一個 value : 78
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
top5 i : 1 + value =====: 78
讀入一個 value : 98
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
#### [j] : 4 ####
top5[j] : 4 null
top5[j-1] : null
#### [j] : 3 ####
top5[j] : 3 null
top5[j-1] : null
#### [j] : 2 ####
top5[j] : 2 78
top5[j-1] : 78
top5 i : 1 + value =====: 98
讀入一個 value : 99
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
#### [j] : 4 ####
top5[j] : 4 null
top5[j-1] : null
#### [j] : 3 ####
top5[j] : 3 78
top5[j-1] : 78
#### [j] : 2 ####
top5[j] : 2 98
top5[j-1] : 98
top5 i : 1 + value =====: 99
讀入一個 value : 70
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
************************************ [i] : 2 *************************************
************************************ [i] : 3 *************************************
************************************ [i] : 4 *************************************
top5 i : 4 + value =====: 70
讀入一個 value : 88
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
************************************ [i] : 2 *************************************
************************************ [i] : 3 *************************************
#### [j] : 4 ####
top5[j] : 4 78
top5[j-1] : 78
top5 i : 3 + value =====: 88
讀入一個 value : 90
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
************************************ [i] : 2 *************************************
************************************ [i] : 3 *************************************
#### [j] : 4 ####
top5[j] : 4 88
top5[j-1] : 88
top5 i : 3 + value =====: 90
========================分組topn結(jié)果=====================
Group key :Spark
100
99
98
90
88
========================分組topn結(jié)果=====================
讀入一個 value : 91
************************************ [i] : 0 *************************************
top5 i : 0 + value =====: 91
========================分組topn結(jié)果=====================
Group key :Kafka
91
null
null
null
null
========================分組topn結(jié)果=====================
讀入一個 value : 62
************************************ [i] : 0 *************************************
top5 i : 0 + value =====: 62
讀入一個 value : 93
************************************ [i] : 0 *************************************
#### [j] : 4 ####
top5[j] : 4 null
top5[j-1] : null
#### [j] : 3 ####
top5[j] : 3 null
top5[j-1] : null
#### [j] : 2 ####
top5[j] : 2 null
top5[j-1] : null
#### [j] : 1 ####
top5[j] : 1 62
top5[j-1] : 62
top5 i : 0 + value =====: 93
讀入一個 value : 69
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
#### [j] : 4 ####
top5[j] : 4 null
top5[j-1] : null
#### [j] : 3 ####
top5[j] : 3 null
top5[j-1] : null
#### [j] : 2 ####
top5[j] : 2 62
top5[j-1] : 62
top5 i : 1 + value =====: 69
讀入一個 value : 62
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
************************************ [i] : 2 *************************************
************************************ [i] : 3 *************************************
top5 i : 3 + value =====: 62
讀入一個 value : 61
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
************************************ [i] : 2 *************************************
************************************ [i] : 3 *************************************
************************************ [i] : 4 *************************************
top5 i : 4 + value =====: 61
讀入一個 value : 75
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
#### [j] : 4 ####
top5[j] : 4 62
top5[j-1] : 62
#### [j] : 3 ####
top5[j] : 3 62
top5[j-1] : 62
#### [j] : 2 ####
top5[j] : 2 69
top5[j-1] : 69
top5 i : 1 + value =====: 75
讀入一個 value : 68
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
************************************ [i] : 2 *************************************
************************************ [i] : 3 *************************************
#### [j] : 4 ####
top5[j] : 4 62
top5[j-1] : 62
top5 i : 3 + value =====: 68
讀入一個 value : 61
************************************ [i] : 0 *************************************
************************************ [i] : 1 *************************************
************************************ [i] : 2 *************************************
************************************ [i] : 3 *************************************
************************************ [i] : 4 *************************************
========================分組topn結(jié)果=====================
Group key :Hadoop
93
75
69
68
62
========================分組topn結(jié)果=====================
16/03/06 10:45:20 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver
16/03/06 10:45:20 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 55 ms on localhost (1/1)
16/03/06 10:45:20 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/03/06 10:45:20 INFO DAGScheduler: ResultStage 1 (foreach at TopNGroup.java:87) finished in 0.056 s
16/03/06 10:45:20 INFO DAGScheduler: Job 0 finished: foreach at TopNGroup.java:87, took 0.351727 s
16/03/06 10:45:20 INFO SparkContext: Invoking stop() from shutdown hook
16/03/06 10:45:20 INFO SparkUI: Stopped Spark web UI at http://192.168.3.6:4040
16/03/06 10:45:20 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/06 10:45:20 INFO MemoryStore: MemoryStore cleared
16/03/06 10:45:20 INFO BlockManager: BlockManager stopped
16/03/06 10:45:20 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/06 10:45:20 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/06 10:45:20 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/03/06 10:45:20 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with
flushing remote transports.
16/03/06 10:45:20 INFO SparkContext: Successfully stopped SparkContext
16/03/06 10:45:20 INFO ShutdownHookManager: Shutdown hook called
16/03/06 10:45:20 INFO ShutdownHookManager: Deleting directory C:\Users\admin\AppData\Local\Temp\spark-9adfba90
-32fa-49ed-b729-20617cdaac50
16/03/06 10:45:20 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.