fetch.py
from twitterscraper import query_tweets
from bs4 import BeautifulSoup
import datetime as dt
import pymongo
import sys
import codecs
import json
import re


class Fetch():
    # Constructor to initialize attributes:
    def __init__(self):
        # Dictionary of most used words:
        self.mostUsed = {}
        # Connects to the database:
        client = pymongo.MongoClient()
        # Database handler:
        self.db = client.tweet_db
        self.collection = self.db.tweet_collection
        self.collection.create_index([("id", pymongo.ASCENDING)], unique=True)
        # Corrects the output encoding to UTF-8:
        if sys.stdout.encoding.lower() != 'utf-8':
            sys.stdout = codecs.getwriter('utf-8')(sys.stdout.buffer, 'strict')

    # Fetches at least *count* tweets containing the words *q*:
    def fetchTweets(self, q, count, begindate=dt.date(2006, 3, 21),
                    enddate=dt.date.today(), save=True, printTweets=False):
        # Returns the lang attribute of the <p> element in a tweet's HTML:
        def getLang(string):
            soup = BeautifulSoup(string, 'lxml')
            return soup.find('p')["lang"]
        # Performs the query:
        list_of_tweets = query_tweets(q, count, begindate=begindate, enddate=enddate)
        # Checks if saving to the database is enabled:
        if save:
            # Loops through the fetched tweets:
            for tweet in list_of_tweets:
                # Replaces newlines with spaces:
                tweet.text = tweet.text.replace('\n', ' ').replace('\r', ' ')
                # Checks if printing is enabled:
                if printTweets:
                    print('@%s tweeted: %s \n' % (tweet.user, tweet.text))
                # Converts the Tweet object to a dictionary:
                tweetDict = {
                    "id": tweet.id,
                    "text": tweet.text,
                    "user": tweet.user,
                    "fullname": tweet.fullname,
                    "lang": getLang(tweet.html),
                    "replies": tweet.replies,
                    "retweets": tweet.retweets,
                    "likes": tweet.likes,
                    "timestamp": tweet.timestamp
                }
                # Attempts to save it in the database (the unique index on
                # "id" rejects duplicates):
                try:
                    self.collection.insert_one(tweetDict)
                except pymongo.errors.PyMongoError as e:
                    print("Database insertion failed: %s" % e)
        return list_of_tweets

    # Retrieves the dictionary of most used words. calculateMostUsed() must
    # have been called before:
    def getMostUsed(self):
        # Returns the frequency of words:
        return self.mostUsed

    # Removes from the dictionary every word that is not in *chosenWords*,
    # the list of words you actually want to retain:
    def filterMostUsed(self, chosenWords):
        for word in list(self.mostUsed):
            if word not in chosenWords:
                del self.mostUsed[word]

    # Saves the most used words into a file. calculateMostUsed() must have
    # been called before.
    def saveMostUsed(self):
        # Writes each word once per occurrence, separated by spaces:
        with open('syria.txt', 'a') as file:
            for word in self.mostUsed:
                for j in range(int(self.mostUsed[word])):
                    file.write(word + " ")

    # Prints the most used words. calculateMostUsed() must have been called before.
    def printMostUsed(self):
        # Loops through the dictionary:
        for word in self.mostUsed:
            print('Word "%s" appears %s times' % (word, self.mostUsed[word]))

    # Saves the most used words into a JSON file. calculateMostUsed() must
    # have been called before.
    def jsonMostUsed(self, numOfTweets):
        # Prepares the main JSON list:
        output = []
        # Prepares the list of words:
        words = []
        # Iterates through the frequencies:
        for word in self.mostUsed:
            # List of tweets containing this word:
            tweets = []
            # Prepares a whole-word, case-insensitive regex:
            regStr = r'\b%s\b' % re.escape(word)
            regx = re.compile(regStr, re.IGNORECASE)
            # Retrieves up to *numOfTweets* tweets containing this word:
            for tweet in self.collection.find({'text': {'$regex': regx}}).limit(numOfTweets):
                # Strips links out of the tweet text:
                msg = tweet["text"]
                msg = msg.replace("http", " http")
                msg = " ".join(filter(lambda x: 'http' not in x, msg.split()))
                msg = " ".join(filter(lambda x: '.com' not in x, msg.split()))
                tweets.append(msg)
            # Builds the object that will become a JSON entry:
            obj = {
                "name": word,
                "value": self.mostUsed[word],
                "tweets": tweets
            }
            # Appends the object to the list:
            words.append(obj)
        mainObj = {
            "country": "syria",
            "frequencies": words
        }
        output.append(mainObj)
        # Saves to file:
        with open('syria.json', 'w') as fp:
            json.dump(output, fp)

    # Retrieves (and prints) up to five sample tweets containing *word*:
    def getSamples(self, word):
        # Collects the tweet samples:
        samples = []
        for tweet in self.collection.find({'text': {'$regex': word}}).limit(5):
            print(tweet["text"])
            samples.append(tweet["text"])
        return samples

    # Builds the dictionary with the most used words in the db collection:
    def calculateMostUsed(self, count=5, smooth=0):
        # Prepares the aggregation pipeline:
        query = [
            {"$project": {"words": {"$split": ["$text", " "]}}},
            {"$unwind": {"path": "$words"}},
            {"$group": {"_id": "$words", "count": {"$sum": 1}}}
        ]
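        # For illustration, each document emitted by the pipeline above has
        # the shape {"_id": <word>, "count": <occurrences>}; the values below
        # are made up, not real data:
        #   {"_id": "syria", "count": 42}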
        # Retrieves the count of each word:
        results = list(self.collection.aggregate(query))
        # Sorts the results so the most used words come first:
        results.sort(key=lambda x: x["count"], reverse=True)
        # Keeps the *count* most frequent words (never more than are available):
        for i in range(min(count, len(results))):
            # Value to be added to the dictionary (how many times the word appeared):
            valueToDict = results[i]["count"]
            # Checks if smoothing is enabled; smoothing shrinks the value of
            # words that appear very often:
            if smooth != 0:
                frac = results[i]["count"] / smooth
                valueToDict -= results[i]["count"] * (frac ** 2)
            self.mostUsed[results[i]["_id"]] = valueToDict
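

# Minimal usage sketch (an assumption, not part of the original module): it
# presumes MongoDB is running locally and that the twitterscraper backend
# still works; the query string and the counts are placeholder values.
if __name__ == "__main__":
    fetcher = Fetch()
    # Fetches and stores at least 100 tweets mentioning "syria":
    fetcher.fetchTweets("syria", 100, printTweets=True)
    # Computes the 5 most frequent words, prints them, and exports them:
    fetcher.calculateMostUsed(count=5)
    fetcher.printMostUsed()
    fetcher.jsonMostUsed(numOfTweets=3)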