This repository has been archived by the owner on Dec 18, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 75
/
twitter.py
82 lines (67 loc) · 2.78 KB
/
twitter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# author = rhnvrm <[email protected]>
import os
import re
import tweepy
from tweepy import OAuthHandler
from textblob import TextBlob
class TwitterClient(object):
    '''
    Generic Twitter Class for the App.

    Holds a search query plus two boolean options (``retweets_only`` and
    ``with_sentiment``) and uses a ``tweepy.API`` handle built from the
    ``CONSUMER_KEY``, ``CONSUMER_SECRET``, ``ACCESS_TOKEN`` and
    ``ACCESS_TOKEN_SECRET`` environment variables to fetch matching tweets.
    '''

    # Matches @mentions (underscores included), any character that is not an
    # ASCII letter, digit, space or tab, and scheme://... URLs.
    _NOISE_RE = re.compile(
        r"(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+://\S+)")

    # Strings that count as "on" when a flag arrives as text.
    _TRUE_STRINGS = ('1', 'true', 't', 'yes', 'y', 'on')

    def __init__(self, query, retweets_only=False, with_sentiment=False,
                 tweet_count_max=100):
        '''
        Build the client.

        query           -- search string passed to the Twitter search call.
        retweets_only   -- when true, keep only tweets with retweet_count > 0.
        with_sentiment  -- when true, label each tweet with its sentiment.
        tweet_count_max -- number of tweets requested per search
                           (default 100, to prevent rate limiting).

        Raises KeyError when one of the credential environment variables
        is missing.
        '''
        # keys and tokens from the Twitter Dev Console
        consumer_key = os.environ['CONSUMER_KEY']
        consumer_secret = os.environ['CONSUMER_SECRET']
        access_token = os.environ['ACCESS_TOKEN']
        access_token_secret = os.environ['ACCESS_TOKEN_SECRET']

        # Plain state is assigned outside the try block so the instance is
        # fully formed even when building the API handle fails.
        self.query = query
        self.retweets_only = self._as_flag(retweets_only)
        self.with_sentiment = self._as_flag(with_sentiment)
        self.tweet_count_max = tweet_count_max
        self.auth = None
        self.api = None

        # Attempt authentication (best effort: report and carry on).
        try:
            self.auth = OAuthHandler(consumer_key, consumer_secret)
            self.auth.set_access_token(access_token, access_token_secret)
            self.api = tweepy.API(self.auth)
        except Exception:
            # Not a bare ``except`` so KeyboardInterrupt / SystemExit still
            # propagate.
            print("Error: Authentication Failed")

    @classmethod
    def _as_flag(cls, value):
        '''Coerce a bool, a 0/1 integer or a "true"/"false" string to bool.'''
        if isinstance(value, str):
            return value.strip().lower() in cls._TRUE_STRINGS
        return bool(value)

    def set_query(self, query=''):
        '''Replace the search query used by get_tweets().'''
        self.query = query

    def set_retweet_checking(self, retweets_only='false'):
        '''Turn the retweets-only filter on or off.'''
        # The default is the *string* 'false'; it is normalised so that it
        # is not mistaken for a truthy value later on.
        self.retweets_only = self._as_flag(retweets_only)

    def set_with_sentiment(self, with_sentiment='false'):
        '''Turn sentiment labelling on or off.'''
        self.with_sentiment = self._as_flag(with_sentiment)

    def clean_tweet(self, tweet):
        '''
        Return ``tweet`` with mentions, URLs and non-alphanumeric
        characters removed and runs of whitespace collapsed to one space.
        '''
        return ' '.join(self._NOISE_RE.sub(" ", tweet).split())

    def get_tweet_sentiment(self, tweet):
        '''
        Classify ``tweet`` as 'positive', 'neutral' or 'negative' from the
        TextBlob polarity of its cleaned text (> 0, == 0, otherwise).
        '''
        polarity = TextBlob(self.clean_tweet(tweet)).sentiment.polarity
        if polarity > 0:
            return 'positive'
        if polarity == 0:
            return 'neutral'
        return 'negative'

    def get_tweets(self):
        '''
        Search for ``self.query`` and return a list of dicts with the keys
        'text', 'user' and 'sentiment', without duplicates.

        'sentiment' is 'unavailable' unless sentiment labelling is on.
        When the retweets-only filter is on, tweets whose retweet_count is
        not greater than zero are skipped.  If the search raises
        tweepy.TweepError the error is printed and the tweets collected so
        far (possibly none) are returned, so the result is always a list.
        '''
        tweets = []
        # Re-normalise here as well: the attributes may have been assigned
        # directly rather than through the setters.
        retweets_only = self._as_flag(self.retweets_only)
        with_sentiment = self._as_flag(self.with_sentiment)
        # (text, user) identifies a parsed tweet; sentiment is derived from
        # the text, so this is equivalent to comparing the whole dict.
        seen = set()
        try:
            recd_tweets = self.api.search(q=self.query,
                                          count=self.tweet_count_max)
            for tweet in recd_tweets or []:
                if retweets_only and not tweet.retweet_count > 0:
                    continue
                key = (tweet.text, tweet.user.screen_name)
                if key in seen:
                    continue
                seen.add(key)
                if with_sentiment:
                    sentiment = self.get_tweet_sentiment(tweet.text)
                else:
                    sentiment = 'unavailable'
                tweets.append({
                    'text': tweet.text,
                    'user': tweet.user.screen_name,
                    'sentiment': sentiment,
                })
        except tweepy.TweepError as e:
            print("Error : " + str(e))
        return tweets