diff --git a/backend/processor/notification_deployment.py b/backend/processor/notification_deployment.py index 76a4e48c..ace4d850 100644 --- a/backend/processor/notification_deployment.py +++ b/backend/processor/notification_deployment.py @@ -52,7 +52,8 @@ def _calculate_counts(time_range: TimeRange, aggregate_model, source) -> (int, i return start_cum, end_cum @staticmethod - def _calculate_aggregate_percent_change(time_range: TimeRange, before_epoch_aggregate_model, after_epoch_aggregate_model, source) -> float: + def _calculate_aggregate_percent_change(time_range: TimeRange, before_epoch_aggregate_model, + after_epoch_aggregate_model, source) -> float: before_epoch, after_epoch = NotificationDeployer._split_from_epoch(time_range) if before_epoch is not None: before_epoch_start, before_epoch_end = NotificationDeployer._calculate_counts(before_epoch, @@ -78,17 +79,29 @@ def prepare_change_map(self, before_epoch_model, after_epoch_model, curr_time): after_epoch_model, aggr_source) self.post_count_change_map[time_window_str][aggr_source] = coin_change - # Returns the notification and a boolean representing if an e-mail should be sent. + # Returns the notification and a boolean representing if an e-mail should be sent, returns None if trigger isn't + # triggered. def process_trigger(self, trigger: Trigger) -> (Notification, bool): - # Check if there are unread notifications that are associated with this trigger. - has_unread = any(n.read == 0 for n in trigger.notifications) - # Check if the e-mail notifications for this trigger are turned on. - notify_email = trigger.follow.notify_email - - should_send_email = notify_email and not has_unread - n = Notification() - - return None, False + # Lookup the relevant change in the change map. + change = self.post_count_change_map[trigger.time_window][trigger.follow.type + ":" + trigger.follow.target] + # If the change is above the set threshold... + if change > trigger.threshold: + # Check if there are unread notifications that are associated with this trigger. + has_unread = any(n.read == 0 for n in trigger.notifications) + # Check if the e-mail notifications for this trigger are turned on. + notify_email = trigger.follow.notify_email + # Decide whether to send or not send and email. + should_send_email = notify_email and not has_unread + # Create notification. + message = "%s saw a %s%% increase in the last %s" \ + % (trigger.follow.target, "{:.2f}".format(change), trigger.time_window) + notification = Notification(user_id=trigger.follow.user_id, trigger_id=trigger.id, + content=message, time=int(time.time()), read=False) + # Add notification to the list of notifications of the trigger. + trigger.notifications.append(notification) + return notification, should_send_email + else: + return None, False def deploy_notifications(curr_time: int, coins, sources): @@ -96,9 +109,9 @@ def deploy_notifications(curr_time: int, coins, sources): d.prepare_change_map(AggregatePostCount, StreamedAggregatePostCount, curr_time) triggers = Trigger.query.all() for t in triggers: - notif, should_email = d.process_trigger(t) - if notif is not None: - db.session.add(notif) - if should_email: + notification, should_send_email = d.process_trigger(t) + if notification is not None: + db.session.add(notification) + if should_send_email: pass - # send_email \ No newline at end of file + # send_email diff --git a/data/collector/twitter.py b/data/collector/twitter.py index 75ae4762..acf2575d 100644 --- a/data/collector/twitter.py +++ b/data/collector/twitter.py @@ -5,6 +5,10 @@ from data.database import Post from data.collector import Collector from misc import TimeRange, CoinType +import os +import csv + +LIMIT = True usernames = ["officialmcafee", "VitalikButerin", "SatoshiLite", "pmarca", "rogerkver", "aantonop", "ErikVoorhees", "NickSzabo4", "CryptoYoda1338", "bgarlinghouse", "WhalePanda", "cryptoSqueeze", "ZeusZissou", @@ -21,7 +25,7 @@ CoinType.xrp: ["Ripple", "XRP"], CoinType.ltc: ["Litecoin", "LTC"], CoinType.xlm: ["Stellar", "XLM"], - CoinType.omg: ["omise_go", "omgnetwork", "OMG"], + CoinType.omg: ["omgnetwork", "OMG"], } @@ -29,20 +33,32 @@ def calculate_interaction_score(replies_count, likes_count, retweet_count): return int(replies_count) + int(likes_count) + int(retweet_count) -def convert_to_unix(datestamp, timestamp): - date = datestamp.split("-") - time = timestamp.split(":") - date_time = datetime.datetime(int(date[0]), int(date[1]), int(date[2]), int(time[0]), int(time[1]), 0) +def convert_to_unix(datestamp): + date, hour, _ = datestamp.split(" ") + date = date.split("-") + hour = hour.split(":") + date_time = datetime.datetime(year=int(date[0]), month=int(date[1]), day=int(date[2]), hour=int(hour[0]), + minute=int(hour[1]), second=int(hour[2])) return int(date_time.timestamp()) -class TwitterCrawler(Collector): +def convert_to_date(time_range: TimeRange): + lower_date = datetime.datetime.fromtimestamp(time_range.low - 86400).strftime("%Y-%m-%d") + upper_date = datetime.datetime.fromtimestamp(time_range.high + 86400).strftime("%Y-%m-%d") + return lower_date, upper_date + +class TwitterCrawler(Collector): def __init__(self, coin: CoinType = CoinType.btc, only_users=False): super().__init__(coin=coin, only_users=only_users) self.config = twint.Config() - self.config.Limit = 1 - self.config.Store_object = True + if LIMIT: + self.config.Limit = 1 + self.config.Store_csv = True + self.config.Output = "out.csv" + self.config.Resume = "out_last.csv" + self.config.Debug = True + self.config.Lang = "en" self.config.Hide_output = True @staticmethod @@ -50,39 +66,66 @@ def get_all_sources() -> list: return ["*@twitter/" + s for s in functools.reduce(list.__add__, COIN_KEYWORDS.values())] def collect(self, time_range: TimeRange): - for username in usernames: - # print("TwitterCrawler:", "Collecting from @" + username, "with time range", time_range) - tweets = [] + lower, upper = convert_to_date(time_range) + self.config.Since = lower + self.config.Until = upper + for keyword in COIN_KEYWORDS[self.settings.coin]: if self.settings.only_users: - self.config.Username = username - self.config.Store_object_tweets_list = tweets - for keyword in COIN_KEYWORDS[self.settings.coin]: - self.config.Search = keyword - try: - twint.run.Search(self.config) - except Exception as e: - print("TwitterCrawler: An occurred, skipping the keyword...") - print(e) - continue - for tweet in tweets: - unix_timestamp = convert_to_unix(tweet.datestamp, tweet.timestamp) - if time_range.is_higher(unix_timestamp): - continue - elif time_range.is_lower(unix_timestamp): + for username in usernames: + self.config.Username = username + self.config.Search = keyword + while True: + try: + twint.run.Search(self.config) + break + except: + pass + os.remove("out_last.csv") + os.remove("twint-last-request.log") + os.remove("twint-request_urls.log") + with open("out.csv", "r") as file: + next(file) + reader = csv.reader(file) + for row in reader: + unix_timestamp = convert_to_unix(row[2]) + if time_range.low < time_range.high < unix_timestamp: + continue + if unix_timestamp < time_range.low < time_range.high: + break + + yield Post(unique_id="tw" + row[0], user=row[7], content=row[10], + interaction=calculate_interaction_score(int(row[15]), int(row[17]), + int(row[16])), + source="twitter/" + keyword.lower(), + time=unix_timestamp, + coin_type=self.settings.coin) + + os.remove("out.csv") + else: + self.config.Search = "#" + keyword + while True: + try: + twint.run.Search(self.config) break - # print("TwitterCrawler:", "Found tweet that includes", keyword, "with date", tweet.datestamp, - # tweet.timestamp) - tweet_id = tweet.id - username = tweet.username - tweet_body = tweet.tweet - interaction_score = calculate_interaction_score(tweet.replies_count, tweet.likes_count, - tweet.retweets_count) - yield Post(unique_id="tw" + str(tweet_id), user=username, content=tweet_body, - interaction=interaction_score, source="twitter/" + keyword.lower(), time=unix_timestamp, - coin_type=self.settings.coin) - if not self.settings.only_users: - break - -# For Testing -# TwitterCrawler().collect_posts(CoinType.BTC, TimeRange(int(datetime.datetime.now().timestamp()) - 86400 * 10, -# int(datetime.datetime.now().timestamp()))) + except: + pass + os.remove("out_last.csv") + os.remove("twint-last-request.log") + os.remove("twint-request_urls.log") + with open("out.csv", "r") as file: + next(file) + reader = csv.reader(file) + for row in reader: + unix_timestamp = convert_to_unix(row[2]) + if time_range.low < time_range.high < unix_timestamp: + continue + if unix_timestamp < time_range.low < time_range.high: + break + + yield Post(unique_id="tw" + row[0], user=row[7], content=row[10], + interaction=calculate_interaction_score(int(row[15]), int(row[17]), int(row[16])), + source="twitter/" + keyword.lower(), + time=unix_timestamp, + coin_type=self.settings.coin) + + os.remove("out.csv")