-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
executable file
·79 lines (56 loc) · 1.93 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import os
import pandas as pd
import numpy as np
import tweepy
from prefect import flatten, task, Flow
from prefect.tasks.notifications.email_task import EmailTask
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
TWEET_ATS = """Check out my latest blog: "{title}"!
Medium: {medium_link}
My blog: {blog_link}
{ats}
{hashtags}
"""
TWEET_NO_ATS = """Check out my latest blog: "{title}"!
Medium: {medium_link}
My blog: {blog_link}
{hashtags}
"""
def get_blob_info():
df = pd.read_csv("/mnt/shared/blogs.csv")
df = df.iloc[-1,:]
return df["Title"], df["MediumLink"], df["BlogLink"], df["Hashtags"], df["Ats"]
@task
def send_tweet(title: str, medium_link: str, blog_link: str, hashtags: str, ats: str):
CONSUMER_KEY = os.environ.get('CONSUMER_KEY', None)
CONSUMER_SECRET = os.environ.get('CONSUMER_SECRET', None)
ACCESS_TOKEN = os.environ.get('ACCESS_TOKEN', None)
ACCESS_TOKEN_SECRET = os.environ.get('ACCESS_TOKEN_SECRET', None)
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
if ats is not np.nan:
tweet = TWEET_ATS.format(
title=title,
medium_link=medium_link,
blog_link=blog_link,
ats=ats.strip('"'),
hashtags=hashtags
)
else:
tweet = TWEET_NO_ATS.format(
title=title,
medium_link=medium_link,
blog_link=blog_link,
hashtags=hashtags
)
api.update_status(
tweet
)
# Tuesday at 8pm
schedule = Schedule(clocks=[CronClock("0 0 * * 3")])
with Flow("Send Tweet", schedule=schedule) as flow:
title, medium_link, blog_link, hashtags, ats = get_blob_info()
send_tweet(title, medium_link, blog_link, hashtags, ats)
flow.register(project_name="Blog-Tweeter")