· 6 years ago · Nov 15, 2019, 12:36 AM
1import json
2import requests
3from kafka import KafkaClient, SimpleProducer
4import schedule
5import datetime
6import time
7# Get the API Key
8API_KEY = '6ae0ae17ad4a704d053d600d126b2b51'
9class WeatherProducer():
10 def __init__(self):
11 self.api_url = 'http://api.openweathermap.org/data/2.5/weather/'
12 self.kafka = KafkaClient("localhost:9092")
13 self.producer = SimpleProducer(self.kafka)
14
15 def get_curr_weather(self):
16 self.params = {'q': 'new york', 'appid': API_KEY}
17 r = requests.get(url = self.api_url, params = self.params)
18 data = r.json()
19
20 timestamp = datetime.datetime.now()
21 data['timestamp'] = timestamp
22
23 return data
24
25 def myconverter(self, o):
26 if isinstance(o, datetime.datetime):
27 return o.__str__()
28 def send_to_kafka(self):
29 data = self.get_curr_weather()
30 try:
31 self.producer.send_messages('weather', json.dumps(data, default=self.myconverter).encode('utf-8'))
32 print("Successfully sent to kafka")
33 except BaseException as e:
34 print("Error on_data %s" % str(e))
35def job():
36 weather_producer = WeatherProducer()
37 weather_producer.send_to_kafka()
38if __name__=='__main__':
39 schedule.every(1).minutes.do(job)
40 while 1:
41 schedule.run_pending()
42 time.sleep(1)