Skip to content

Commit

Permalink
Update gDrive_calculator.py
Browse files Browse the repository at this point in the history
  • Loading branch information
XtremeRahul authored Sep 6, 2020
1 parent 77619d0 commit e9df10e
Showing 1 changed file with 141 additions and 141 deletions.
282 changes: 141 additions & 141 deletions gDrive_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,139 +12,139 @@


def get_readable_file_size(size_in_bytes) -> str:
SIZE_UNITS = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
if size_in_bytes is None:
return '0B'
index = 0
while size_in_bytes >= 1024:
size_in_bytes /= 1024
index += 1
try:
return f'{round(size_in_bytes, 2)}{SIZE_UNITS[index]}'
except IndexError:
return 'File too large'
SIZE_UNITS = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
if size_in_bytes is None:
return '0B'
index = 0
while size_in_bytes >= 1024:
size_in_bytes /= 1024
index += 1
try:
return f'{round(size_in_bytes, 2)}{SIZE_UNITS[index]}'
except IndexError:
return 'File too large'


class GoogleDriveSizeCalculate:
def __init__(self, service=None):
self.__G_DRIVE_DIR_MIME_TYPE = "application/vnd.google-apps.folder"
self.__service = service
if service is None:
print("Pass service to GoogleDriveSizeCalculate class. Please fully read the instructions given in the repo.")
return
self.total_bytes = 0
self.total_files = 0
self.total_folders = 0

@staticmethod
def getIdFromUrl(link: str):
if "folders" in link or "file" in link:
regex = r"https://drive\.google\.com/(drive)?/?u?/?\d?/?(mobile)?/?(file)?(folders)?/?d?/([-\w]+)[?+]?/?(w+)?"
res = re.search(regex,link)
if res is None:
return "GDrive ID not found. Try sending url in different format."
return res.group(5)
parsed = urlparse.urlparse(link)
return parse_qs(parsed.query)['id'][0]

def gdrive_checker(self, LINKorID):
if self.__service is None:
return
if 'drive.google.com' in LINKorID:
try:
file_id = self.getIdFromUrl(LINKorID)
if 'GDrive ID not found.' in file_id:
print(file_id)
return
except (KeyError, IndexError):
print("GDrive ID could not be found in the provided link.")
return
else:
file_id = LINKorID.strip()

error = False
print("Calculating... Please Wait!")

try:
drive_file = self.__service.files().get(fileId=file_id, fields="id, name, mimeType, size",
supportsTeamDrives=True).execute()
name = drive_file['name']
if drive_file['mimeType'] == self.__G_DRIVE_DIR_MIME_TYPE:
typee = 'Folder'
self.total_folders += 1
self.gDrive_directory(**drive_file)
else:
try:
typee = drive_file['mimeType']
except:
typee = 'File'
self.total_files += 1
self.gDrive_file(**drive_file)

except Exception as e:
print('\n')
if 'HttpError' in str(e):
h_e = str(e)
ori = h_e
try:
h_e = h_e.replace('<', '').replace('>', '')
h_e = h_e.split('when')
f = h_e[0].strip()
s = h_e[1].split('"')[1].split('"')[0].strip()
e = f"{f}\n{s}"
except:
e = ori
print(str(e))
error = True
finally:
if error:
return
return {
'name': name,
'size': get_readable_file_size(self.total_bytes),
'bytes': self.total_bytes,
'type': typee,
'files': self.total_files,
'folders': self.total_folders
}

def list_drive_dir(self, file_id: str) -> list:
query = f"'{file_id}' in parents and (name contains '*')"
fields = 'nextPageToken, files(id, mimeType, size)'
page_token = None
page_size = 1000
files = []
while True:
response = self.__service.files().list(supportsTeamDrives=True,
includeTeamDriveItems=True,
q=query, spaces='drive',
fields=fields, pageToken=page_token,
pageSize=page_size, corpora='allDrives',
orderBy='folder, name').execute()
files.extend(response.get('files', []))
page_token = response.get('nextPageToken', None)
if page_token is None:
break
return files

def gDrive_file(self, **kwargs):
try:
size = int(kwargs['size'])
except:
size = 0
self.total_bytes += size

def gDrive_directory(self, **kwargs) -> None:
files = self.list_drive_dir(kwargs['id'])
if len(files) == 0:
return
for file_ in files:
if file_['mimeType'] == self.__G_DRIVE_DIR_MIME_TYPE:
self.total_folders += 1
self.gDrive_directory(**file_)
else:
self.total_files += 1
self.gDrive_file(**file_)
def __init__(self, service=None):
self.__G_DRIVE_DIR_MIME_TYPE = "application/vnd.google-apps.folder"
self.__service = service
if service is None:
print("Pass service to GoogleDriveSizeCalculate class. Please fully read the instructions given in the repo.")
return
self.total_bytes = 0
self.total_files = 0
self.total_folders = 0

@staticmethod
def getIdFromUrl(link: str):
if "folders" in link or "file" in link:
regex = r"https://drive\.google\.com/(drive)?/?u?/?\d?/?(mobile)?/?(file)?(folders)?/?d?/([-\w]+)[?+]?/?(w+)?"
res = re.search(regex,link)
if res is None:
return "GDrive ID not found. Try sending url in different format."
return res.group(5)
parsed = urlparse.urlparse(link)
return parse_qs(parsed.query)['id'][0]

def gdrive_checker(self, LINKorID):
if self.__service is None:
return
if 'drive.google.com' in LINKorID:
try:
file_id = self.getIdFromUrl(LINKorID)
if 'GDrive ID not found.' in file_id:
print(file_id)
return
except (KeyError, IndexError):
print("GDrive ID could not be found in the provided link.")
return
else:
file_id = LINKorID.strip()

error = False
print("Calculating... Please Wait!")

try:
drive_file = self.__service.files().get(fileId=file_id, fields="id, name, mimeType, size",
supportsTeamDrives=True).execute()
name = drive_file['name']
if drive_file['mimeType'] == self.__G_DRIVE_DIR_MIME_TYPE:
typee = 'Folder'
self.total_folders += 1
self.gDrive_directory(**drive_file)
else:
try:
typee = drive_file['mimeType']
except:
typee = 'File'
self.total_files += 1
self.gDrive_file(**drive_file)

except Exception as e:
print('\n')
if 'HttpError' in str(e):
h_e = str(e)
ori = h_e
try:
h_e = h_e.replace('<', '').replace('>', '')
h_e = h_e.split('when')
f = h_e[0].strip()
s = h_e[1].split('"')[1].split('"')[0].strip()
e = f"{f}\n{s}"
except:
e = ori
print(str(e))
error = True
finally:
if error:
return
return {
'name': name,
'size': get_readable_file_size(self.total_bytes),
'bytes': self.total_bytes,
'type': typee,
'files': self.total_files,
'folders': self.total_folders
}

def list_drive_dir(self, file_id: str) -> list:
query = f"'{file_id}' in parents and (name contains '*')"
fields = 'nextPageToken, files(id, mimeType, size)'
page_token = None
page_size = 1000
files = []
while True:
response = self.__service.files().list(supportsTeamDrives=True,
includeTeamDriveItems=True,
q=query, spaces='drive',
fields=fields, pageToken=page_token,
pageSize=page_size, corpora='allDrives',
orderBy='folder, name').execute()
files.extend(response.get('files', []))
page_token = response.get('nextPageToken', None)
if page_token is None:
break
return files

def gDrive_file(self, **kwargs):
try:
size = int(kwargs['size'])
except:
size = 0
self.total_bytes += size

def gDrive_directory(self, **kwargs) -> None:
files = self.list_drive_dir(kwargs['id'])
if len(files) == 0:
return
for file_ in files:
if file_['mimeType'] == self.__G_DRIVE_DIR_MIME_TYPE:
self.total_folders += 1
self.gDrive_directory(**file_)
else:
self.total_files += 1
self.gDrive_file(**file_)

#
# ~ Notes ~
Expand All @@ -161,18 +161,18 @@ def gDrive_directory(self, **kwargs) -> None:
# Required credentials.json in this script's directory!
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as f:
credentials = pickle.load(f)
with open('token.pickle', 'rb') as f:
credentials = pickle.load(f)
if credentials is None or not credentials.valid:
if credentials and credentials.expired and credentials.refresh_token:
credentials.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', oauth_scope)
print(flow)
credentials = flow.run_console(port=0)
with open('token.pickle', 'wb') as token:
pickle.dump(credentials, token)
if credentials and credentials.expired and credentials.refresh_token:
credentials.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', oauth_scope)
print(flow)
credentials = flow.run_console(port=0)
with open('token.pickle', 'wb') as token:
pickle.dump(credentials, token)
service = build('drive', 'v3', credentials=credentials, cache_discovery=False)
"""
Expand Down

0 comments on commit e9df10e

Please sign in to comment.