· 7 years ago · Dec 05, 2018, 08:48 PM
1public class MainApp {
2
3private final String logFile= Properties.getString("SparkLogFileDir");
4private static final String KAFKA_GROUPID = Properties.getString("KafkaGroupId");
5private static final String ZOOKEEPER_URL = Properties.getString("ZookeeperURL");
6private static final String KAFKA_BROKER = Properties.getString("KafkaBroker");
7private static final String KAFKA_TOPIC = Properties.getString("KafkaTopic");
8private static final String Database = Properties.getString("HiveDatabase");
9private static final Integer KAFKA_PARA = Properties.getInt("KafkaParrallel");
10
11public static void main(String[] args){
12 //set settings
13 String sql="";
14
15 //START APP
16 System.out.println("Starting NPI_TWITTERAPP...." + new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
17 System.out.println("Configuring Settings...."+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
18 SparkConf conf = new SparkConf()
19 .setAppName(Properties.getString("SparkAppName"))
20 .setMaster(Properties.getString("SparkMasterUrl"));
21
22 //Set Spark/hive/sql Context
23 JavaSparkContext sc = new JavaSparkContext(conf);
24 JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));
25 JavaHiveContext HiveSqlContext = new JavaHiveContext(sc);
26
27 //Check if Twitter Hive Table Exists
28 try {
29 HiveSqlContext.sql("DROP TABLE IF EXISTS "+Database+"TWITTERSTORE");
30 HiveSqlContext.sql("CREATE TABLE IF NOT EXISTS "+Database+".TWITTERSTORE "
31 +" (created_at String, id String, id_str String, text String, source String, truncated String, in_reply_to_user_id String, processed_at String, lon String, lat String)"
32 +" STORED AS TEXTFILE");
33 }catch(Exception e){
34 System.out.println(e);
35 }
36 //Check if Ivapp Table Exists
37
38 sql ="CREATE TABLE IF NOT EXISTS "+Database+".IVAPPGEO AS SELECT DISTINCT a.LATITUDE, a.LONGITUDE, b.ODNCIRCUIT_OLT_CLLI, b.ODNCIRCUIT_OLT_TID, a.CITY, a.STATE, a.ZIP FROM "
39 +Database+".T_PONNMS_SERVICE B, "
40 +Database+".CLLI_LATLON_MSTR A WHERE a.BID_CLLI = substr(b.ODNCIRCUIT_OLT_CLLI,0,8)";
41 try {
42 System.out.println(sql + new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
43 HiveSqlContext.sql(sql);
44
45 sql = "SELECT LATITUDE, LONGITUDE, ODNCIRCUIT_OLT_CLLI, ODNCIRCUIT_OLT_TID, CITY, STATE, ZIP FROM "+Database+".IVAPPGEO";
46
47 JavaSchemaRDD RDD_IVAPPGEO = HiveSqlContext.sql(sql).cache();
48
49 }catch(Exception e){
50 System.out.println(sql + new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
51 }
52
53 //JavaHiveContext hc = new JavaHiveContext();
54 System.out.println("Retrieve Data from Kafka Topic: "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
55 Map<String, Integer> topicMap = new HashMap<String, Integer>();
56 topicMap.put(KAFKA_TOPIC,KAFKA_PARA);
57
58 JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
59 jssc, KAFKA_GROUPID, ZOOKEEPER_URL, topicMap);
60
61 JavaDStream<String> json = messages.map(
62 new Function<Tuple2<String, String>, String>() {
63 private static final long serialVersionUID = 42l;
64 @Override
65 public String call(Tuple2<String, String> message) {
66 return message._2();
67 }
68 }
69 );
70 System.out.println("Completed Kafka Messages... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
71
72
73 System.out.println("Filtering Resultset... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
74
75 JavaPairDStream<Long, String> tweets = json.mapToPair(
76 new TwitterFilterFunction());
77
78 JavaPairDStream<Long, String> filtered = tweets.filter(
79 new Function<Tuple2<Long, String>, Boolean>() {
80 private static final long serialVersionUID = 42l;
81 @Override
82 public Boolean call(Tuple2<Long, String> tweet) {
83 return tweet != null;
84 }
85 }
86 );
87
88 JavaDStream<Tuple2<Long, String>> tweetsFiltered = filtered.map(
89 new TextFilterFunction());
90
91 tweetsFiltered = tweetsFiltered.map(
92 new StemmingFunction());
93
94 System.out.println("Finished Filtering Resultset... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
95
96
97
98 System.out.println("Processing Sentiment Data... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
99
100 //calculate postive tweets
101 JavaPairDStream<Tuple2<Long, String>, Float> positiveTweets =
102 tweetsFiltered.mapToPair(new PositiveScoreFunction());
103 //calculate negative tweets
104 JavaPairDStream<Tuple2<Long, String>, Float> negativeTweets =
105 tweetsFiltered.mapToPair(new NegativeScoreFunction());
106
107 JavaPairDStream<Tuple2<Long, String>, Tuple2<Float, Float>> joined =
108 positiveTweets.join(negativeTweets);
109
110 //Score tweets
111 JavaDStream<Tuple4<Long, String, Float, Float>> scoredTweets =
112 joined.map(new Function<Tuple2<Tuple2<Long, String>,
113 Tuple2<Float, Float>>,
114 Tuple4<Long, String, Float, Float>>() {
115 private static final long serialVersionUID = 42l;
116 @Override
117 public Tuple4<Long, String, Float, Float> call(
118 Tuple2<Tuple2<Long, String>, Tuple2<Float, Float>> tweet)
119 {
120 return new Tuple4<Long, String, Float, Float>(
121 tweet._1()._1(),
122 tweet._1()._2(),
123 tweet._2()._1(),
124 tweet._2()._2());
125 }
126 });
127
128 System.out.println("Finished Processing Sentiment Data... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
129
130 System.out.println("Outputting Tweets Data to flat file "+Properties.getString("HdfsOutput")+" ... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
131
132 JavaDStream<Tuple5<Long, String, Float, Float, String>> result =
133 scoredTweets.map(new ScoreTweetsFunction());
134
135 result.foreachRDD(new FileWriter());
136
137 System.out.println("Outputting Sentiment Data to Hive... "+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
138
139
140 jssc.start();
141 jssc.awaitTermination();
142}
143
SparkContext.getOrCreate(): SparkContext
SparkContext.getOrCreate(conf: SparkConf): SparkContext
146
// Reuse the active SparkContext if there is one; otherwise create one from the default configuration.
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()

// Using an explicit SparkConf object.
// NOTE: getOrCreate returns the already-active SparkContext when one exists, in which case this
// conf may not take effect -- use one variant or the other, not both in the same JVM.
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkMe App")
// Distinct name: outside the REPL, defining `val sc` twice in one scope does not compile.
val scFromConf = SparkContext.getOrCreate(conf)
156
157public class ContextManager {
158
159private static JavaSparkContext context;
160private static String currentType;
161
162private ContextManager() {}
163
164public static JavaSparkContext getContext(String type) {
165
166if(type == currentType && context != null) {
167
168 return context;
169}
170else if (type == "streaming"){
171
172 .. clean up the current context ..
173 .. initialize the context to streaming context ..
174 currentType = type;
175}
176else {
177 ..clean up the current context..
178 ... initialize the context to normal context ..
179 currentType = type;
180
181
182 }
183
184 return context;
185
186 }
187
188}
189
190SparkConf sparkConfig = new SparkConf().setAppName("foo");
191JavaStreamingContext jssc = new JavaStreamingContext(sparkConfig, Duration.seconds(30));
192SqlContext sqlContext = new SqlContext(jssc.sparkContext());