· 5 years ago · Dec 30, 2020, 08:44 PM
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18
19package com.cloudera.flume.source;
20
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.json.DataObjectFactory;

// Libraries to modify JSON
import org.json.simple.JSONObject;
import org.json.simple.JSONArray;
import org.json.simple.parser.ParseException;
import org.json.simple.parser.JSONParser;
50
51/**
52 * A Flume Source, which pulls data from Twitter's streaming API. Currently,
53 * this only supports pulling from the sample API, and only gets new status
54 * updates.
55 */
56public class TwitterSource extends AbstractSource
57 implements EventDrivenSource, Configurable {
58
59 private static final Logger logger =
60 LoggerFactory.getLogger(TwitterSource.class);
61
62 /** Information necessary for accessing the Twitter API */
63 private String consumerKey;
64 private String consumerSecret;
65 private String accessToken;
66 private String accessTokenSecret;
67
68 private String[] keywords;
69
70 /** The actual Twitter stream. It's set up to collect raw JSON data */
71 private TwitterStream twitterStream;
72 private int num_tweets = 0;
73
74 /**
75 * The initialization method for the Source. The context contains all the
76 * Flume configuration info, and can be used to retrieve any configuration
77 * values necessary to set up the Source.
78 */
79 @Override
80 public void configure(Context context) {
81 consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
82 consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);
83 accessToken = context.getString(TwitterSourceConstants.ACCESS_TOKEN_KEY);
84 accessTokenSecret = context.getString(TwitterSourceConstants.ACCESS_TOKEN_SECRET_KEY);
85
86 String keywordString = context.getString(TwitterSourceConstants.KEYWORDS_KEY, "");
87 if (keywordString.trim().length() == 0) {
88 keywords = new String[0];
89 } else {
90 keywords = keywordString.split(",");
91 for (int i = 0; i < keywords.length; i++) {
92 keywords[i] = keywords[i].trim();
93 }
94 }
95
96 ConfigurationBuilder cb = new ConfigurationBuilder();
97 cb.setOAuthConsumerKey(consumerKey);
98 cb.setOAuthConsumerSecret(consumerSecret);
99 cb.setOAuthAccessToken(accessToken);
100 cb.setOAuthAccessTokenSecret(accessTokenSecret);
101 cb.setJSONStoreEnabled(true);
102 cb.setIncludeEntitiesEnabled(true);
103
104 twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
105 }
106
107 /**
108 * Start processing events. This uses the Twitter Streaming API to sample
109 * Twitter, and process tweets.
110 */
111 @Override
112 public void start() {
113 // The channel is the piece of Flume that sits between the Source and Sink,
114 // and is used to process events.
115 final ChannelProcessor channel = getChannelProcessor();
116
117 final Map<String, String> headers = new HashMap<String, String>();
118
119 // The StatusListener is a twitter4j API, which can be added to a Twitter
120 // stream, and will execute methods every time a message comes in through
121 // the stream.
122 StatusListener listener = new StatusListener() {
123 // The onStatus method is executed every time a new tweet comes in.
124 public void onStatus(Status status) {
125 // The EventBuilder is used to build an event using the headers and
126 // the raw JSON of a tweet
127 if(!status.isRetweet() && status.getInReplyToStatusId() == 0){
128 String rawJson = DataObjectFactory.getRawJSON(status);
129 try{
130 JSONParser parser = new JSONParser();
131 JSONObject jsonStatus = (JSONObject) parser.parse(rawJson);
132 JSONObject filtered_tweet = new JSONObject();
133
134 filtered_tweet.put(jsonStatus.getString("created_at"));
135 filtered_tweet.put(jsonStatus.getString("id"));
136 filtered_tweet.put(jsonStatus.getString("text"));
137 filtered_tweet.put(jsonStatus.getString("source"));
138 filtered_tweet.put(jsonStatus.getString("lang"));
139 filtered_tweet.put(jsonStatus.getString("filter_level"));
140 JSONObject user = json.getJSONObject("user");
141
142 JSONObject filteredUser = new JSONObject();
143 filteredUser.put(user.getString("id"));
144 filteredUser.put(user.getString("screen_name"));
145 filteredUser.put(user.getString("lang"));
146 filteredUser.put(user.getString("followers_count"));
147 filteredUser.put(user.getString("friends_count"));
148
149 filtered_tweet.put(filteredUser);
150
151 logger.debug("tweet arrived");
152 headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
153 Event event = EventBuilder.withBody(filtered_tweet.toString().getBytes(), headers);
154 channel.processEvent(event);
155 num_tweets++;
156
157 if (num_tweets == 200) {
158 stop();
159 }
160
161 }catch(Exception e){
162 System.out.println("Something went wrong.");
163 e.printStackTrace();
164 }
165 }
166 }
167
168 // This listener will ignore everything except for new tweets
169 public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
170 public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
171 public void onScrubGeo(long userId, long upToStatusId) {}
172 public void onException(Exception ex) {}
173 public void onStallWarning(StallWarning warning) {}
174 };
175
176 logger.debug("Setting up Twitter sample stream using consumer key {} and" +
177 " access token {}", new String[] { consumerKey, accessToken });
178 // Set up the stream's listener (defined above),
179 twitterStream.addListener(listener);
180
181 // Set up a filter to pull out industry-relevant tweets
182 if (keywords.length == 0) {
183 logger.debug("Starting up Twitter sampling...");
184 twitterStream.sample();
185 } else {
186 logger.debug("Starting up Twitter filtering...");
187
188 FilterQuery query = new FilterQuery().track(keywords);
189 twitterStream.filter(query);
190 }
191 super.start();
192 }
193
194 /**
195 * Stops the Source's event processing and shuts down the Twitter stream.
196 */
197 @Override
198 public void stop() {
199 logger.debug("Shutting down Twitter sample stream...");
200 twitterStream.shutdown();
201 super.stop();
202 }
203}
204