Skip to content

Latest commit

 

History

History
341 lines (242 loc) · 13.5 KB

iot-hub-python-python-schedule-jobs.md

File metadata and controls

341 lines (242 loc) · 13.5 KB
title description author manager ms.service services ms.devlang ms.topic ms.date ms.author
Schedule jobs with Azure IoT Hub (Python) | Microsoft Docs
How to schedule an Azure IoT Hub job to invoke a direct method on multiple devices. You use the Azure IoT SDKs for Python to implement the simulated device apps and a service app to run the job.
kgremban
timlt
iot-hub
iot-hub
python
conceptual
02/16/2018
kgremban

Schedule and broadcast jobs (Python)

[!INCLUDE iot-hub-selector-schedule-jobs]

Azure IoT Hub is a fully managed service that enables a back-end app to create and track jobs that schedule and update millions of devices. Jobs can be used for the following actions:

  • Update desired properties
  • Update tags
  • Invoke direct methods

Conceptually, a job wraps one of these actions and tracks the progress of execution against a set of devices, which is defined by a device twin query. For example, a back-end app can use a job to invoke a reboot method on 10,000 devices, specified by a device twin query and scheduled at a future time. That application can then track progress as each of those devices receive and execute the reboot method.

Learn more about each of these capabilities in these articles:

[!INCLUDE iot-hub-basic]

This tutorial shows you how to:

  • Create a Python simulated device app that has a direct method, which enables lockDoor, which can be called by the solution back end.
  • Create a Python console app that calls the lockDoor direct method in the simulated device app using a job and updates the desired properties using a device job.

At the end of this tutorial, you have two Python apps:

simDevice.py, which connects to your IoT hub with the device identity and receives a lockDoor direct method.

scheduleJobService.py, which calls a direct method in the simulated device app and updates the device twin's desired properties using a job.

To complete this tutorial, you need the following:

Note

The Azure IoT SDK for Python does not directly support Jobs functionality. Instead this tutorial offers an alternate solution utilizing asynchronous threads and timers. For further updates, see the Service Client SDK feature list on the Azure IoT SDK for Python page.

Create an IoT hub

[!INCLUDE iot-hub-include-create-hub]

Retrieve connection string for IoT hub

[!INCLUDE iot-hub-include-find-connection-string]

Register a new device in the IoT hub

[!INCLUDE iot-hub-include-create-device]

Create a simulated device app

In this section, you create a Python console app that responds to a direct method called by the cloud, which triggers a simulated lockDoor method.

  1. At your command prompt, run the following command to install the azure-iot-device-client package:

    pip install azure-iothub-device-client
    
  2. Using a text editor, create a new simDevice.py file in your working directory.

  3. Add the following import statements and variables at the start of the simDevice.py file. Replace deviceConnectionString with the connection string of the device you created above:

    import time
    import sys
    
    import iothub_client
    from iothub_client import IoTHubClient, IoTHubClientError, IoTHubTransportProvider, IoTHubClientResult
    from iothub_client import IoTHubError, DeviceMethodReturnValue
    
    METHOD_CONTEXT = 0
    TWIN_CONTEXT = 0
    WAIT_COUNT = 10
    
    PROTOCOL = IoTHubTransportProvider.MQTT
    CONNECTION_STRING = "{deviceConnectionString}"
  4. Add the following function callback to handle the lockDoor method:

    def device_method_callback(method_name, payload, user_context):
        if method_name == "lockDoor":
            print ( "Locking Door!" )
    
            device_method_return_value = DeviceMethodReturnValue()
            device_method_return_value.response = "{ \"Response\": \"lockDoor called successfully\" }"
            device_method_return_value.status = 200
            return device_method_return_value
  5. Add another function callback to handle device twins updates:

    def device_twin_callback(update_state, payload, user_context):
        print ( "")
        print ( "Twin callback called with:")
        print ( "payload: %s" % payload )
  6. Add the following code to register the handler for the lockDoor method. Also include the main routine:

    def iothub_jobs_sample_run():
        try:
            client = IoTHubClient(CONNECTION_STRING, PROTOCOL)
            client.set_device_method_callback(device_method_callback, METHOD_CONTEXT)
            client.set_device_twin_callback(device_twin_callback, TWIN_CONTEXT)
    
            print ( "Direct method initialized." )
            print ( "Device twin callback initialized." )
            print ( "IoTHubClient waiting for commands, press Ctrl-C to exit" )
    	
            while True:
                status_counter = 0
                while status_counter <= WAIT_COUNT:
                    time.sleep(10)
                    status_counter += 1
            
        except IoTHubError as iothub_error:
            print ( "Unexpected error %s from IoTHub" % iothub_error )
            return
        except KeyboardInterrupt:
            print ( "IoTHubClient sample stopped" )
    
    if __name__ == '__main__':
        print ( "Starting the IoT Hub Python jobs sample..." )
        print ( "    Protocol %s" % PROTOCOL )
        print ( "    Connection string=%s" % CONNECTION_STRING )
    
        iothub_jobs_sample_run()
  7. Save and close the simDevice.py file.

Note

To keep things simple, this tutorial does not implement any retry policy. In production code, you should implement retry policies (such as an exponential backoff), as suggested in the article, Transient Fault Handling.

Schedule jobs for calling a direct method and updating a device twin's properties

In this section, you create a Python console app that initiates a remote lockDoor on a device using a direct method and update the device twin's properties.

  1. At your command prompt, run the following command to install the azure-iot-service-client package:

    pip install azure-iothub-service-client
    
  2. Using a text editor, create a new scheduleJobService.py file in your working directory.

  3. Add the following import statements and variables at the start of the scheduleJobService.py file:

    import sys
    import time
    import threading
    import uuid
    
    import iothub_service_client
    from iothub_service_client import IoTHubRegistryManager, IoTHubRegistryManagerAuthMethod
    from iothub_service_client import IoTHubDeviceTwin, IoTHubDeviceMethod, IoTHubError
    
    CONNECTION_STRING = "{IoTHubConnectionString}"
    DEVICE_ID = "{deviceId}"
    
    METHOD_NAME = "lockDoor"
    METHOD_PAYLOAD = "{\"lockTime\":\"10m\"}"
    UPDATE_JSON = "{\"properties\":{\"desired\":{\"building\":43,\"floor\":3}}}"
    TIMEOUT = 60
    WAIT_COUNT = 5
  4. Add the following function that is used to query for devices:

    def query_condition(device_id):
        iothub_registry_manager = IoTHubRegistryManager(CONNECTION_STRING)
    
        number_of_devices = 10
        dev_list = iothub_registry_manager.get_device_list(number_of_devices)
    
        for device in range(0, number_of_devices):
            if dev_list[device].deviceId == device_id:
                return 1
    
        print ( "Device not found" )
        return 0
  5. Add the following methods to run the jobs that call the direct method and device twin:

    def device_method_job(job_id, device_id, wait_time, execution_time):
        print ( "" )
        print ( "Scheduling job: " + str(job_id) )
        time.sleep(wait_time)
    
        if query_condition(device_id):
            iothub_device_method = IoTHubDeviceMethod(CONNECTION_STRING)
    
            response = iothub_device_method.invoke(device_id, METHOD_NAME, METHOD_PAYLOAD, TIMEOUT)
    	
            print ( "" )
            print ( "Direct method " + METHOD_NAME + " called." )
    	
    def device_twin_job(job_id, device_id, wait_time, execution_time):
        print ( "" )
        print ( "Scheduling job " + str(job_id) )
        time.sleep(wait_time)
    
        if query_condition(device_id):
            iothub_twin_method = IoTHubDeviceTwin(CONNECTION_STRING)
    
            twin_info = iothub_twin_method.update_twin(DEVICE_ID, UPDATE_JSON)
    	
            print ( "" )
            print ( "Device twin updated." )
  6. Add the following code to schedule the jobs and update job status. Also include the main routine:

    def iothub_jobs_sample_run():
        try:
            method_thr_id = uuid.uuid4()
            method_thr = threading.Thread(target=device_method_job, args=(method_thr_id, DEVICE_ID, 20, TIMEOUT), kwargs={})
            method_thr.start()
    	
            print ( "" )
            print ( "Direct method called with Job Id: " + str(method_thr_id) )
    	
            twin_thr_id = uuid.uuid4()
            twin_thr = threading.Thread(target=device_twin_job, args=(twin_thr_id, DEVICE_ID, 10, TIMEOUT), kwargs={})
            twin_thr.start()
    	
            print ( "" )
            print ( "Device twin called with Job Id: " + str(twin_thr_id) )
    	
            while True:
                print ( "" )
    		
                if method_thr.is_alive():
                    print ( "...job " + str(method_thr_id) + " still running." )
                else:
                    print ( "...job " + str(method_thr_id) + " complete." )
    		
                if twin_thr.is_alive():
                    print ( "...job " + str(twin_thr_id) + " still running." )
                else:
                    print ( "...job " + str(twin_thr_id) + " complete." )
    			
                print ( "Job status posted, press Ctrl-C to exit" )
    
                status_counter = 0
                while status_counter <= WAIT_COUNT:
                    time.sleep(1)
                    status_counter += 1
    
        except IoTHubError as iothub_error:
            print ( "" )
            print ( "Unexpected error {0}" % iothub_error )
            return
        except KeyboardInterrupt:
            print ( "" )
            print ( "IoTHubService sample stopped" )
    
    if __name__ == '__main__':
        print ( "Starting the IoT Hub jobs Python sample..." )
        print ( "    Connection string = {0}".format(CONNECTION_STRING) )
        print ( "    Device ID         = {0}".format(DEVICE_ID) )
    
        iothub_jobs_sample_run()
  7. Save and close the scheduleJobService.py file.

Run the applications

You are now ready to run the applications.

  1. At the command prompt in your working directory, run the following command to begin listening for the reboot direct method:

    python simDevice.py
    
  2. At another command prompt in your working directory, run the following command to trigger the jobs to lock the door and update the twin:

    python scheduleJobService.py
    
  3. You see the device responses to the direct method and device twins update in the console.

    device output

    service output

Next steps

In this tutorial, you used a job to schedule a direct method to a device and the update of the device twin's properties.

To continue getting started with IoT Hub and device management patterns such as remote over the air firmware update, see:

Tutorial: How to do a firmware update

To continue getting started with IoT Hub, see Getting started with Azure IoT Edge.