# twitter_stream.py (forked from Sentdex/socialsentiment)
# chdir to this script's directory so relative paths (twitter.db, errors.txt) resolve
import os
import sys
sys.path.insert(0, os.path.realpath(os.path.dirname(__file__)))
os.chdir(os.path.realpath(os.path.dirname(__file__)))
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import json
import sqlite3
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from unidecode import unidecode
import time
from threading import Lock, Timer
import pandas as pd
from config import stop_words
import regex as re
from collections import Counter
import string
import pickle
import itertools
from textblob import TextBlob
analyzer = SentimentIntensityAnalyzer()
# consumer key, consumer secret, access token, access secret.
# https://apps.twitter.com/ to set up
ckey=""
csecret=""
atoken=""
asecret=""
# isolation_level=None disables automatic transactions;
# the thread check is disabled because the connection is created here but we'll
# be inserting from a separate thread (our own lock serializes access)
conn = sqlite3.connect('twitter.db', isolation_level=None, check_same_thread=False)
c = conn.cursor()
def create_table():
    try:
        # http://www.sqlite.org/pragma.html#pragma_journal_mode
        # for us - it allows concurrent writes and reads
        c.execute("PRAGMA journal_mode=wal")
        c.execute("PRAGMA wal_checkpoint=TRUNCATE")
        #c.execute("PRAGMA journal_mode=PERSIST")
        # changed unix to INTEGER (it is an integer; sqlite can store up to 8-byte integers)
        c.execute("CREATE TABLE IF NOT EXISTS sentiment(id INTEGER PRIMARY KEY AUTOINCREMENT, unix INTEGER, tweet TEXT, sentiment REAL)")
        # key-value table for miscellaneous values
        c.execute("CREATE TABLE IF NOT EXISTS misc(key TEXT PRIMARY KEY, value TEXT)")
        # index on id and unix, both DESC (as we sort in DESC order)
        c.execute("CREATE INDEX id_unix ON sentiment (id DESC, unix DESC)")
        # our full-text-search table; I chose to build it from an external content
        # table (sentiment) instead of inserting into it directly, as we store
        # more data than just the text
        # https://sqlite.org/fts5.html - 4.4.2
        c.execute("CREATE VIRTUAL TABLE sentiment_fts USING fts5(tweet, content=sentiment, content_rowid=id, prefix=1, prefix=2, prefix=3)")
        # this trigger automatically updates our FTS table when a row is inserted
        # (requires additional triggers on update and delete)
        c.execute("""
            CREATE TRIGGER sentiment_insert AFTER INSERT ON sentiment BEGIN
                INSERT INTO sentiment_fts(rowid, tweet) VALUES (new.id, new.tweet);
            END
        """)
    except Exception as e:
        print(str(e))
create_table()
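# For reference, the external-content FTS table above can be queried like this
# (a sketch; search_tweets is a hypothetical helper, not called anywhere in
# this script - generate_trending below uses the same join pattern):
def search_tweets(term, limit=100):
    # MATCH runs against sentiment_fts; joining back on fts.rowid = sentiment.id
    # recovers the full rows from the content table
    return c.execute(
        "SELECT s.* FROM sentiment_fts fts "
        "LEFT JOIN sentiment s ON fts.rowid = s.id "
        "WHERE fts.sentiment_fts MATCH ? ORDER BY fts.rowid DESC LIMIT ?",
        (term, limit)).fetchall()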
# create lock
lock = Lock()
class listener(StreamListener):
    data = []
    lock = None

    def __init__(self, lock):
        # store the shared lock
        self.lock = lock
        # start the timer for periodic database saves
        self.save_in_database()
        # call __init__ of the superclass
        super().__init__()

    def save_in_database(self):
        # re-arm the timer (fires every 1 second)
        Timer(1, self.save_in_database).start()
        # with the lock held, if there's data, save it in one bulk transaction
        with self.lock:
            if len(self.data):
                c.execute('BEGIN TRANSACTION')
                try:
                    c.executemany("INSERT INTO sentiment (unix, tweet, sentiment) VALUES (?, ?, ?)", self.data)
                except:
                    pass
                c.execute('COMMIT')
                self.data = []

    def on_data(self, data):
        try:
            data = json.loads(data)
            # the stream also delivers non-tweet records like:
            # {'limit': {'track': 14667, 'timestamp_ms': '1520216832822'}}
            # which have no 'truncated' key - skip them
            if 'truncated' not in data:
                #print(data)
                return True
            if data['truncated']:
                tweet = unidecode(data['extended_tweet']['full_text'])
            else:
                tweet = unidecode(data['text'])
            time_ms = data['timestamp_ms']
            vs = analyzer.polarity_scores(tweet)
            sentiment = vs['compound']
            #print(time_ms, tweet, sentiment)
            # append to the data list (flushed to the database every second)
            with self.lock:
                self.data.append((time_ms, tweet, sentiment))
        except KeyError as e:
            print(str(e))
        return True

    def on_error(self, status):
        print(status)
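# For context: analyzer.polarity_scores returns a dict with 'neg', 'neu',
# 'pos' and 'compound' keys; 'compound' (used above) is a normalized score in
# [-1, 1], where values near +1 are strongly positive. An illustrative call
# (exact numbers depend on the vaderSentiment lexicon version):
# analyzer.polarity_scores("This is great!")['compound']  # roughly 0.6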
# build a counter of blacklisted words (plus the empty string) with a huge count -
# subtracting it later filters those words out of the trending counter
stop_words.append('')
blacklist_counter = Counter(dict(zip(stop_words, [1000000] * len(stop_words))))
# compile a regex for split operations (punctuation, plus space and newline)
punctuation = [str(i) for i in string.punctuation]
split_regex = re.compile("[ \n" + re.escape("".join(punctuation)) + ']')
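# For example (illustrative): split_regex.split("hello, world\nfoo-bar") yields
# ['hello', '', 'world', 'foo', 'bar'] - the empty strings it can produce are
# why '' was appended to the blacklist above.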
def map_nouns(col):
    return [word[0] for word in TextBlob(col).tags if word[1] == u'NNP']
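# TextBlob's .tags yields (word, POS) pairs using Penn Treebank tags; 'NNP'
# marks proper nouns, so map_nouns keeps roughly the named entities. E.g.
# (illustrative): TextBlob("Apple opened a store in Berlin").tags includes
# ('Apple', 'NNP') and ('Berlin', 'NNP'), which map_nouns would keep.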
# generate "trending"
def generate_trending():
    try:
        # select the last 10k tweets
        df = pd.read_sql("SELECT * FROM sentiment ORDER BY id DESC, unix DESC LIMIT 10000", conn)
        df['nouns'] = list(map(map_nouns, df['tweet']))
        # make tokens
        tokens = split_regex.split(' '.join(list(itertools.chain.from_iterable(df['nouns'].values.tolist()))).lower())
        # clean and get the top 10
        trending = (Counter(tokens) - blacklist_counter).most_common(10)
        # get sentiments
        trending_with_sentiment = {}
        for term, count in trending:
            df = pd.read_sql("SELECT sentiment.* FROM sentiment_fts fts LEFT JOIN sentiment ON fts.rowid = sentiment.id WHERE fts.sentiment_fts MATCH ? ORDER BY fts.rowid DESC LIMIT 1000", conn, params=(term,))
            trending_with_sentiment[term] = [df['sentiment'].mean(), count]
        # save in the database
        with lock:
            c.execute('BEGIN TRANSACTION')
            try:
                c.execute("REPLACE INTO misc (key, value) VALUES ('trending', ?)", (pickle.dumps(trending_with_sentiment),))
            except:
                pass
            c.execute('COMMIT')
    except Exception as e:
        with open('errors.txt', 'a') as f:
            f.write(str(e))
            f.write('\n')
    finally:
        Timer(5, generate_trending).start()
Timer(1, generate_trending).start()
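# Consumers (e.g. a dashboard) can read the snapshot back like this (a sketch;
# load_trending is a hypothetical helper, not part of the original script):
def load_trending():
    row = c.execute("SELECT value FROM misc WHERE key = 'trending'").fetchone()
    # the value is a pickled dict of {term: [mean_sentiment, count]}
    return pickle.loads(row[0]) if row else {}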
# connect and stream forever; tracking all five vowels matches essentially
# every English-language tweet, and on error we reconnect after a short sleep
while True:
    try:
        auth = OAuthHandler(ckey, csecret)
        auth.set_access_token(atoken, asecret)
        twitterStream = Stream(auth, listener(lock))
        twitterStream.filter(track=["a","e","i","o","u"])
    except Exception as e:
        print(str(e))
        time.sleep(5)