# Forked from Junyi-99/ChatGPT-API-Scanner
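"""Scan GitHub code search for leaked OpenAI-style API keys.

Drives a logged-in Chrome session through GitHub code search, extracts
candidate sk-... keys from matched snippets, validates each one, and
records the results in a local SQLite database.
"""
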
import argparse
import logging
import os
import pickle
import re
import time
from concurrent.futures import ThreadPoolExecutor
from sqlite3 import Connection, Cursor
from selenium import webdriver
from selenium.common.exceptions import UnableToSetCookieException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from tqdm import tqdm
from utils import (
check_key,
db_close,
db_delete,
db_get_all_keys,
db_insert,
db_key_exists,
db_open,
db_remove_duplication,
)

FORMAT = "%(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt="[%X]")
log = logging.getLogger("ChatGPT-API-Leakage")


class APIKeyLeakageScanner:
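    """Searches GitHub code search for leaked API keys and records them in SQLite."""
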
def __init__(self, db_file: str, keywords: list, languages: list):
self.db_file = db_file
log.info(f"📂 Opening database file {self.db_file}")
self.con, self.cur = db_open(self.db_file)
self.keywords = keywords
self.languages = languages
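        # One code-search URL per (keyword, language) pair; the query ANDs the
        # keyword with the URL-encoded key regex /sk-[a-zA-Z0-9]{48}/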
self.candidate_urls = [
f"https://github.com/search?q={keyword}+AND+%28%2Fsk-%5Ba-zA-Z0-9%5D%7B48%7D%2F%29+language%3A{language}&type=code&ref=advsearch"
for language in self.languages
for keyword in self.keywords
]

    def _save_cookies(self):
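        """Persist the current browser session's cookies to cookies.pkl."""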
cookies = self.driver.get_cookies()
with open("cookies.pkl", "wb") as file:
pickle.dump(cookies, file)
log.info("🍪 Cookies saved")

    def _load_cookies(self):
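        """Load cookies from cookies.pkl into the driver; delete the file if it is corrupt."""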
try:
with open("cookies.pkl", "rb") as file:
cookies = pickle.load(file)
for cookie in cookies:
try:
self.driver.add_cookie(cookie)
except UnableToSetCookieException as e:
log.debug(f"🟡 Warning, unable to set a cookie {cookie}")
        except EOFError:
            if os.path.exists("cookies.pkl"):
                os.remove("cookies.pkl")
            log.error(
                "🔴 Error: unable to load cookies; the invalid cookie file has been removed, please restart."
            )
        except pickle.UnpicklingError:
            if os.path.exists("cookies.pkl"):
                os.remove("cookies.pkl")
            log.error(
                "🔴 Error: failed to unpickle cookies; the invalid cookie file has been removed, please restart."
            )

    def _test_cookies(self):
"""
Test if the user is really logged in
"""
log.info("🤗 Redirecting ...")
self.driver.get("https://github.com/")
if self.driver.find_elements(
by=By.XPATH, value="//*[contains(text(), 'Sign in')]"
):
return False
return True

    def _hit_rate_limit(self):
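        """Return the elements announcing GitHub's secondary rate limit (truthy when throttled)."""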
return self.driver.find_elements(
by=By.XPATH,
value="//*[contains(text(), 'You have exceeded a secondary rate limit')]",
)

    def login_to_github(self):
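        """Start Chrome and establish a logged-in GitHub session, reusing saved cookies when possible."""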
log.info("🌍 Opening Chrome ...")
self.options = webdriver.ChromeOptions()
self.options.add_argument("--ignore-certificate-errors")
self.options.add_argument("--ignore-ssl-errors")
self.driver = webdriver.Chrome(options=self.options)
self.driver.implicitly_wait(3)
cookie_exists = os.path.exists("cookies.pkl")
self.driver.get("https://github.com/login")
if not cookie_exists:
log.info("🤗 No cookies found, please login to GitHub first")
input("Press Enter after you logged in: ")
self._save_cookies()
else:
log.info("🍪 Cookies found, loading cookies")
self._load_cookies()
if not self._test_cookies():
if os.path.exists("cookies.pkl"):
os.remove("cookies.pkl")
log.error("🔴 Error, you are not logged in, please restart and try again.")
exit(1)
# TODO: check if the user is logged in, if cookies are expired, etc.

    def _process_url(self, url: str):
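        """Scrape one search URL: wait out rate limits, expand matches, check new keys, and page forward."""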
self.driver.get(url)
pattern = re.compile(r"sk-[a-zA-Z0-9]{48}")
while True:
            # If the page has hit the rate limit, wait 30 seconds, then refresh
if self._hit_rate_limit():
for _ in tqdm(range(30), desc="⏳ Rate limit reached, waiting ..."):
time.sleep(1)
self.driver.refresh()
continue
            # Expand all truncated code snippets ("more match" links)
            for element in self.driver.find_elements(
                by=By.XPATH, value="//*[contains(text(), 'more match')]"
            ):
                element.click()
            codes = self.driver.find_elements(
                by=By.CLASS_NAME, value="code-list"
            )  # all code snippet containers in the result list
for element in codes:
apis = pattern.findall(element.text)
if len(apis) == 0:
continue
apis = list(set(apis))
apis = [api for api in apis if not db_key_exists(self.cur, api)]
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(check_key, apis))
                for api, result in zip(apis, results):
                    db_insert(self.con, self.cur, api, result)
            # Advance to the next page of results; stop when there is none
            try:
                WebDriverWait(self.driver, 5).until(
                    EC.presence_of_element_located(
                        (By.XPATH, "//a[@aria-label='Next Page']")
                    )
                )
                next_buttons = self.driver.find_elements(
                    by=By.XPATH, value="//a[@aria-label='Next Page']"
                )
                next_buttons[0].click()
            except Exception:
                break  # no more pages

    def _save_progress(self, from_iter: int):
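        """Record the current iteration, total URL count, and a timestamp in .progress.txt."""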
with open(".progress.txt", "w") as file:
# Save the progress and timestamp
file.write(f"{from_iter}/{len(self.candidate_urls)}/{time.time()}")

    def _load_progress(self):
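        """Return the iteration to resume from, or 0 when there is no recent matching progress."""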
if not os.path.exists(".progress.txt"):
return 0
with open(".progress.txt", "r") as file:
progress = file.read().strip().split("/")
last = int(progress[0])
totl = int(progress[1])
tmst = float(progress[2])
# if the time is less than 1 hour, then continue from the last progress
if time.time() - tmst < 3600 and totl == len(self.candidate_urls):
# ask the user if they want to continue from the last progress
action = input(f"🔍 Progress found, do you want to continue from the last progress ({last}/{totl})? [yes] | no: ")
                if action.lower() in ("yes", "y", ""):
                    return last
                return 0
return 0

    def search(self, from_iter: int | None = None):
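        """Visit every candidate search URL, optionally resuming from a saved or given iteration."""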
        pbar = tqdm(
            total=len(self.candidate_urls),
            desc="🔍 Searching ...",
        )
if from_iter is None:
from_iter = self._load_progress()
for idx, url in enumerate(self.candidate_urls):
if idx < from_iter:
pbar.update()
time.sleep(0.05) # let tqdm print the bar
log.debug(f"⚪️ Skip {url}")
continue
self._process_url(url)
self._save_progress(idx)
log.debug(f"\n🔍 Finished {url}")
pbar.update()
pbar.close()

    def deduplication(self):
db_remove_duplication(self.con, self.cur)

    def update_existed_keys(self):
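        """Re-validate every stored key and refresh its status in the database."""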
log.info("🔄 Updating existed keys")
keys = db_get_all_keys(self.cur)
for key in tqdm(keys, desc="🔄 Updating existed keys ..."):
result = check_key(key[0])
db_delete(self.con, self.cur, key[0])
db_insert(self.con, self.cur, key[0], result)

    def all_available_keys(self) -> list:
return db_get_all_keys(self.cur)

    def __del__(self):
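        """Best-effort cleanup: quit the browser, then commit and close the database."""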
if hasattr(self, "driver"):
self.driver.quit()
self.con.commit()
db_close(self.con)


def main(from_iter: int | None = None, check_existed_keys_only: bool = False):
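    """Run a full scan (or only re-check stored keys) and print the available keys."""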
keywords = [
"AI ethics",
"AI in customer service",
"AI in education",
"AI in finance",
"AI in healthcare",
"AI in marketing",
"AI-driven automation",
"AI-powered content creation",
"CoT",
"DPO",
"RLHF",
"agent",
"ai model",
"aios",
"api key",
"apikey",
"artificial intelligence",
"chain of thought",
"chatbot",
"chatgpt",
"competitor analysis",
"content strategy",
"conversational AI",
"data analysis",
"deep learning",
"direct preference optimization",
"experiment",
"gpt",
"gpt-3",
"gpt-4",
"gpt4",
"key",
"keyword clustering",
"keyword research",
"lab",
"language model experimentation",
"large language model",
"llama.cpp",
"llm",
"long-tail keywords",
"machine learning",
"multi-agent",
"multi-agent systems",
"natural language processing",
"openai",
"personalized AI",
"project",
"rag",
"reinforcement learning from human feedback",
"retrieval-augmented generation",
"search intent",
"semantic search",
"thoughts",
"virtual assistant",
"实验",
"密钥",
"测试",
"语言模型",
]
languages = [
'"Jupyter Notebook"',
"Python",
"Shell",
"JavaScript",
"TypeScript",
"Java",
"Go",
"C%2B%2B",
"PHP",
]
leakage = APIKeyLeakageScanner("github.db", keywords, languages)
if not check_existed_keys_only:
leakage.login_to_github()
leakage.search(from_iter=from_iter)
leakage.update_existed_keys()
leakage.deduplication()
keys = leakage.all_available_keys()
log.info(f"🔑 Available keys ({len(keys)}):")
for key in keys:
log.info(key)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--from-iter", type=int, default=None, help="Start from the specific iteration")
parser.add_argument(
"--debug",
action="store_true",
default=False,
help="Enable debug mode, otherwise INFO mode. Default is False (INFO mode)",
)
parser.add_argument(
"-ceko",
"--check-existed-keys-only",
action="store_true",
default=False,
help="Only check existed keys",
)
args = parser.parse_args()
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
main(from_iter=args.from_iter, check_existed_keys_only=args.check_existed_keys_only)