-
Notifications
You must be signed in to change notification settings - Fork 0
/
melty_blood_bot.py
executable file
·133 lines (103 loc) · 3.86 KB
/
melty_blood_bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import logging
import logging.config
import os
import sys
from twitter_bot import TwitterBot, TwitterVideoBot, JobManager
import define
# Set default encording.
try:
if sys.version_info[0] >= 3:
from imp import reload
reload(sys)
sys.setdefaultencoding('utf-8')
except ImportError:
pass
SEARCH_WORD = 'mbaacc'
SEARCH_WORD_YOUTUBE = 'mbaacc OR mbaa'
# Config.
LOG_CONFIG = 'config/log.cfg'
BOT_CONFIG = 'config/bot.cfg'
# Initialize logger.
logger = logging.getLogger('app')
## Main
def main(argv):
"""main function"""
with TwitterBot(BOT_CONFIG) as tw_bot:
is_test = False
if len(argv) >= 1:
if argv[0] == 'test':
is_test = True
tw_bot.is_test = is_test
elif argv[0] == 'init':
# Initialize database.
tw_bot.create_database()
return
elif argv[0] == 'follow':
tw_bot.make_follow_list_from_followers()
return
else:
raise Exception('Unknow argument: argv={}'.format(argv))
with JobManager() as job_manager:
#register_twitter_bot_jobs(job_manager, tw_bot)
video_bot = TwitterVideoBot(BOT_CONFIG)
video_bot.is_test = is_test
register_twitter_video_bot_jobs(job_manager, video_bot)
# Run jobs.
job_manager.run()
def register_twitter_bot_jobs(job_manager, bot):
# Make func_and_intervals.
func_and_intervals = []
func_tuple = (TwitterBot.update_database, None, None, 12 * 60)
func_and_intervals.append(func_tuple)
func_tuple = (TwitterBot.retweet_mentions, None, None, 11)
func_and_intervals.append(func_tuple)
func_tuple = (TwitterBot.retweet_retweeted_of_me, None, None, 23)
func_and_intervals.append(func_tuple)
func_tuple = (TwitterBot.follow_not_following_users, None, None, 7 * 60)
func_and_intervals.append(func_tuple)
# Register.
job_manager.register_jobs(bot, func_and_intervals)
def register_twitter_video_bot_jobs(job_manager, bot):
# Make func_and_intervals.
func_and_intervals = []
prev_datetime = job_manager \
.get_job_called_datetime(TwitterVideoBot.__name__,
TwitterVideoBot.nico_comment_post.__name__)
func_tuple = (TwitterVideoBot.nico_comment_post,
[SEARCH_WORD, prev_datetime],
{'filter_func': filter_func_for_nico_comment_post}, 1.1 * 30)
func_and_intervals.append(func_tuple)
prev_datetime = job_manager \
.get_job_called_datetime(TwitterVideoBot.__name__,
TwitterVideoBot.youtube_video_post.__name__)
func_tuple = (TwitterVideoBot.youtube_video_post, [SEARCH_WORD_YOUTUBE, prev_datetime],
None, 1.3 * 30)
func_and_intervals.append(func_tuple)
prev_datetime = job_manager \
.get_job_called_datetime(TwitterVideoBot.__name__,
TwitterVideoBot.nico_video_post.__name__)
func_tuple = (TwitterVideoBot.nico_video_post, [SEARCH_WORD, prev_datetime],
None, 1.5 * 30)
func_and_intervals.append(func_tuple)
# Register.
job_manager.register_jobs(bot, func_and_intervals)
def filter_func_for_nico_comment_post(video):
if video.id in define.NG_ID:
return True
return False
if __name__ == "__main__":
try:
os.chdir(os.path.dirname(sys.argv[0]) or '.')
logging.config.fileConfig(LOG_CONFIG)
logger.info('Start melty_blood_bot.py argv={}, cwd={}'
.format(sys.argv[1:], os.getcwd()))
main(sys.argv[1:])
except:
logger.exception('Unhandled exception')
raise
finally:
logger.info('End melty_blood_bot.py')
logger.info('-' * 80)