· 5 years ago · Dec 30, 2020, 08:40 PM
1/**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19package com.cloudera.flume.source;
20
21import java.util.HashMap;
22import java.util.Map;
23
24import org.apache.flume.Context;
25import org.apache.flume.Event;
26import org.apache.flume.EventDrivenSource;
27import org.apache.flume.channel.ChannelProcessor;
28import org.apache.flume.conf.Configurable;
29import org.apache.flume.event.EventBuilder;
30import org.apache.flume.source.AbstractSource;
31import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import twitter4j.FilterQuery;
35import twitter4j.StallWarning;
36import twitter4j.Status;
37import twitter4j.StatusDeletionNotice;
38import twitter4j.StatusListener;
39import twitter4j.TwitterStream;
40import twitter4j.TwitterStreamFactory;
41import twitter4j.auth.AccessToken;
42import twitter4j.conf.ConfigurationBuilder;
43import twitter4j.json.DataObjectFactory;
44
45//Libraries to modify JSON
46import org.json.simple.JSONObject;
47import org.json.simple.JSONArray;
48import org.json.simple.parser.ParseException;
49import org.json.simple.parser.JSONParser;
50
51/**
52 * A Flume Source, which pulls data from Twitter's streaming API. Currently,
53 * this only supports pulling from the sample API, and only gets new status
54 * updates.
55 */
56public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable {
57
58 private static final Logger logger = LoggerFactory.getLogger(TwitterSource.class);
59
60 /** Information necessary for accessing the Twitter API */
61 private String consumerKey;
62 private String consumerSecret;
63 private String accessToken;
64 private String accessTokenSecret;
65
66 private String[] keywords;
67
68 /** The actual Twitter stream. It's set up to collect raw JSON data */
69 private TwitterStream twitterStream;
70 private int num_tweets = 0;
71
72 /**
73 * The initialization method for the Source. The context contains all the Flume
74 * configuration info, and can be used to retrieve any configuration values
75 * necessary to set up the Source.
76 */
77 @Override
78 public void configure(Context context) {
79 consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
80 consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
81 accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
82 accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);
83
84 String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
85 if (keywordString.trim().length() == 0) {
86 keywords = new String[0];
87 } else {
88 keywords = keywordString.split(",");
89 for (int i = 0; i < keywords.length; i++) {
90 keywords[i] = keywords[i].trim();
91 }
92 }
93
94 ConfigurationBuilder cb = new ConfigurationBuilder();
95 cb.setOAuthConsumerKey(consumerKey);
96 cb.setOAuthConsumerSecret(consumerSecret);
97 cb.setOAuthAccessToken(accessToken);
98 cb.setOAuthAccessTokenSecret(accessTokenSecret);
99 cb.setJSONStoreEnabled(true);
100 cb.setIncludeEntitiesEnabled(true);
101
102 twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
103 }
104
105 /**
106 * Start processing events. This uses the Twitter Streaming API to sample
107 * Twitter, and process tweets.
108 */
109 @Override
110 public void start() {
111 // The channel is the piece of Flume that sits between the Source and Sink,
112 // and is used to process events.
113 final ChannelProcessor channel = getChannelProcessor();
114
115 final Map<String, String> headers = new HashMap<String, String>();
116
117 // The StatusListener is a twitter4j API, which can be added to a Twitter
118 // stream, and will execute methods every time a message comes in through
119 // the stream.
120 StatusListener listener = new StatusListener() {
121 // The onStatus method is executed every time a new tweet comes in.
122 public void onStatus(Status status) {
123 // The EventBuilder is used to build an event using the headers and
124 // the raw JSON of a tweet
125 String rawJson = DataObjectFactory.getRawJSON(status);
126 JSONParser parser = new JSONParser();
127 JSONObject statusJSON = null;
128 try {
129 statusJSON = (JSONObject) parser.parse(rawJson);
130 } catch (Exception e) {
131 e.printStackTrace();
132 }
133
134 if ((statusJSON.get("in_reply_to_status_id") == null) && (statusJSON.get("retweeted_status") == null)
135 && (statusJSON.get("quoted_status") == null)) {
136
137 // System.out.println("-------------");
138 // System.out.println(statusJSON.toString());
139 // System.out.println("-------------");
140 try {
141 JSONObject filtered_tweet = new JSONObject();
142 filtered_tweet.put("created_at", statusJSON.get("created_at"));
143 filtered_tweet.put("id", statusJSON.get("id"));
144 filtered_tweet.put("text", statusJSON.get("text"));
145 filtered_tweet.put("source", statusJSON.get("source"));
146 filtered_tweet.put("lang", statusJSON.get("lang"));
147 filtered_tweet.put("filter_level", statusJSON.get("filter_level"));
148
149 JSONObject user = new JSONObject();
150 JSONObject statusUser = (JSONObject) statusJSON.get("user");
151
152 user.put("id", statusUser.get("id"));
153 user.put("screen_name", statusUser.get("screen_name"));
154 user.put("lang", statusUser.get("lang"));
155 user.put("followers_count", statusUser.get("followers_count"));
156 user.put("friends_count", statusUser.get("friends_count"));
157
158 filtered_tweet.put("user", user);
159
160 // System.out.println("-------------");
161 // System.out.println(filtered_tweet.toString());
162 // System.out.println("-------------");
163
164 logger.debug("tweet arrived");
165 headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
166 Event event = EventBuilder.withBody(filtered_tweet.toString().getBytes(), headers);
167 channel.processEvent(event);
168
169 } catch (Exception e) {
170 System.out.println("Something went wrong.");
171 e.printStackTrace();
172 }
173 }
174 }
175
176 // This listener will ignore everything except for new tweets
177 public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
178 }
179
180 public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
181 }
182
183 public void onScrubGeo(long userId, long upToStatusId) {
184 }
185
186 public void onException(Exception ex) {
187 }
188
189 public void onStallWarning(StallWarning warning) {
190 }
191 };
192
193 logger.debug("Setting up Twitter sample stream using consumer key {} and" + " access token {}",
194 new String[] { consumerKey, accessToken });
195 // Set up the stream's listener (defined above),
196 twitterStream.addListener(listener);
197
198 // Set up a filter to pull out industry-relevant tweets
199 if (keywords.length == 0) {
200 logger.debug("Starting up Twitter sampling...");
201 twitterStream.sample();
202 } else {
203 logger.debug("Starting up Twitter filtering...");
204
205 FilterQuery query = new FilterQuery().track(keywords);
206 twitterStream.filter(query);
207 }
208 super.start();
209 }
210
211 /**
212 * Stops the Source's event processing and shuts down the Twitter stream.
213 */
214 @Override
215 public void stop() {
216 logger.debug("Shutting down Twitter sample stream...");
217 twitterStream.shutdown();
218 super.stop();
219 }
220}
221