Skip to content

Commit

Permalink
[webhook] Tentative implementation of webhook-based differential update.
Browse files Browse the repository at this point in the history
Signed-off-by: Xiangyu Bu <[email protected]>
  • Loading branch information
xybu committed Jan 23, 2017
1 parent 75611fb commit 1cdcc99
Show file tree
Hide file tree
Showing 16 changed files with 712 additions and 61 deletions.
1 change: 1 addition & 0 deletions onedrived/data/ngrok_default_conf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
console_ui: false
39 changes: 39 additions & 0 deletions onedrived/od_api_helper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import logging
import time

import onedrivesdk
import onedrivesdk.error
import requests

Expand All @@ -9,6 +11,43 @@
THROTTLE_PAUSE_SEC = 60


def get_drive_request_builder(repo):
    """
    Build a request builder that addresses the Drive backing a repository.

    :param repo: Object exposing ``authenticator.client`` and ``drive.id``
        (presumably an onedrived.od_repo.OneDriveLocalRepository).
    :return onedrivesdk.DriveRequestBuilder:
    """
    client = repo.authenticator.client
    # The base URL is concatenated as-is; no '/' separator is inserted before 'drives/'.
    drive_url = client.base_url + 'drives/' + repo.drive.id
    return onedrivesdk.DriveRequestBuilder(request_url=drive_url, client=client)


def create_subscription(folder_item_request, repo, webhook_url, expiration_time):
    """
    Create a webhook subscription under a folder item and return the resulting entity.

    :param onedrivesdk.ItemRequestBuilder folder_item_request: Request builder of the folder to subscribe to.
    :param onedrived.od_repo.OneDriveLocalRepository repo: Repository handed to item_request_call.
    :param str webhook_url: URL stored as the subscription's notification URL.
    :param datetime.datetime expiration_time: Value stored as the subscription's expiration time.
    :return onedrivesdk.Subscription: Entity built from the JSON body of the response.
    """
    collection_req = folder_item_request.subscriptions
    # Reuse the URL and client of the collection request builder (private attributes)
    # to construct a builder that can issue the request.
    builder = onedrivesdk.SubscriptionRequestBuilder(collection_req._request_url,
                                                     collection_req._client)
    request = item_request_call(repo, builder.request)
    request.method = "POST"
    request.content_type = "application/json"

    payload = onedrivesdk.Subscription()
    payload.expiration_date_time = expiration_time
    payload.notification_url = webhook_url

    response = request.send(payload)
    return onedrivesdk.Subscription(json.loads(response.content))


def update_subscription(self, subscription):
    """
    Send ``subscription`` as a JSON PATCH request and return the entity parsed from the response.

    A temp patch for bug https://github.com/OneDrive/onedrive-sdk-python/issues/95.

    :param self: The request object this function is bound to once installed as a method.
    :param onedrivesdk.Subscription subscription: Entity to send.
    :return onedrivesdk.Subscription: Entity built from the JSON body of the response.
    """
    self.method = "PATCH"
    self.content_type = "application/json"
    response = self.send(subscription)
    return onedrivesdk.Subscription(json.loads(response.content))


# Install the function above as the SDK's SubscriptionRequest.update method.
onedrivesdk.SubscriptionRequest.update = update_subscription


def get_item_modified_datetime(item):
"""
:param onedrivesdk.Item item:
Expand Down
29 changes: 24 additions & 5 deletions onedrived/od_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import xdg

from . import mkdir, get_resource
from . import mkdir, get_resource, od_webhooks
from .od_models import account_profile as _account_profile
from .od_models import drive_config as _drive_config

Expand Down Expand Up @@ -49,21 +49,38 @@ class UserContext:
'proxies': {}, # Proxy is of format {'http': url1, 'https': url2}.
'accounts': {},
'drives': {},
'scan_interval_sec': 1800,
'scan_interval_sec': 21600, # Poll every 6 hours.
'webhook_type': od_webhooks.DEFAULT_WEBHOOK_TYPE,
'webhook_host': '',
'webhook_port': 0,
'webhook_renew_interval_sec': 7200, # Renew webhook every 2 hours.
'num_workers': 2,
'start_delay_sec': 0,
KEY_LOGFILE_PATH: '',
}

DEFAULT_CONFIG_FILENAME = 'onedrived_config_v2.json'
DEFAULT_IGNORE_FILENAME = 'ignore_v2.txt'
DEFAULT_NGROK_CONF_FILENAME = 'ngrok_conf.yaml'

CONFIGURABLE_INT_KEYS = {
'scan_interval_sec': 'Interval, in sec, between two actions of scanning the entire repository.',
'num_workers': 'Total number of worker threads.'
'scan_interval_sec': 'Interval, in seconds, between two actions of scanning the entire repository.',
'num_workers': 'Total number of worker threads.',
'webhook_renew_interval_sec': 'Renew webhook after this amount of time, in seconds. Ideal value should be '
'slightly larger than the lifespan of onedrived process.',
'webhook_port': 'Port number for webhook. Default: 0 (let OS allocate a free port).',
'start_delay_sec': 'Amount of time, in seconds, to sleep before main starts working.'
}

CONFIGURABLE_STR_KEYS = {
KEY_LOGFILE_PATH: 'Path to log file. Empty string means writing to stdout.'
KEY_LOGFILE_PATH: 'Path to log file. Empty string means writing to stdout.',
'webhook_host': 'Host to receive webhook notifications. Requests to https://host:port much reach localhost.'
'Use empty string "" to mean localhost.',
'webhook_type': 'Type of webhook. Use "direct" only if your machine can be reached from public network.'
}

ACCEPTED_VALUES = {
'webhook_type': od_webhooks.SUPPORTED_WEBHOOK_TYPES
}

SUPPORTED_PROXY_PROTOCOLS = ('http', 'https')
Expand Down Expand Up @@ -96,6 +113,8 @@ def _create_config_dir_if_missing(self):
mkdir(self.config_dir, self.user_uid, mode=0o700, exist_ok=True)
with open(self.config_dir + '/' + self.DEFAULT_IGNORE_FILENAME, 'w') as f:
f.write(get_resource('data/ignore_v2.txt'))
with open(self.config_dir + '/' + self.DEFAULT_NGROK_CONF_FILENAME, 'w') as f:
f.write(get_resource('data/ngrok_default_conf.yaml'))

@property
def loop(self):
Expand Down
80 changes: 71 additions & 9 deletions onedrived/od_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
from . import od_repo
from . import od_task
from . import od_threads
from .od_tasks import start_repo
from . import od_webhook
from .od_tasks import start_repo, merge_dir, update_subscriptions
from .od_auth import get_authenticator_and_drives
from .od_context import load_context
from .od_watcher import LocalRepositoryWatcher


context = load_context(asyncio.get_event_loop())
pidfile = context.config_dir + '/onedrived.pid'
task_workers = weakref.WeakSet()
task_pool = None
webhook_server = None
webhook_worker = None


def join_workers():
Expand All @@ -35,7 +39,11 @@ def join_workers():
# noinspection PyUnusedLocal
def shutdown_callback(msg, code):
logging.info('Shutting down.')
asyncio.gather(*asyncio.Task.all_tasks()).cancel()
context.loop.stop()
if webhook_server:
webhook_server.stop()
webhook_server.join()
if task_pool:
task_pool.close(len(task_workers))
join_workers()
Expand Down Expand Up @@ -70,6 +78,22 @@ def get_repo_table(ctx):
return all_accounts


def update_subscription_for_repo(repo, subscription_id=None):
    """
    Create or update the webhook subscription of a repository and schedule its next renewal.

    Does nothing and returns None unless both the webhook server and the webhook worker are set.
    On success, this function re-schedules itself on the event loop after 75% of the
    configured ``webhook_renew_interval_sec``, passing along the id of the subscription obtained.

    :param onedrived.od_repo.OneDriveLocalRepository repo:
    :param str | None subscription_id:
    :return onedrivesdk.Subscription | None:
    """
    if not (webhook_server and webhook_worker):
        return None

    subscription_task = update_subscriptions.UpdateSubscriptionTask(
        repo, task_pool, webhook_worker, subscription_id)
    subscription = subscription_task.handle()
    if not subscription:
        return None

    # Renew ahead of the configured interval so there is slack before it elapses.
    renew_delay_sec = int(context.config['webhook_renew_interval_sec'] * 0.75)
    context.loop.call_later(renew_delay_sec, update_subscription_for_repo, repo, subscription.id)
    return subscription


def gen_start_repo_tasks(all_accounts, task_pool):
"""
:param dict[str, [onedrived.od_repo.OneDriveLocalRepository]] all_accounts:
Expand All @@ -78,8 +102,14 @@ def gen_start_repo_tasks(all_accounts, task_pool):
if task_pool.outstanding_task_count == 0:
for repo in itertools.chain.from_iterable(all_accounts.values()):
task_pool.add_task(start_repo.StartRepositoryTask(repo, task_pool))
logging.info('Scheduled deep-sync for Drive %s of account %s.', repo.drive.id, repo.account_id)
context.loop.call_later(context.config['scan_interval_sec'], gen_start_repo_tasks, all_accounts, task_pool)
logging.info('Scheduled sync task for Drive %s of account %s.', repo.drive.id, repo.account_id)
if update_subscription_for_repo(repo) is None:
logging.warning('Failed to create webhook. Will deep sync again in %d sec.',
context.config['scan_interval_sec'])
context.loop.call_later(context.config['scan_interval_sec'],
gen_start_repo_tasks, all_accounts, task_pool)
else:
logging.info('Will use webhook to trigger sync events.')


def delete_temp_files(all_accounts):
Expand All @@ -94,6 +124,27 @@ def delete_temp_files(all_accounts):
os.system('find "%s" -type f -name "%s" -delete' % (repo.local_root, repo.path_filter.get_temp_name('*')))


def init_task_pool_and_workers():
    """
    Create the module-level task pool and start the configured number of worker threads.

    The worker count is read from ``context.config['num_workers']``. Every started worker is
    added to ``task_workers``.
    """
    global task_pool
    task_pool = od_task.TaskPool()
    num_workers = context.config['num_workers']
    for _ in range(num_workers):
        # Workers are numbered by the size of task_workers at the time they are created.
        worker_name = 'Worker-%d' % len(task_workers)
        worker = od_threads.TaskWorkerThread(name=worker_name, task_pool=task_pool)
        worker.start()
        task_workers.add(worker)


def repo_updated_callback(repo):
    """
    Queue a merge task for the root directory of ``repo``.

    Registered in ``main`` as the callback of the webhook worker. Logs an error and queues
    nothing when the module-level task pool is falsy.

    :param repo: Object exposing ``authenticator.client`` and ``drive.id``
        (presumably an onedrived.od_repo.OneDriveLocalRepository).
    """
    if not task_pool:
        logging.error('Uninitialized task pool reference.')
        return

    root_request = repo.authenticator.client.item(drive=repo.drive.id, path='/')
    merge_task = merge_dir.MergeDirectoryTask(
        repo=repo, task_pool=task_pool, rel_path='', item_request=root_request,
        assume_remote_unchanged=True, parent_remote_unchanged=False)
    task_pool.add_task(merge_task)
    logging.info('Added task to check delta update for Drive %s.', repo.drive.id)


@click.command(cls=daemonocle.cli.DaemonCLI,
daemon_params={
'uid': context.user_uid,
Expand All @@ -102,6 +153,8 @@ def delete_temp_files(all_accounts):
'workdir': os.getcwd()
})
def main():
global task_pool, webhook_server, webhook_worker

# Exit program when receiving SIGTERM or SIGINT.
signal.signal(signal.SIGTERM, shutdown_callback)

Expand All @@ -111,22 +164,31 @@ def main():
else:
context.set_logger(min_level=logging.INFO, path=context.config[context.KEY_LOGFILE_PATH])

if context.config['start_delay_sec'] > 0:
logging.info('Wait for %d seconds before starting.', context.config['start_delay_sec'])
import time
time.sleep(context.config['start_delay_sec'])

# Initialize account information.
all_accounts = get_repo_table(context)
delete_temp_files(all_accounts)

# Start task pool and task worker.
global task_pool
task_pool = od_task.TaskPool()
for i in range(context.config['num_workers']):
w = od_threads.TaskWorkerThread(name='Worker-%d' % len(task_workers), task_pool=task_pool)
w.start()
task_workers.add(w)
init_task_pool_and_workers()

# Start webhook.
webhook_server = od_webhook.get_webhook_server(context)
webhook_worker = od_webhook.WebhookWorkerThread(webhook_url=webhook_server.webhook_url)
webhook_worker.set_callback_func(repo_updated_callback)
webhook_server.set_worker(webhook_worker)
webhook_worker.start()
webhook_server.start()

context.watcher = LocalRepositoryWatcher(task_pool=task_pool, loop=context.loop)
for repo in itertools.chain.from_iterable(all_accounts.values()):
context.watcher.add_repo(repo)

context.loop.set_debug(True)
try:
context.loop.call_soon(gen_start_repo_tasks, all_accounts, task_pool)
context.loop.run_forever()
Expand Down
68 changes: 68 additions & 0 deletions onedrived/od_models/webhook_notification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
webhook_notification.py
Implementation of the data types used in OneDrive webhook notifications, which are absent
from the official OneDrive Python SDK.
:copyright: (c) Xiangyu Bu <[email protected]>
:license: MIT
"""

from .. import od_dateutils


class WebhookNotification:

    """
    Read-only accessor over the property dictionary of a OneDrive webhook notification.

    https://dev.onedrive.com/resources/webhookNotifiation.htm
    """

    def __init__(self, prop_dict):
        """
        :param dict prop_dict: Notification properties keyed by names such as 'subscriptionId'.
        """
        self._prop_dict = prop_dict

    @property
    def context(self):
        """
        :return str | None:
            An optional string value that is passed back in the notification message for this
            subscription. None when the 'context' key is absent.
        """
        return self._prop_dict.get('context')

    @property
    def expiration_datetime(self):
        """
        :return arrow.Arrow: The date and time when the subscription will expire if not updated or renewed.
        """
        raw_value = self._prop_dict['expirationDateTime']
        return od_dateutils.str_to_datetime(raw_value)

    @property
    def resource(self):
        """
        :return str: URL to the item where the subscription is registered.
        """
        return self._prop_dict['resource']

    @property
    def subscription_id(self):
        """
        :return str: The unique identifier for the subscription resource.
        """
        return self._prop_dict['subscriptionId']

    @property
    def tenant_id(self):
        """
        :return str | None:
            Unique identifier for the tenant which generated this notification.
            This is only returned for OneDrive for Business and SharePoint.
            None when the 'tenantId' key is absent.
        """
        return self._prop_dict.get('tenantId')

    @property
    def user_id(self):
        """
        :return str: Unique identifier for the drive which generated this notification.
        """
        return self._prop_dict['userId']
4 changes: 2 additions & 2 deletions onedrived/od_pref.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python3

import os
import urllib
import urllib.parse

import click
import keyring
Expand Down Expand Up @@ -341,7 +341,7 @@ def set_drive(drive_id=None, email=None, local_root=None, ignore_file=None):
error(str(e))
return

d = context.add_drive(drive_config.LocalDriveConfig(drive_id, account_id, ignore_file, local_root))
d = context.add_drive(drive_config.LocalDriveConfig(drive_id, account_id, ignore_file, local_root, None))
save_context(context)
click.echo(click.style('\nSuccessfully configured Drive %s of account %s (%s):' % (
d.drive_id, account_profile.account_email, d.account_id), fg='green'))
Expand Down
12 changes: 9 additions & 3 deletions onedrived/od_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,27 @@ class ItemRecordStatus:
MARKED = 255


class RepositoryType:
    """Integer tags for the kind of drive behind a repository."""

    # PERSONAL is 0 and BUSINESS is 1.
    PERSONAL, BUSINESS = 0, 1


class OneDriveLocalRepository:
SESSION_EXPIRE_THRESHOLD_SEC = 120

def __init__(self, context, authenticator, drive, drive_config):
"""
:param od_context.UserContext context:
:param od_auth.OneDriveAuthenticator authenticator:
:param onedrived.od_context.UserContext context:
:param onedrived.od_auth.OneDriveAuthenticator authenticator:
:param onedrivesdk.model.drive.Drive drive:
:param models.drive_config.LocalDriveConfig drive_config:
:param onedrived.od_models.drive_config.LocalDriveConfig drive_config:
"""
self.context = context
self.authenticator = authenticator
self.drive = drive
self.account_id = drive_config.account_id
self.local_root = drive_config.localroot_path
self.type = RepositoryType.BUSINESS if drive.drive_type == 'business' else RepositoryType.PERSONAL
self._lock = threading.Lock()
self._init_path_filter(ignore_file=drive_config.ignorefile_path)
self._init_item_store()
Expand Down
Loading

0 comments on commit 1cdcc99

Please sign in to comment.