Merge remote-tracking branch 'origin/Deployment' into Deployment
utkn committed May 14, 2021
2 parents 717ee1e + 076aa24 commit 418f4e8
Showing 2 changed files with 114 additions and 58 deletions.
45 changes: 29 additions & 16 deletions backend/processor/notification_deployment.py
@@ -52,7 +52,8 @@ def _calculate_counts(time_range: TimeRange, aggregate_model, source) -> (int, i
return start_cum, end_cum

@staticmethod
def _calculate_aggregate_percent_change(time_range: TimeRange, before_epoch_aggregate_model, after_epoch_aggregate_model, source) -> float:
def _calculate_aggregate_percent_change(time_range: TimeRange, before_epoch_aggregate_model,
after_epoch_aggregate_model, source) -> float:
before_epoch, after_epoch = NotificationDeployer._split_from_epoch(time_range)
if before_epoch is not None:
before_epoch_start, before_epoch_end = NotificationDeployer._calculate_counts(before_epoch,
@@ -78,27 +79,39 @@ def prepare_change_map(self, before_epoch_model, after_epoch_model, curr_time):
after_epoch_model, aggr_source)
self.post_count_change_map[time_window_str][aggr_source] = coin_change

# Returns the notification and a boolean representing if an e-mail should be sent.
# Returns the notification and a boolean indicating whether an e-mail should be sent; returns (None, False) if the
# trigger does not fire.
def process_trigger(self, trigger: Trigger) -> (Notification, bool):
# Check if there are unread notifications that are associated with this trigger.
has_unread = any(n.read == 0 for n in trigger.notifications)
# Check if the e-mail notifications for this trigger are turned on.
notify_email = trigger.follow.notify_email

should_send_email = notify_email and not has_unread
n = Notification()

return None, False
# Lookup the relevant change in the change map.
change = self.post_count_change_map[trigger.time_window][trigger.follow.type + ":" + trigger.follow.target]
# If the change is above the set threshold...
if change > trigger.threshold:
# Check if there are unread notifications that are associated with this trigger.
has_unread = any(n.read == 0 for n in trigger.notifications)
# Check if the e-mail notifications for this trigger are turned on.
notify_email = trigger.follow.notify_email
# Decide whether an e-mail should be sent.
should_send_email = notify_email and not has_unread
# Create notification.
message = "%s saw a %s%% increase in the last %s" \
% (trigger.follow.target, "{:.2f}".format(change), trigger.time_window)
notification = Notification(user_id=trigger.follow.user_id, trigger_id=trigger.id,
content=message, time=int(time.time()), read=False)
# Add notification to the list of notifications of the trigger.
trigger.notifications.append(notification)
return notification, should_send_email
else:
return None, False


def deploy_notifications(curr_time: int, coins, sources):
d = NotificationDeployer(coins, sources)
d.prepare_change_map(AggregatePostCount, StreamedAggregatePostCount, curr_time)
triggers = Trigger.query.all()
for t in triggers:
notif, should_email = d.process_trigger(t)
if notif is not None:
db.session.add(notif)
if should_email:
notification, should_send_email = d.process_trigger(t)
if notification is not None:
db.session.add(notification)
if should_send_email:
pass
# send_email
# send_email
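
The hunk above shows how the cumulative counts on either side of the streaming epoch are gathered, but the percent-change arithmetic inside _calculate_aggregate_percent_change lies outside the visible diff. As a minimal sketch of what such a cumulative-count change could look like, assuming a plain relative-growth formula (the helper name and the zero-count guard are illustrative, not taken from the commit):

def percent_change(start_cum: int, end_cum: int) -> float:
    # Relative growth of the cumulative post count over the window, as a percentage.
    # The zero-count guard is an assumption; the diff does not show how an empty
    # window is handled.
    if start_cum == 0:
        return 0.0 if end_cum == 0 else 100.0
    return (end_cum - start_cum) / start_cum * 100.0

# Example: percent_change(200, 350) == 75.0, which would fire a trigger whose threshold is 50.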
127 changes: 85 additions & 42 deletions data/collector/twitter.py
@@ -5,6 +5,10 @@
from data.database import Post
from data.collector import Collector
from misc import TimeRange, CoinType
import os
import csv

LIMIT = True

usernames = ["officialmcafee", "VitalikButerin", "SatoshiLite", "pmarca", "rogerkver", "aantonop", "ErikVoorhees",
"NickSzabo4", "CryptoYoda1338", "bgarlinghouse", "WhalePanda", "cryptoSqueeze", "ZeusZissou",
@@ -21,68 +25,107 @@
CoinType.xrp: ["Ripple", "XRP"],
CoinType.ltc: ["Litecoin", "LTC"],
CoinType.xlm: ["Stellar", "XLM"],
CoinType.omg: ["omise_go", "omgnetwork", "OMG"],
CoinType.omg: ["omgnetwork", "OMG"],
}


def calculate_interaction_score(replies_count, likes_count, retweet_count):
return int(replies_count) + int(likes_count) + int(retweet_count)


def convert_to_unix(datestamp, timestamp):
date = datestamp.split("-")
time = timestamp.split(":")
date_time = datetime.datetime(int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), 0)
def convert_to_unix(datestamp):
date, hour, _ = datestamp.split(" ")
date = date.split("-")
hour = hour.split(":")
date_time = datetime.datetime(year=int(date[0]), month=int(date[1]), day=int(date[2]), hour=int(hour[0]),
minute=int(hour[1]), second=int(hour[2]))
return int(date_time.timestamp())


class TwitterCrawler(Collector):
def convert_to_date(time_range: TimeRange):
lower_date = datetime.datetime.fromtimestamp(time_range.low - 86400).strftime("%Y-%m-%d")
upper_date = datetime.datetime.fromtimestamp(time_range.high + 86400).strftime("%Y-%m-%d")
return lower_date, upper_date


class TwitterCrawler(Collector):
def __init__(self, coin: CoinType = CoinType.btc, only_users=False):
super().__init__(coin=coin, only_users=only_users)
self.config = twint.Config()
self.config.Limit = 1
self.config.Store_object = True
if LIMIT:
self.config.Limit = 1
self.config.Store_csv = True
self.config.Output = "out.csv"
self.config.Resume = "out_last.csv"
self.config.Debug = True
self.config.Lang = "en"
self.config.Hide_output = True

@staticmethod
def get_all_sources() -> list:
return ["*@twitter/" + s for s in functools.reduce(list.__add__, COIN_KEYWORDS.values())]

def collect(self, time_range: TimeRange):
for username in usernames:
# print("TwitterCrawler:", "Collecting from @" + username, "with time range", time_range)
tweets = []
lower, upper = convert_to_date(time_range)
self.config.Since = lower
self.config.Until = upper
for keyword in COIN_KEYWORDS[self.settings.coin]:
if self.settings.only_users:
self.config.Username = username
self.config.Store_object_tweets_list = tweets
for keyword in COIN_KEYWORDS[self.settings.coin]:
self.config.Search = keyword
try:
twint.run.Search(self.config)
except Exception as e:
print("TwitterCrawler: An occurred, skipping the keyword...")
print(e)
continue
for tweet in tweets:
unix_timestamp = convert_to_unix(tweet.datestamp, tweet.timestamp)
if time_range.is_higher(unix_timestamp):
continue
elif time_range.is_lower(unix_timestamp):
for username in usernames:
self.config.Username = username
self.config.Search = keyword
while True:
try:
twint.run.Search(self.config)
break
except:
pass
os.remove("out_last.csv")
os.remove("twint-last-request.log")
os.remove("twint-request_urls.log")
with open("out.csv", "r") as file:
next(file)
reader = csv.reader(file)
for row in reader:
unix_timestamp = convert_to_unix(row[2])
if time_range.low < time_range.high < unix_timestamp:
continue
if unix_timestamp < time_range.low < time_range.high:
break

yield Post(unique_id="tw" + row[0], user=row[7], content=row[10],
interaction=calculate_interaction_score(int(row[15]), int(row[17]),
int(row[16])),
source="twitter/" + keyword.lower(),
time=unix_timestamp,
coin_type=self.settings.coin)

os.remove("out.csv")
else:
self.config.Search = "#" + keyword
while True:
try:
twint.run.Search(self.config)
break
# print("TwitterCrawler:", "Found tweet that includes", keyword, "with date", tweet.datestamp,
# tweet.timestamp)
tweet_id = tweet.id
username = tweet.username
tweet_body = tweet.tweet
interaction_score = calculate_interaction_score(tweet.replies_count, tweet.likes_count,
tweet.retweets_count)
yield Post(unique_id="tw" + str(tweet_id), user=username, content=tweet_body,
interaction=interaction_score, source="twitter/" + keyword.lower(), time=unix_timestamp,
coin_type=self.settings.coin)
if not self.settings.only_users:
break

# For Testing
# TwitterCrawler().collect_posts(CoinType.BTC, TimeRange(int(datetime.datetime.now().timestamp()) - 86400 * 10,
# int(datetime.datetime.now().timestamp())))
except:
pass
os.remove("out_last.csv")
os.remove("twint-last-request.log")
os.remove("twint-request_urls.log")
with open("out.csv", "r") as file:
next(file)
reader = csv.reader(file)
for row in reader:
unix_timestamp = convert_to_unix(row[2])
if time_range.low < time_range.high < unix_timestamp:
continue
if unix_timestamp < time_range.low < time_range.high:
break

yield Post(unique_id="tw" + row[0], user=row[7], content=row[10],
interaction=calculate_interaction_score(int(row[15]), int(row[17]), int(row[16])),
source="twitter/" + keyword.lower(),
time=unix_timestamp,
coin_type=self.settings.coin)

os.remove("out.csv")
