forked from chimpler/async-stream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_writer.py
32 lines (23 loc) · 977 Bytes
/
async_writer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from typing import Optional, Iterable, List, Any
from asyncstream import AsyncFileObj
class AsyncWriter(object):
    """Write delimiter-separated rows to an asynchronous file object.

    Each row is emitted as its fields joined by ``sep`` and terminated by
    ``eol``. Fields are coerced to the type of ``sep`` (``bytes`` by default)
    before joining, so rows may mix ``bytes``, ``str``, numbers and ``None``.

    No quoting or escaping is performed: a field that itself contains the
    separator or the end-of-line marker is written verbatim.

    The writer is an async context manager; leaving the ``async with`` block
    closes the underlying file object.
    """

    # Encoding used when converting between ``str`` and ``bytes`` fields.
    _ENCODING = 'utf-8'

    def __init__(
        self,
        afd: 'AsyncFileObj',
        columns: Optional[Iterable[str]] = None,
        column_types: Optional[Iterable[str]] = None,
        has_header=False,
        sep=b',',
        eol=b'\n',
    ):
        """Store the target file object and the formatting options.

        Args:
            afd: Object whose ``write``, ``flush`` and ``close`` methods are
                awaited by this writer.
            columns: Column names written by :meth:`writeheader`, or ``None``
                when no header is available.
            column_types: Column type names; stored but not consulted by this
                class.
            has_header: Header flag; stored but not consulted by this class.
            sep: Field separator. Its type (``bytes`` or ``str``) decides the
                type every field is coerced to.
            eol: Row terminator; must be the same type as ``sep``.
        """
        # NOTE: the defaults for ``columns``/``column_types`` are ``None``.
        # They used to be written as ``columns=Optional[...]``, which made the
        # typing construct itself the default value instead of an annotation.
        self._afd = afd
        self._sep = sep
        self._eol = eol
        self._columns = columns
        self._column_types = column_types
        self._has_header = has_header

    async def __aenter__(self):
        """Return the writer itself for use inside ``async with``."""
        return self

    async def __aexit__(self, *exc):
        """Close the underlying file object when the context exits."""
        # Deliberately do not return close()'s result: a truthy return value
        # from __aexit__ would swallow an exception raised in the with-body.
        await self._afd.close()

    def _coerce(self, value: Any):
        """Convert one field to the separator's type (``bytes`` or ``str``).

        Values already of the matching type pass through unchanged, ``None``
        becomes an empty field, and anything else goes through ``str()``.
        """
        text_mode = isinstance(self._sep, str)
        if value is None:
            return '' if text_mode else b''
        if isinstance(value, (bytes, bytearray, memoryview)):
            if text_mode:
                return bytes(value).decode(self._ENCODING)
            return value
        text = value if isinstance(value, str) else str(value)
        return text if text_mode else text.encode(self._ENCODING)

    def _format_line(self, values: Iterable[Any]):
        """Join the coerced fields with ``sep`` and append ``eol``."""
        return self._sep.join(self._coerce(v) for v in values) + self._eol

    async def writeheader(self):
        """Write the column names as a single row.

        Raises:
            TypeError: If the writer was created without ``columns``.
        """
        if self._columns is None:
            raise TypeError(
                'writeheader() requires columns, but the writer was created '
                'without any'
            )
        await self._afd.write(self._format_line(self._columns))

    async def writerow(self, row: List[Any]):
        """Write one row: its fields joined by ``sep`` followed by ``eol``."""
        await self._afd.write(self._format_line(row))

    async def writerows(self, rows: List[List[Any]]):
        """Write every row of ``rows`` in order, one write per row."""
        for row in rows:
            await self.writerow(row)

    async def flush(self):
        """Flush the underlying file object."""
        await self._afd.flush()