Skip to content

Commit

Permalink
替换 torndb 为 aiomysql
Browse files Browse the repository at this point in the history
  • Loading branch information
mailgyc committed Jan 7, 2019
1 parent d2b4762 commit deb9d20
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 66 deletions.
6 changes: 3 additions & 3 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
from tornado.ioloop import IOLoop
from tornado.options import options

from db import torndb
from db import aio_db
from settings.base import settings, DATABASE, LOGGING
from urls import url_patterns
from settings.urls import url_patterns

logging.config.dictConfig(LOGGING)


class WebApp(tornado.web.Application):
def __init__(self):
super().__init__(url_patterns, **settings)
self.db = torndb.Connection(**DATABASE)
self.db = aio_db.AsyncConnection(**DATABASE)
self.executor = ThreadPoolExecutor()


Expand Down
98 changes: 98 additions & 0 deletions db/aio_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import asyncio
import functools
import logging

import aiomysql
from aiomysql import Pool, DictCursor


class AsyncConnection(object):

def __init__(self, loop=None, **kwargs):
self._db_args = {
'host': kwargs.get('host'),
'db': kwargs.get('database'),
'user': kwargs.get('user'),
'password': kwargs.get('password'),
'port': kwargs.get('port', 3306),
'minsize': kwargs.get('min_connections', 1),
'maxsize': kwargs.get('max_connections', 10),
'cursorclass': DictCursor,
'autocommit': True,
}
self._loop = loop
self._conn_pool: Pool = None
self._async_wait = None

@property
def loop(self):
return self._loop or asyncio.get_event_loop()

async def close(self):
if self._async_wait:
await self._async_wait
if self._conn_pool:
self._conn_pool.terminate()
await self._conn_pool.wait_closed()
self._conn_pool = None
self._async_wait = None

async def reconnect(self):
if self._conn_pool:
return
elif self._async_wait:
await self._async_wait
else:
# self._async_wait = asyncio.Future(loop=self._loop)
self._async_wait = self.loop.create_future()
try:
self._conn_pool = await aiomysql.create_pool(loop=self.loop, **self._db_args)
self._async_wait.set_result(True)
except Exception as e:
if not self._async_wait.done():
self._async_wait.set_exception(e)
self._async_wait = None
raise

async def fetchone(self, query: str, *args, **kwargs):
cursor = await self.cursor()
try:
await self._execute(cursor, query, args, kwargs)
return cursor.fetchone()
finally:
await cursor.close()

async def insert(self, query: str, *args, **kwargs):
cursor = await self.cursor()
try:
await self._execute(cursor, query, args, kwargs)
return cursor.lastrowid
finally:
await cursor.close()

def release(self, conn):
self._conn_pool.release(conn)

async def cursor(self, conn=None) -> aiomysql.Cursor:
in_transaction = conn is not None
if not conn:
if not self._conn_pool:
await self.reconnect()
conn = await self._conn_pool.acquire()
cursor = await conn.cursor()
cursor.release = functools.partial(self.release_cursor, cursor, in_transaction=in_transaction)
return cursor

async def release_cursor(self, cursor, in_transaction=False):
conn = cursor.connection
await cursor.close()
if not in_transaction:
self.release(conn)

async def _execute(self, cursor: aiomysql.Cursor, query: str, args, kwargs):
try:
return await cursor.execute(query, kwargs or args)
except aiomysql.OperationalError:
logging.exception("Error connecting to MySQL on %s", self.host)
await self.close()
raise
11 changes: 9 additions & 2 deletions handlers/base.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import json
from concurrent.futures import ThreadPoolExecutor

from tornado.escape import json_encode
from tornado.web import RequestHandler

from db import torndb
from db import aio_db


class BaseHandler(RequestHandler):

@property
def db(self) -> torndb.Connection:
def db(self) -> aio_db.AsyncConnection:
return self.application.db

@property
Expand All @@ -19,6 +20,12 @@ def executor(self) -> ThreadPoolExecutor:
def data_received(self, chunk):
pass

def get_query_params(self, name, default=None, strip=True):
if not hasattr(self, 'query_params'):
query_params = json.loads(self.request.body.decode('utf-8'))
setattr(self, 'query_params', query_params)
return self.query_params.get(name, default)

def get_current_user(self):
return self.get_secure_cookie("user")

Expand Down
34 changes: 16 additions & 18 deletions handlers/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,30 @@
from tornado.websocket import WebSocketHandler, WebSocketClosedError

from core.player import Player
from core.room import RoomManager, Room
from db import torndb
from core.room import RoomManager
from db import aio_db
from .protocol import Protocol as Pt

logger = logging.getLogger('ddz')


def shot_turn(method):
@functools.wraps(method)
def wrapper(socket, packet):
if socket.player.seat == socket.player.table.whose_turn:
method(socket, packet)
else:
logger.warning('Player[%d] TURN CHEAT', socket.uid)
logging.warning('Player[%d] TURN CHEAT', socket.uid)
return wrapper


class SocketHandler(WebSocketHandler):

def __init__(self, application, request, **kwargs):
super().__init__(application, request, **kwargs)
self.db: torndb.Connection = self.application.db
self.db: aio_db.AsyncConnection = self.application.db
self.player: Player = None

def data_received(self, chunk):
logger.info('socket data_received')
logging.info('socket data_received')

def get_current_user(self):
return json_decode(self.get_secure_cookie("user"))
Expand All @@ -49,15 +47,15 @@ def room(self):
def open(self):
user = self.current_user
self.player = Player(user['uid'], user['username'], self)
logger.info('SOCKET[%s] OPEN', self.player.uid)
logging.info('SOCKET[%s] OPEN', self.player.uid)

def on_close(self):
self.player.leave_table()
logger.info('SOCKET[%s] CLOSE', self.player.uid)
logging.info('SOCKET[%s] CLOSE', self.player.uid)

def on_message(self, message):
async def on_message(self, message):
packet = json.loads(message)
logger.info('REQ[%d]: %s', self.uid, packet)
logging.info('REQ[%d]: %s', self.uid, packet)

code = packet[0]
if code == Pt.REQ_LOGIN:
Expand All @@ -78,22 +76,22 @@ def on_message(self, message):
# TODO: check player was already in table.
table = self.room.new_table()
self.player.join_table(table)
logger.info('PLAYER[%s] NEW TABLE[%d]', self.uid, table.uid)
logging.info('PLAYER[%s] NEW TABLE[%d]', self.uid, table.uid)
self.write_message([Pt.RSP_NEW_TABLE, table.uid])

elif code == Pt.REQ_JOIN_TABLE:
table = self.room.find_waiting_table(packet[1])
if not table:
self.write_message([Pt.RSP_TABLE_LIST, self.room.rsp_tables()])
logger.info('PLAYER[%d] JOIN TABLE[%d] NOT FOUND', self.uid, packet[1])
logging.info('PLAYER[%d] JOIN TABLE[%d] NOT FOUND', self.uid, packet[1])
return

self.player.join_table(table)
logger.info('PLAYER[%s] JOIN TABLE[%d]', self.uid, table.uid)
logging.info('PLAYER[%s] JOIN TABLE[%d]', self.uid, table.uid)
if table.is_full():
table.deal_poker()
self.room.on_table_changed(table)
logger.info('TABLE[%s] GAME BEGIN[%s]', table.uid, table.players)
logging.info('TABLE[%s] GAME BEGIN[%s]', table.uid, table.players)

elif code == Pt.REQ_CALL_SCORE:
self.handle_call_score(packet)
Expand All @@ -112,7 +110,7 @@ def on_message(self, message):
elif code == Pt.REQ_RESTART:
self.player.table.reset()
else:
logger.info('UNKNOWN PACKET: %s', code)
logging.info('UNKNOWN PACKET: %s', code)

@shot_turn
def handle_call_score(self, packet):
Expand All @@ -136,12 +134,12 @@ def handle_cheat(self, uid):
def write_message(self, message, binary=False):
if self.ws_connection is None:
raise WebSocketClosedError()
logger.info('RSP[%d]: %s', self.uid, message)
logging.info('RSP[%d]: %s', self.uid, message)
packet = json.dumps(message)
return self.ws_connection.write_message(packet, binary=binary)

def send_updates(cls, chat):
logger.info('sending message to %d waiters', len(cls.waiters))
logging.info('sending message to %d waiters', len(cls.waiters))
for waiter in cls.waiters:
waiter.write_message('tornado:' + chat)

Expand Down
37 changes: 19 additions & 18 deletions handlers/web.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import json

import bcrypt
from tornado.escape import json_encode

from handlers.base import BaseHandler

Expand All @@ -15,36 +17,35 @@ def get(self):

class RegHandler(BaseHandler):

def post(self):
email = self.get_argument('email', self.get_argument('username'))
account = self.db.get('SELECT * FROM account WHERE email="%s"', email)

if account:
self.write('1')
async def post(self):
email = self.get_query_params('email', self.get_query_params('username'))
account: asyncio.Future = await self.db.fetchone('SELECT id FROM account WHERE email=%s', email)
if account and account.result():
self.write({'errcode': 1, 'errmsg': 'The email has already exist'})
return

username = self.get_argument('username')
password = self.get_argument('password')
username = self.get_query_params('username')
password = self.get_query_params('password')
password = bcrypt.hashpw(password.encode('utf8'), bcrypt.gensalt())

uid = self.db.insert('INSERT INTO account (email, username, password) VALUES ("%s", "%s", "%s")',
email, username, password)

uid = await self.db.insert('INSERT INTO account (email, username, password) VALUES (%s, %s, %s)',
email, username, password)
print(uid)
self.set_current_user(uid, username)
self.set_header('Content-Type', 'application/json')
info = {
'uid': uid,
'username': username,
response = {
'errcode': 0,
'userinfo': {'uid': uid, 'username': username}
}
self.write(json_encode(info))
self.write(response)


class LoginHandler(BaseHandler):

def post(self):
async def post(self):
email = self.get_argument('email')
password = self.get_argument("password")
account = self.db.get('SELECT * FROM account WHERE email="%s"', email)
account = await self.db.get('SELECT * FROM account WHERE email=%s', email)
password = bcrypt.hashpw(password.encode('utf8'), account.get('password'))

self.set_header('Content-Type', 'application/json')
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
aiomysql==0.0.20
bcrypt==3.1.4
coverage==4.5.2
Pillow==5.3.0
Expand Down
13 changes: 4 additions & 9 deletions settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,14 @@
},
},
'loggers': {
'db': {
'': {
'level': 'INFO',
'handlers': ['console', 'file'],
'handlers': ['file'],
'propagate': False,
},
'core': {
'root': {
'level': 'INFO',
'handlers': ['console'],
'propagate': False,
},
'handlers': {
'level': 'INFO',
'handlers': ['console'],
'handlers': ['console', 'file'],
'propagate': False,
},
},
Expand Down
File renamed without changes.
30 changes: 14 additions & 16 deletions static/js/boot.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,28 +184,26 @@ PG.Login = {
return;
}

var httpRequest = new XMLHttpRequest();
var that = this;
httpRequest.onreadystatechange = function () {
if (httpRequest.readyState === XMLHttpRequest.DONE) {
if (httpRequest.status === 200) {
if (httpRequest.responseText == '1') {
that.errorText.text = '该用户名已经被占用';
} else {
PG.playerInfo = JSON.parse(httpRequest.responseText);
var xhr = new XMLHttpRequest();
xhr.open('POST', '/reg', true);
xhr.setRequestHeader('Content-type', 'application/json;charset=UTF-8');
xhr.setRequestHeader('X-Csrftoken', PG.getCookie("_xsrf"));
xhr.onreadystatechange = function () {
if (xhr.readyState === XMLHttpRequest.DONE) {
if (xhr.status === 200) {
var response = JSON.parse(xhr.responseText);
if (response.errcode == 0) {
PG.playerInfo = response.userinfo
that.state.start('MainMenu');
} else {
that.errorText.text = response.errmsg;
}
} else {
console.log('Error:' + httpRequest.status);
that.errorText.text = httpRequest.responseText;
that.errorText.text = xhr.responseText;
}
}
};
httpRequest.open('POST', '/reg', true);
httpRequest.setRequestHeader('Content-Type', 'application/x-www-form-urlencoded');
httpRequest.setRequestHeader('X-Csrftoken', PG.getCookie("_xsrf"));

var req = 'username=' + encodeURIComponent(this.username.value) + '&password=' + encodeURIComponent(this.password.value);
httpRequest.send(req);
xhr.send(JSON.stringify({"username": this.username.value, "password": this.password.value}));
}
};

0 comments on commit deb9d20

Please sign in to comment.