Skip to content

Commit

Permalink
feat: separte domestic, overseas in api
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroam committed Oct 23, 2024
1 parent 3b722f4 commit 6a86a5e
Show file tree
Hide file tree
Showing 23 changed files with 896 additions and 490 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
### Kispy
- [ ] 사용방법 작성
- hashkey는 더 이상 사용되지 않음. 코드에서도 포함하지 않았음
- 토큰은 계속 발급 받으면 API를 호출할 수 없으므로 tmp 경로에 token 데이터 저장
- 각 sector 별로 패키지 분리 (인증, 한국, 해외, ...)

### ToDos
- [x] Response 핸들링 필요
- [x] Response 클래스 구현
- [x] 유효하지 않은 Response에 대해서 예외 처리
- [ ] 한번 발급 받은 토큰을 어떻게 관리할 지 정리 필요
- [ ] 웹 소켓 방식 인증에 대한 구현
- [ ] 인증 방식에 따라서 Auth 클래스를 다르게 사용
- 별도 클라이언트(RTClient)가 필요할 수 있을 것 같음
- [ ] 어떤 API들이 구현되었는지 작성
- [ ] 로깅 고도화 (디버그 모드일 경우, 전체 request, response 조회 가능해야 함)
- [ ] 페이지네이션 구현
- [ ] 페이지네이션 구현
- [ ] 테스트 코드 작성
91 changes: 82 additions & 9 deletions kispy/auth.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import logging
import os
import pickle
import tempfile
from datetime import datetime

import requests
from pydantic import BaseModel, Field

from kispy.constants import REAL_URL, VIRTUAL_URL
from kispy.responses import AuthResponse

class Auth(BaseModel):
"""
인증 방식을 정의하는 클래스
"""
logger = logging.getLogger(__name__)

pass


class Token(Auth):
class Token(BaseModel):
access_token: str = Field(description="액세스 토큰")
token_type: str = "Bearer"
expires_in: int = Field(description="접근 토큰 유효기간")
Expand All @@ -21,5 +23,76 @@ def is_expired(self) -> bool:
return datetime.now() > self.access_token_token_expired


class WebSocket(Auth):
approval_key: str = Field(description="웹소켓 접속키")
class AuthAPI:
def __init__(
self,
app_key: str,
app_secret: str,
is_real: bool,
account_no: str,
):
self._url = REAL_URL if is_real else VIRTUAL_URL
self.app_key = app_key
self.app_secret = app_secret
self.is_real = is_real
self.cano, self.acnt_prdt_cd = account_no.split("-")
self._token: Token | None = None
self._file_path = os.path.join(tempfile.gettempdir(), f"kis_{self.app_key}")

def _request(self, method: str, url: str, **kwargs) -> AuthResponse:
resp = requests.request(method, url, **kwargs)
custom_resp = AuthResponse(status_code=resp.status_code, json=resp.json())
custom_resp.raise_for_status()
return custom_resp

def _get_token(self) -> Token:
url = f"{self._url}/oauth2/tokenP"
resp = self._request(
"POST",
url,
json={
"grant_type": "client_credentials",
"appkey": self.app_key,
"appsecret": self.app_secret,
},
)
access_token_token_expired = datetime.strptime(
resp.json["access_token_token_expired"],
"%Y-%m-%d %H:%M:%S",
)
return Token(
token_type=resp.json["token_type"],
access_token=resp.json["access_token"],
expires_in=int(resp.json["expires_in"]),
access_token_token_expired=access_token_token_expired,
)

@property
def access_token(self) -> str:
if self._token is not None and not self._token.is_expired():
return self._token.access_token

token: Token | None = None
if os.path.exists(self._file_path):
logger.debug("load token from pickle")
with open(self._file_path, "rb") as f:
token = pickle.load(f)
logger.debug(f"loaded token: {token}")

if token is None or token.is_expired():
logger.debug("get new token")
token = self._get_token()
with open(self._file_path, "wb") as f:
pickle.dump(token, f)
logger.debug(f"saved token: {token}")

self._token = token
return self._token.access_token

def get_header(self) -> dict:
return {
"content-type": "application/json",
"authorization": f"Bearer {self.access_token}",
"appkey": self.app_key,
"appsecret": self.app_secret,
}
Loading

0 comments on commit 6a86a5e

Please sign in to comment.