Skip to content

Commit

Permalink
create upload path and refactor code to models
Browse files Browse the repository at this point in the history
  • Loading branch information
Pratomrerk committed Jan 31, 2023
1 parent 30ca837 commit 6165143
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 129 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
env
upload/*
models/__pycache__
docker-compose.production
slip.test.jpg
181 changes: 52 additions & 129 deletions app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import os, sys
import requests
import json
import base64
import cv2
import os
from flask import Flask, request, jsonify
from flask_cors import CORS
import datetime as dt
import hashlib
import uuid
import random
from models.slip import slip
from models.scb import scb
from models.kbank import kbank

# slip verification
# pratomrerk
Expand All @@ -18,11 +17,7 @@
root_path = os.path.dirname(os.path.abspath(__file__))
UPLOAD_FOLDER = os.path.join(root_path, 'upload')
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}

KBANK_CONSUMER_ID = os.environ.get('KBANK_CONSUMER_ID', 'a2FzaWtvcm5iYW5rdXNlcg==')
KBANK_CONSUMER_SECRET = os.environ.get('KBANK_CONSUMER_SECRET', 'a2FzaWtvcm5iYW5rcGFzc3dvcmQ=')
KBANK_AUTHORIZATION = base64.b64encode(f'{KBANK_CONSUMER_ID}:{KBANK_CONSUMER_SECRET}'.encode()).decode()
KBANK_TEST_MODE = os.environ.get('KBANK_TEST_MODE', 'false')
USER_AGENT = 'Slip-Verification/1.0'

SENDING_BANK_IDS = {
'002': 'Bangkok Bank',
Expand All @@ -45,90 +40,7 @@
'073': 'Land and Houses Bank'
}

USER_AGENT = 'PostmanRuntime/7.26.8'

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['KBANK_ACCESS_TOKEN'] = None

def kbank_oauth():
url = 'https://openapi-sandbox.kasikornbank.com/v2/oauth/token'
headers = {
'Authorization': 'Basic ' + KBANK_AUTHORIZATION,
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': USER_AGENT,
}
if KBANK_TEST_MODE == 'true':
headers['x-test-mode'] = 'true'
headers['env-id'] = 'OAUTH2'
data = {
'grant_type': 'client_credentials'
}
response = requests.post(url, headers=headers, data=data)
if response.status_code == 200:
return response.json()
else:
print('Error: kbank_oauth')
print(response.text)
return None

def kbank_verify(user, slip):
url = 'https://openapi-sandbox.kasikornbank.com/v1/verslip/kbank/verify'
headers = {
'Authorization': 'Bearer ' + user['access_token'],
'Content-Type': 'application/json',
'User-Agent': USER_AGENT,
}
if KBANK_TEST_MODE == 'true':
headers['x-test-mode'] = 'true'
print(headers)
rqUID = str(uuid.uuid1())
# ISO 8601 format
rqDt = dt.datetime.now().isoformat() + '+07:00'
data = {
'rqUID': rqUID,
'rqDt': rqDt,
'data': {
'sendingBank' : slip['sending_bank_id'],
'transRef' : slip['trans_ref'],
},
}
print(data)
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
return response.json()
else:
print('Error: kbank_verify')
print(response.text)
return None

def slip_image_qr_decoder(image_path):
img = cv2.imread(image_path)
qrDecoder = cv2.QRCodeDetector()
data, bbox, straight_qrcode = qrDecoder.detectAndDecode(img)
if data:
return data

def crc_iso13239(data, poly=0x1021, init=0xffff, xor_out=0xffff):
crc = init
for byte in data:
crc = crc ^ (byte << 8)
for i in range(8):
if crc & 0x8000:
crc = (crc << 1) ^ poly
else:
crc = crc << 1
return crc & xor_out

def get_field(data: str) -> str:
try:
id = data[0:2]
length = int(data[2:4])
payload = data[4:4 + length]
payload_next = data[4 + length:]
return id, length, payload, payload_next
except:
print('Error: get field : ' + data)
return '', None, None, None

def allowed_extension(filename):
return '.' in filename and \
Expand All @@ -143,39 +55,52 @@ def slip_info():
'message': 'slip-image is required'
})

# Slip Image QR Code
# slip image qr code
file = request.files['slip-image']
if file and allowed_extension(file.filename) == False:
return jsonify({
'statusCode': 400,
'message': 'slip-image must be image file'
})

#file md5
# get file md5
file_md5 = hashlib.md5(file.read()).hexdigest()
slip_image_path = os.path.join(app.config['UPLOAD_FOLDER'], file_md5 + '.jpg')

# create upload path
now = dt.datetime.now()
dir_ymd = ['%Y', '%m', '%d']
upload_path = app.config['UPLOAD_FOLDER']
for d in dir_ymd:
upload_path = os.path.join(upload_path, now.strftime(d))
if not os.path.exists(upload_path):
os.mkdir(upload_path)

# slip image save
slip_image_path = os.path.join(upload_path, file_md5 + '.jpg')
file.seek(0)
file.save(slip_image_path)
file.close()

mini_qr_data = slip_image_qr_decoder(slip_image_path)
# read qr code
mini_qr_data = slip.qr_decoder(slip_image_path)

info = {}
#test_data = '0046000600000101030140225202301299A6KPki5w0SLXn2685102TH91048BBD'
info['MINI_QR_DATA'] = mini_qr_data

# Payload
id00, length00, payload00, payload_next = get_field(mini_qr_data)
id00, length00, payload00, payload_next = slip.get_field(mini_qr_data)
if id00 == '00':
subid00, sublength00, API_ID, next = get_field(payload00)
subid00, sublength00, API_ID, next = slip.get_field(payload00)
info['API_ID'] = API_ID
subid01, sublength01, BANK_ID, next = get_field(next)
subid01, sublength01, BANK_ID, next = slip.get_field(next)
info['SENDING_BANK_ID'] = BANK_ID
if BANK_ID in SENDING_BANK_IDS:
info['SENDING_BANK_NAME'] = SENDING_BANK_IDS[BANK_ID]
else:
info['SENDING_BANK_NAME'] = 'Unknown'
subid02, sublength02, REF_ID, next = get_field(next)
print('[+] New Bank ID: ' + BANK_ID)
subid02, sublength02, REF_ID, next = slip.get_field(next)
info['REF_ID'] = REF_ID
yyyymmdd = REF_ID[0:8]
info['DATE'] = yyyymmdd[0:4] + '-' + yyyymmdd[4:6] + '-' + yyyymmdd[6:8]
Expand All @@ -188,7 +113,7 @@ def slip_info():
})

# Country Code
id51, length51, payload51, payload_next = get_field(payload_next)
id51, length51, payload51, payload_next = slip.get_field(payload_next)
if id51 == '51':
info['COUNTRY_CODE'] = payload51
else:
Expand All @@ -199,13 +124,13 @@ def slip_info():
})

# CRC
id91, length91, payload91, payload_next = get_field(payload_next)
id91, length91, payload91, payload_next = slip.get_field(payload_next)
if id91 == '91':
#info['CRC_INT'] = int(payload91, 16)
#info['CRC'] = payload91
checksum = mini_qr_data[-4:]
data_byte = bytearray(mini_qr_data[0:-4].encode())
crc = crc_iso13239(data_byte)
crc = slip.crc_iso13239(data_byte)
crc_hex = hex(crc)[2:].upper()
is_match = 0
if crc_hex == checksum:
Expand All @@ -231,27 +156,7 @@ def slip_info():

@app.route('/verify', methods=['POST'])
def verifier():

timestamp = dt.datetime.timestamp(dt.datetime.now())

# check expire
if app.config['KBANK_ACCESS_TOKEN'] is not None:
user = app.config['KBANK_ACCESS_TOKEN']
if timestamp >= user['expires']:
app.config['KBANK_ACCESS_TOKEN'] = None

if app.config['KBANK_ACCESS_TOKEN'] is None:
user = kbank_oauth()
if user is None:
return jsonify({
'statusCode': 400,
'message': 'Error: bank access'
})
user['expires'] = timestamp + int(user['expires_in'])
app.config['KBANK_ACCESS_TOKEN'] = user

user = app.config['KBANK_ACCESS_TOKEN']
#print(user)
data = request.get_json()

if 'sending_bank_id' not in data:
Expand All @@ -272,15 +177,33 @@ def verifier():
'message': 'sending_bank_id is invalid'
})

response = kbank_verify(user, data)
if response is None:
sending_bank_id = data['sending_bank_id']
trans_ref = data['trans_ref']

response = None
use_banks = os.environ.get('USE_BANKS', 'KBANK,SCB').split(',')
use_banks = random.sample(use_banks, len(use_banks))
for bank in use_banks:
if os.environ.get('USE_' + bank, '1') == '0':
use_banks.remove(bank)

for bank in use_banks:
if response is not None:
break
if bank == 'SCB':
bank = scb(USER_AGENT)
if bank == 'KBANK':
bank = kbank(USER_AGENT)
response = bank.verifier(sending_bank_id, trans_ref)

if response is not None:
return jsonify(response)
else:
return jsonify({
'statusCode': 400,
'message': 'Error: bank verify'
})

return jsonify(response)

if __name__ == '__main__':

app.run(port=9111, debug=True)
Expand Down
96 changes: 96 additions & 0 deletions models/kbank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import os
import requests
import base64
import uuid
import datetime as dt

# pratomrerk
# update 2023-01-31
# ref https://apiportal.kasikornbank.com/product/public/Information/Slip Verification/Try API/OAuth 2.0

class kbank:

def __init__(self, USER_AGENT):
self.KBANK_CONSUMER_ID = os.environ.get('KBANK_CONSUMER_ID', 'a2FzaWtvcm5iYW5rdXNlcg==')
self.KBANK_CONSUMER_SECRET = os.environ.get('KBANK_CONSUMER_SECRET', 'a2FzaWtvcm5iYW5rcGFzc3dvcmQ=')
self.KBANK_AUTHORIZATION = base64.b64encode(f'{self.KBANK_CONSUMER_ID}:{self.KBANK_CONSUMER_SECRET}'.encode()).decode()
self.KBANK_TEST_MODE = os.environ.get('KBANK_TEST_MODE', '0')
self.KBANK_ACCESS_TOKEN = None
self.USER_AGENT = USER_AGENT
self.BASE_URL = 'https://openapi-sandbox.kasikornbank.com'

def oauth(self):
url = f'{self.BASE_URL}/v2/oauth/token'
headers = {
'Authorization': 'Basic ' + self.KBANK_AUTHORIZATION,
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': self.USER_AGENT,
}
if self.KBANK_TEST_MODE == '1':
headers['x-test-mode'] = 'true'
headers['env-id'] = 'OAUTH2'
data = {
'grant_type': 'client_credentials'
}
response = requests.post(url, headers=headers, data=data)
if response.status_code == 200:
return response.json()
else:
print('Error: kbank_oauth')
print(response.text)
return None


def verify(self, user, slip):
url = f'{self.BASE_URL}/v1/verslip/kbank/verify'
headers = {
'Authorization': 'Bearer ' + user['access_token'],
'Content-Type': 'application/json',
'User-Agent': self.USER_AGENT,
}
if self.KBANK_TEST_MODE == '1':
headers['x-test-mode'] = 'true'
print(headers)
rqUID = str(uuid.uuid1())
# ISO 8601 format
rqDt = dt.datetime.now().isoformat() + '+07:00'
data = {
'rqUID': rqUID,
'rqDt': rqDt,
'data': {
'sendingBank' : slip['sending_bank_id'],
'transRef' : slip['trans_ref'],
},
}
print(data)
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
return response.json()
else:
print('Error: kbank_verify')
print(response.text)
return None

def verifier(self, sending_bank_id, trans_ref):

timestamp = dt.datetime.timestamp(dt.datetime.now())

# check expire
if self.KBANK_ACCESS_TOKEN is not None:
user = self.KBANK_ACCESS_TOKEN
if timestamp >= user['expires']:
self.KBANK_ACCESS_TOKEN = None

if self.KBANK_ACCESS_TOKEN is None:
user = self.oauth()
if user is None:
return None
user['expires'] = timestamp + int(user['expires_in'])
self.KBANK_ACCESS_TOKEN = user

user = self.KBANK_ACCESS_TOKEN
#print(user)
return self.verify(user, {
'sending_bank_id': sending_bank_id,
'trans_ref': trans_ref
})
Loading

0 comments on commit 6165143

Please sign in to comment.