AWS IoT examples using SDK for Python (Boto3) - AWS SDK Code Examples

More AWS SDK examples are available in the AWS Doc SDK Examples GitHub repository.

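The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with AWS IoT.

Basics are code examples that show you how to perform the essential operations within a service.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Get started

The following code example shows how to get started using AWS IoT.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.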

import boto3
from botocore.exceptions import ClientError, NoCredentialsError


def hello_iot():
    """
    Use the AWS SDK for Python (Boto3) to create an AWS IoT client and list
    up to 10 things in your AWS IoT account.
    This example uses the default settings specified in your shared credentials
    and config files.
    """
    try:
        iot_client = boto3.client("iot")
        response = iot_client.list_things(maxResults=10)
        things = response.get("things", [])

        print("Hello, AWS IoT! Here are your things:")
        if things:
            for i, thing in enumerate(things, 1):
                print(f"{i}. {thing['thingName']}")
        else:
            print("No things found in your AWS IoT account.")
    except ClientError as e:
        if e.response["Error"]["Code"] == "UnauthorizedException":
            print("You don't have permission to access AWS IoT.")
        else:
            print(f"Couldn't access AWS IoT. Error: {e}")
    except NoCredentialsError:
        print("No AWS credentials found. Please configure your credentials.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


if __name__ == "__main__":
    hello_iot()

  • For API details, see ListThings in AWS SDK for Python (Boto3) API Reference.

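Basics

The following code example shows how to:

  • Create an AWS IoT thing.

  • Generate a device certificate.

  • Update an AWS IoT thing with attributes.

  • Return a unique endpoint.

  • List your AWS IoT certificates.

  • Update an AWS IoT shadow.

  • Write out state information.

  • Create a rule.

  • List your rules.

  • Search things using the thing name.

  • Delete an AWS IoT thing.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create an IoT wrapper class to manage operations.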

import json
import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def create_thing(self, thing_name):
        """
        Creates an AWS IoT thing.

        :param thing_name: The name of the thing to create.
        :return: The name and ARN of the created thing.
        """
        try:
            response = self.iot_client.create_thing(thingName=thing_name)
            logger.info("Created thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceAlreadyExistsException":
                logger.info("Thing %s already exists. Skipping creation.", thing_name)
                return None
            logger.error(
                "Couldn't create thing %s. Here's why: %s: %s",
                thing_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

    def list_things(self):
        """
        Lists AWS IoT things.

        :return: The list of things.
        """
        try:
            things = []
            paginator = self.iot_client.get_paginator("list_things")
            for page in paginator.paginate():
                things.extend(page["things"])
            logger.info("Retrieved %s things.", len(things))
            return things
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't list things. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise

    def create_keys_and_certificate(self):
        """
        Creates keys and a certificate for an AWS IoT thing.

        :return: The certificate ID, ARN, and PEM.
        """
        try:
            response = self.iot_client.create_keys_and_certificate(setAsActive=True)
            logger.info("Created certificate %s.", response["certificateId"])
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't create keys and certificate. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response

    def attach_thing_principal(self, thing_name, principal):
        """
        Attaches a certificate to an AWS IoT thing.

        :param thing_name: The name of the thing.
        :param principal: The ARN of the certificate.
        """
        try:
            self.iot_client.attach_thing_principal(
                thingName=thing_name, principal=principal
            )
            logger.info("Attached principal %s to thing %s.", principal, thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot attach principal. Resource not found.")
                return
            logger.error(
                "Couldn't attach principal to thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def describe_endpoint(self, endpoint_type="iot:Data-ATS"):
        """
        Gets the AWS IoT endpoint.

        :param endpoint_type: The endpoint type.
        :return: The endpoint.
        """
        try:
            response = self.iot_client.describe_endpoint(endpointType=endpoint_type)
            logger.info("Retrieved endpoint %s.", response["endpointAddress"])
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't describe endpoint. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response["endpointAddress"]

    def list_certificates(self):
        """
        Lists AWS IoT certificates.

        :return: The list of certificates.
        """
        try:
            certificates = []
            paginator = self.iot_client.get_paginator("list_certificates")
            for page in paginator.paginate():
                certificates.extend(page["certificates"])
            logger.info("Retrieved %s certificates.", len(certificates))
            return certificates
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't list certificates. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise

    def detach_thing_principal(self, thing_name, principal):
        """
        Detaches a certificate from an AWS IoT thing.

        :param thing_name: The name of the thing.
        :param principal: The ARN of the certificate.
        """
        try:
            self.iot_client.detach_thing_principal(
                thingName=thing_name, principal=principal
            )
            logger.info("Detached principal %s from thing %s.", principal, thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot detach principal. Resource not found.")
                return
            logger.error(
                "Couldn't detach principal from thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_certificate(self, certificate_id):
        """
        Deletes an AWS IoT certificate.

        :param certificate_id: The ID of the certificate to delete.
        """
        try:
            # Deactivate the certificate first, then delete it.
            self.iot_client.update_certificate(
                certificateId=certificate_id, newStatus="INACTIVE"
            )
            self.iot_client.delete_certificate(certificateId=certificate_id)
            logger.info("Deleted certificate %s.", certificate_id)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot delete certificate. Resource not found.")
                return
            logger.error(
                "Couldn't delete certificate. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def create_topic_rule(self, rule_name, topic, sns_action_arn, role_arn):
        """
        Creates an AWS IoT topic rule.

        :param rule_name: The name of the rule.
        :param topic: The MQTT topic to subscribe to.
        :param sns_action_arn: The ARN of the SNS topic to publish to.
        :param role_arn: The ARN of the IAM role.
        """
        try:
            self.iot_client.create_topic_rule(
                ruleName=rule_name,
                topicRulePayload={
                    "sql": f"SELECT * FROM '{topic}'",
                    "actions": [
                        {"sns": {"targetArn": sns_action_arn, "roleArn": role_arn}}
                    ],
                },
            )
            logger.info("Created topic rule %s.", rule_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceAlreadyExistsException":
                logger.info(
                    "Topic rule %s already exists. Skipping creation.", rule_name
                )
                return
            logger.error(
                "Couldn't create topic rule. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def list_topic_rules(self):
        """
        Lists AWS IoT topic rules.

        :return: The list of topic rules.
        """
        try:
            rules = []
            paginator = self.iot_client.get_paginator("list_topic_rules")
            for page in paginator.paginate():
                rules.extend(page["rules"])
            logger.info("Retrieved %s topic rules.", len(rules))
            return rules
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't list topic rules. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise

    def search_index(self, query):
        """
        Searches the AWS IoT index.

        :param query: The search query.
        :return: The list of things found.
        """
        try:
            response = self.iot_client.search_index(queryString=query)
            logger.info("Found %s things.", len(response.get("things", [])))
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't search index. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response.get("things", [])

    def update_indexing_configuration(self):
        """
        Updates the AWS IoT indexing configuration to enable thing indexing.
        """
        try:
            self.iot_client.update_indexing_configuration(
                thingIndexingConfiguration={"thingIndexingMode": "REGISTRY"}
            )
            logger.info("Updated indexing configuration.")
        except ClientError as err:
            logger.error(
                "Couldn't update indexing configuration. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_thing(self, thing_name):
        """
        Deletes an AWS IoT thing.

        :param thing_name: The name of the thing to delete.
        """
        try:
            self.iot_client.delete_thing(thingName=thing_name)
            logger.info("Deleted thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot delete thing. Resource not found.")
                return
            logger.error(
                "Couldn't delete thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def delete_topic_rule(self, rule_name):
        """
        Deletes an AWS IoT topic rule.

        :param rule_name: The name of the rule to delete.
        """
        try:
            self.iot_client.delete_topic_rule(ruleName=rule_name)
            logger.info("Deleted topic rule %s.", rule_name)
        except ClientError as err:
            logger.error(
                "Couldn't delete topic rule. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def update_thing_shadow(self, thing_name, shadow_state):
        """
        Updates the shadow for an AWS IoT thing.

        :param thing_name: The name of the thing.
        :param shadow_state: The shadow state as a dictionary.
        """
        try:
            self.iot_data_client.update_thing_shadow(
                thingName=thing_name, payload=json.dumps(shadow_state)
            )
            logger.info("Updated shadow for thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot update thing shadow. Resource not found.")
                return
            logger.error(
                "Couldn't update thing shadow. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def get_thing_shadow(self, thing_name):
        """
        Gets the shadow for an AWS IoT thing.

        :param thing_name: The name of the thing.
        :return: The shadow state as a dictionary.
        """
        try:
            response = self.iot_data_client.get_thing_shadow(thingName=thing_name)
            shadow = json.loads(response["payload"].read())
            logger.info("Retrieved shadow for thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot get thing shadow. Resource not found.")
                return None
            logger.error(
                "Couldn't get thing shadow. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return shadow

Run an interactive scenario demonstrating IoT basics.

import json
import logging
import time

from botocore.exceptions import ClientError

# Assumption: `q` is the question helper module (demo_tools/question.py) from the
# AWS Code Examples Repository. Adjust the import path to match your checkout.
import demo_tools.question as q

logger = logging.getLogger(__name__)


class IoTScenario:
    """Runs an interactive scenario that shows how to use AWS IoT."""

    is_interactive = True

    def __init__(
        self,
        iot_wrapper,
        iot_data_client,
        cfn_client,
        stack_name="IoTBasicsStack",
        template_path=None,
    ):
        """
        :param iot_wrapper: An instance of the IoTWrapper class.
        :param iot_data_client: A Boto3 IoT Data Plane client.
        :param cfn_client: A Boto3 CloudFormation client.
        :param stack_name: Name for the CloudFormation stack.
        :param template_path: Path to the CloudFormation template file.
        """
        self.iot_wrapper = iot_wrapper
        self.iot_data_client = iot_data_client
        self.cfn_client = cfn_client
        self.thing_name = None
        self.certificate_arn = None
        self.certificate_id = None
        self.rule_name = None
        self.stack_name = stack_name
        self.template_path = (
            template_path
            or "../../../scenarios/basics/iot/iot_usecase/resources/cfn_template.yaml"
        )

    def _deploy_stack(self):
        """Deploy CloudFormation stack and return outputs."""
        with open(self.template_path, "r") as f:
            template_body = f.read()

        try:
            self.cfn_client.create_stack(
                StackName=self.stack_name,
                TemplateBody=template_body,
                Capabilities=["CAPABILITY_NAMED_IAM"],
            )
            waiter = self.cfn_client.get_waiter("stack_create_complete")
            waiter.wait(StackName=self.stack_name)

            response = self.cfn_client.describe_stacks(StackName=self.stack_name)
            outputs = {
                output["OutputKey"]: output["OutputValue"]
                for output in response["Stacks"][0]["Outputs"]
            }
            return outputs["SNSTopicArn"], outputs["RoleArn"]
        except ClientError as err:
            # Reuse the outputs of a stack that already exists.
            if err.response["Error"]["Code"] == "AlreadyExistsException":
                response = self.cfn_client.describe_stacks(StackName=self.stack_name)
                outputs = {
                    output["OutputKey"]: output["OutputValue"]
                    for output in response["Stacks"][0]["Outputs"]
                }
                return outputs["SNSTopicArn"], outputs["RoleArn"]
            raise

    def _cleanup_stack(self):
        """Delete CloudFormation stack."""
        try:
            self.cfn_client.delete_stack(StackName=self.stack_name)
            waiter = self.cfn_client.get_waiter("stack_delete_complete")
            waiter.wait(StackName=self.stack_name)
            print("CloudFormation stack deleted successfully.")
        except ClientError as err:
            logger.error(f"Failed to delete stack: {err}")

    def run_scenario(self, thing_name, rule_name):
        """
        Runs the IoT basics scenario.

        :param thing_name: The name of the thing to create.
        :param rule_name: The name of the topic rule to create.
        """
        print("-" * 88)
        print("Welcome to the AWS IoT basics scenario!")
        print("-" * 88)
        print(
            "This scenario demonstrates how to interact with AWS IoT using the AWS SDK for Python (Boto3).\n"
            "AWS IoT provides secure, bi-directional communication between Internet-connected devices\n"
            "and the AWS cloud. You can manage device connections, process device data, and build IoT applications.\n"
        )

        self.thing_name = thing_name
        self.rule_name = rule_name

        try:
            print("\nDeploying CloudFormation stack...")
            sns_topic_arn, role_arn = self._deploy_stack()
            print(f"Stack deployed. SNS Topic: {sns_topic_arn}")

            if self.is_interactive:
                input("\nNext, we'll create an AWS IoT thing. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("1. Create an AWS IoT thing")
            print("-" * 88)
            response = self.iot_wrapper.create_thing(thing_name)
            # create_thing returns None when the thing already exists.
            if response:
                print(f"Created thing: {response['thingName']}")
                print(f"Thing ARN: {response['thingArn']}")
            else:
                print(f"Thing {thing_name} already exists.")

            if self.is_interactive:
                input("\nNext, we'll list things. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("2. List things")
            print("-" * 88)
            things = self.iot_wrapper.list_things()
            print(f"Found {len(things)} thing(s) in your account")
            for thing in things[:5]:  # Show first 5
                print(f" Thing name: {thing['thingName']}")

            if self.is_interactive:
                input("\nNext, we'll generate a device certificate. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("3. Generate a device certificate")
            print("-" * 88)
            cert_response = self.iot_wrapper.create_keys_and_certificate()
            self.certificate_arn = cert_response["certificateArn"]
            self.certificate_id = cert_response["certificateId"]
            print(f"Created certificate: {self.certificate_id}")

            if self.is_interactive:
                input("\nNext, we'll attach the certificate to the thing. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("4. Attach the certificate to the thing")
            print("-" * 88)
            self.iot_wrapper.attach_thing_principal(thing_name, self.certificate_arn)
            print(f"Attached certificate to thing: {thing_name}")

            if self.is_interactive:
                input("\nNext, we'll update the thing shadow. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("5. Update the thing shadow")
            print("-" * 88)
            shadow_state = {"state": {"reported": {"temperature": 25, "humidity": 50}}}
            self.iot_wrapper.update_thing_shadow(thing_name, shadow_state)
            print(f"Updated shadow for thing: {thing_name}")

            if self.is_interactive:
                input("\nNext, we'll get the thing shadow. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("6. Get the thing shadow")
            print("-" * 88)
            shadow = self.iot_wrapper.get_thing_shadow(thing_name)
            # get_thing_shadow returns None when the shadow is not found.
            if shadow:
                print(f"Shadow state: {json.dumps(shadow['state'], indent=2)}")
            else:
                print("No shadow found for the thing.")

            if self.is_interactive:
                input("\nNext, we'll get the AWS IoT endpoint. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("7. Get the AWS IoT endpoint")
            print("-" * 88)
            endpoint = self.iot_wrapper.describe_endpoint()
            print(f"IoT endpoint: {endpoint}")

            if self.is_interactive:
                input("\nNext, we'll list certificates. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("8. List certificates")
            print("-" * 88)
            certificates = self.iot_wrapper.list_certificates()
            print(f"Found {len(certificates)} certificate(s)")
            for cert in certificates:
                print(f" Certificate ID: {cert['certificateId']}")
                print(f" Certificate ARN: {cert['certificateArn']}")
                print(f" Status: {cert['status']}")

            if self.is_interactive:
                input("\nNext, we'll create a topic rule. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("9. Create a topic rule")
            print("-" * 88)
            self.iot_wrapper.create_topic_rule(
                rule_name, f"device/{thing_name}/data", sns_topic_arn, role_arn
            )
            print(f"Created topic rule: {rule_name}")

            if self.is_interactive:
                input("\nNext, we'll list topic rules. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("10. List topic rules")
            print("-" * 88)
            rules = self.iot_wrapper.list_topic_rules()
            print(f"Found {len(rules)} topic rule(s)")
            for rule in rules:
                print(f" Rule name: {rule['ruleName']}")
                print(f" Rule ARN: {rule['ruleArn']}")

            if self.is_interactive:
                input("\nNext, we'll configure thing indexing. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("11. Configure thing indexing")
            print("-" * 88)
            self.iot_wrapper.update_indexing_configuration()
            print("Enabled thing indexing")
            print("Waiting for indexing to be ready...")
            time.sleep(10)

            if self.is_interactive:
                input("\nNext, we'll search for things. Press Enter to continue...")
            print("\n" + "-" * 88)
            print("12. Search for things")
            print("-" * 88)
            try:
                things = self.iot_wrapper.search_index(f"thingName:{thing_name}")
                if things:
                    print(f"Found {len(things)} thing(s) matching the query")
                    for thing in things:
                        print(f" Thing name: {thing.get('thingName', 'N/A')}")
                        print(f" Thing ID: {thing.get('thingId', 'N/A')}")
                else:
                    print("No things found. Indexing may take a few minutes.")
            except ClientError as err:
                if err.response["Error"]["Code"] in [
                    "IndexNotReadyException",
                    "InvalidRequestException",
                ]:
                    print("Search index not ready yet. This is expected.")
                else:
                    raise

        except ClientError as err:
            logger.error(
                "Scenario failed: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        finally:
            self._cleanup()

    def _cleanup(self):
        """Cleans up resources created during the scenario."""
        if not self.thing_name:
            return

        print("\n" + "-" * 88)
        print("Cleanup")
        print("-" * 88)

        if q.ask("Do you want to delete the resources? (y/n) ", q.is_yesno):
            try:
                if self.certificate_arn:
                    print(f"Detaching certificate from thing: {self.thing_name}")
                    self.iot_wrapper.detach_thing_principal(
                        self.thing_name, self.certificate_arn
                    )

                if self.certificate_id:
                    print(f"Deleting certificate: {self.certificate_id}")
                    self.iot_wrapper.delete_certificate(self.certificate_id)

                if self.thing_name:
                    print(f"Deleting thing: {self.thing_name}")
                    self.iot_wrapper.delete_thing(self.thing_name)

                if self.rule_name:
                    print(f"Deleting topic rule: {self.rule_name}")
                    self.iot_wrapper.delete_topic_rule(self.rule_name)

                self._cleanup_stack()
                print("Resources deleted successfully.")
            except ClientError as err:
                logger.error(
                    "Cleanup failed: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
        else:
            print("Resources will remain in your account.")

        print("\n" + "-" * 88)
        print("Thanks for using AWS IoT!")
        print("-" * 88)
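
Steps 5 and 6 of the scenario send and read the thing shadow as a JSON document with a `state.reported` section. The following is a minimal sketch of how such a document could be built and decoded. The helper names `build_reported_payload` and `parse_shadow_payload` are hypothetical and not part of the AWS example, and the sketch assumes the response payload is a file-like object with a `read()` method, which is how the wrapper's `get_thing_shadow` treats it.

```python
import io
import json


def build_reported_payload(**reported):
    """Return a JSON shadow document that sets the given reported values."""
    return json.dumps({"state": {"reported": reported}})


def parse_shadow_payload(payload):
    """Decode a shadow payload (a file-like object) into a dictionary."""
    return json.loads(payload.read())


# io.BytesIO stands in for the streaming body that get_thing_shadow returns.
fake_body = io.BytesIO(build_reported_payload(temperature=25, humidity=50).encode())
shadow = parse_shadow_payload(fake_body)
print(shadow["state"]["reported"])
```

The string returned by `build_reported_payload` could be passed as the `payload` argument of `update_thing_shadow`, in place of the inline `json.dumps` call in the wrapper.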

Actions

The following code example shows how to use AttachThingPrincipal.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def attach_thing_principal(self, thing_name, principal):
        """
        Attaches a certificate to an AWS IoT thing.

        :param thing_name: The name of the thing.
        :param principal: The ARN of the certificate.
        """
        try:
            self.iot_client.attach_thing_principal(
                thingName=thing_name, principal=principal
            )
            logger.info("Attached principal %s to thing %s.", principal, thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot attach principal. Resource not found.")
                return
            logger.error(
                "Couldn't attach principal to thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

  • For API details, see AttachThingPrincipal in AWS SDK for Python (Boto3) API Reference.
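
After attaching a certificate, it can be useful to confirm that the attachment took effect. The sketch below does that with the `list_thing_principals` operation of the Boto3 AWS IoT client. The function name `is_principal_attached` is hypothetical, and the sketch only inspects the first page of results, which is an assumption that holds for things with few principals.

```python
def is_principal_attached(iot_client, thing_name, principal_arn):
    """Return True if principal_arn appears among the thing's principals.

    :param iot_client: A Boto3 AWS IoT client (or an object with the same method).
    """
    response = iot_client.list_thing_principals(thingName=thing_name)
    return principal_arn in response.get("principals", [])


# Example use with a real client (requires boto3 and AWS credentials):
#   import boto3
#   attached = is_principal_attached(boto3.client("iot"), "my-thing", certificate_arn)
```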

The following code example shows how to use CreateKeysAndCertificate.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def create_keys_and_certificate(self):
        """
        Creates keys and a certificate for an AWS IoT thing.

        :return: The certificate ID, ARN, and PEM.
        """
        try:
            response = self.iot_client.create_keys_and_certificate(setAsActive=True)
            logger.info("Created certificate %s.", response["certificateId"])
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't create keys and certificate. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response
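
The response from CreateKeysAndCertificate carries the certificate PEM and the key pair, and a device typically needs them on disk. Below is a minimal sketch of saving them. The function name `save_device_credentials` and the file names are hypothetical choices for illustration. The response keys `certificatePem`, `keyPair`, `PublicKey`, and `PrivateKey` follow the Boto3 response shape.

```python
import os
from pathlib import Path


def save_device_credentials(response, directory):
    """Write the certificate and key pair from the response into directory.

    Returns a dict that maps each (hypothetical) file name to its path.
    """
    target = Path(directory)
    target.mkdir(parents=True, exist_ok=True)
    files = {
        "device.pem.crt": response["certificatePem"],
        "public.pem.key": response["keyPair"]["PublicKey"],
        "private.pem.key": response["keyPair"]["PrivateKey"],
    }
    paths = {}
    for name, content in files.items():
        path = target / name
        path.write_text(content)
        # Restrict access to the file owner, since one file holds a private key.
        os.chmod(path, 0o600)
        paths[name] = path
    return paths
```

Saving the private key right away matters because, to my understanding, AWS IoT returns it only in this response and does not offer a way to retrieve it later.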

The following code example shows how to use CreateThing.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def create_thing(self, thing_name):
        """
        Creates an AWS IoT thing.

        :param thing_name: The name of the thing to create.
        :return: The name and ARN of the created thing.
        """
        try:
            response = self.iot_client.create_thing(thingName=thing_name)
            logger.info("Created thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceAlreadyExistsException":
                logger.info("Thing %s already exists. Skipping creation.", thing_name)
                return None
            logger.error(
                "Couldn't create thing %s. Here's why: %s: %s",
                thing_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return response

  • For API details, see CreateThing in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use CreateTopicRule.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def create_topic_rule(self, rule_name, topic, sns_action_arn, role_arn):
        """
        Creates an AWS IoT topic rule.

        :param rule_name: The name of the rule.
        :param topic: The MQTT topic to subscribe to.
        :param sns_action_arn: The ARN of the SNS topic to publish to.
        :param role_arn: The ARN of the IAM role.
        """
        try:
            self.iot_client.create_topic_rule(
                ruleName=rule_name,
                topicRulePayload={
                    "sql": f"SELECT * FROM '{topic}'",
                    "actions": [
                        {"sns": {"targetArn": sns_action_arn, "roleArn": role_arn}}
                    ],
                },
            )
            logger.info("Created topic rule %s.", rule_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceAlreadyExistsException":
                logger.info(
                    "Topic rule %s already exists. Skipping creation.", rule_name
                )
                return
            logger.error(
                "Couldn't create topic rule. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

  • For API details, see CreateTopicRule in AWS SDK for Python (Boto3) API Reference.
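
The `topicRulePayload` in the example is built inline. As a sketch, the payload construction can be separated into a helper so that it is easy to inspect before calling the service. The names `build_sns_rule_payload` and `is_valid_rule_name` are hypothetical. The name check reflects my understanding of the documented rule-name constraint (letters, digits, and underscores, up to 128 characters), so treat it as an assumption, as is the choice to pin `awsIotSqlVersion`.

```python
import re

# Assumed constraint on rule names: alphanumerics and underscores, 1 to 128 characters.
RULE_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9_]{1,128}$")


def is_valid_rule_name(rule_name):
    """Return True if rule_name matches the assumed naming constraint."""
    return bool(RULE_NAME_PATTERN.match(rule_name))


def build_sns_rule_payload(topic, sns_topic_arn, role_arn, description=""):
    """Build a topic rule payload that forwards matching messages to SNS."""
    return {
        "sql": f"SELECT * FROM '{topic}'",
        "description": description,
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [{"sns": {"targetArn": sns_topic_arn, "roleArn": role_arn}}],
    }


payload = build_sns_rule_payload(
    "device/+/data",  # "+" matches a single topic level
    "arn:aws:sns:us-east-1:123456789012:example-topic",  # placeholder ARN
    "arn:aws:iam::123456789012:role/example-role",  # placeholder ARN
)
print(payload["sql"])
```

The resulting dictionary could be passed as `topicRulePayload` to `create_topic_rule` after checking the name with `is_valid_rule_name`.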

The following code example shows how to use DeleteCertificate.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def delete_certificate(self, certificate_id):
        """
        Deletes an AWS IoT certificate.

        :param certificate_id: The ID of the certificate to delete.
        """
        try:
            # Deactivate the certificate first, then delete it.
            self.iot_client.update_certificate(
                certificateId=certificate_id, newStatus="INACTIVE"
            )
            self.iot_client.delete_certificate(certificateId=certificate_id)
            logger.info("Deleted certificate %s.", certificate_id)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot delete certificate. Resource not found.")
                return
            logger.error(
                "Couldn't delete certificate. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

  • For API details, see DeleteCertificate in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteThing.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def delete_thing(self, thing_name):
        """
        Deletes an AWS IoT thing.

        :param thing_name: The name of the thing to delete.
        """
        try:
            self.iot_client.delete_thing(thingName=thing_name)
            logger.info("Deleted thing %s.", thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot delete thing. Resource not found.")
                return
            logger.error(
                "Couldn't delete thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

  • For API details, see DeleteThing in AWS SDK for Python (Boto3) API Reference.
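
The basics scenario detaches the certificate from the thing before it deletes the thing. The sketch below generalizes that order to every principal attached to a thing. The function name `detach_all_and_delete_thing` is hypothetical. It uses the `list_thing_principals`, `detach_thing_principal`, and `delete_thing` client operations, reads only the first page of principals, and leaves error handling to the caller.

```python
def detach_all_and_delete_thing(iot_client, thing_name):
    """Detach every listed principal from the thing, then delete the thing.

    :param iot_client: A Boto3 AWS IoT client (or an object with the same methods).
    :return: The list of principal ARNs that were detached.
    """
    response = iot_client.list_thing_principals(thingName=thing_name)
    principals = response.get("principals", [])
    for principal in principals:
        iot_client.detach_thing_principal(thingName=thing_name, principal=principal)
    iot_client.delete_thing(thingName=thing_name)
    return principals
```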

The following code example shows how to use DeleteTopicRule.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def delete_topic_rule(self, rule_name):
        """
        Deletes an AWS IoT topic rule.

        :param rule_name: The name of the rule to delete.
        """
        try:
            self.iot_client.delete_topic_rule(ruleName=rule_name)
            logger.info("Deleted topic rule %s.", rule_name)
        except ClientError as err:
            logger.error(
                "Couldn't delete topic rule. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

  • For API details, see DeleteTopicRule in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DescribeEndpoint.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def describe_endpoint(self, endpoint_type="iot:Data-ATS"):
        """
        Gets the AWS IoT endpoint.

        :param endpoint_type: The endpoint type.
        :return: The endpoint.
        """
        try:
            response = self.iot_client.describe_endpoint(endpointType=endpoint_type)
            logger.info("Retrieved endpoint %s.", response["endpointAddress"])
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't describe endpoint. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            return response["endpointAddress"]

  • For API details, see DescribeEndpoint in AWS SDK for Python (Boto3) API Reference.
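
One use of the endpoint address is to point a data plane client at the account-specific endpoint. The sketch below turns the address into an HTTPS URL. The function name `data_plane_endpoint_url` is hypothetical, and whether you need an explicit `endpoint_url` at all depends on your setup, since the wrapper in this document creates the `iot-data` client without one.

```python
def data_plane_endpoint_url(iot_client, endpoint_type="iot:Data-ATS"):
    """Return the account's AWS IoT data endpoint as an https:// URL.

    :param iot_client: A Boto3 AWS IoT client (or an object with the same method).
    """
    response = iot_client.describe_endpoint(endpointType=endpoint_type)
    return f"https://{response['endpointAddress']}"


# Example use with real clients (requires boto3 and AWS credentials):
#   import boto3
#   url = data_plane_endpoint_url(boto3.client("iot"))
#   iot_data_client = boto3.client("iot-data", endpoint_url=url)
```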

The following code example shows how to use DetachThingPrincipal.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def detach_thing_principal(self, thing_name, principal):
        """
        Detaches a certificate from an AWS IoT thing.

        :param thing_name: The name of the thing.
        :param principal: The ARN of the certificate.
        """
        try:
            self.iot_client.detach_thing_principal(
                thingName=thing_name, principal=principal
            )
            logger.info("Detached principal %s from thing %s.", principal, thing_name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error("Cannot detach principal. Resource not found.")
                return
            logger.error(
                "Couldn't detach principal from thing. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

  • For API details, see DetachThingPrincipal in AWS SDK for Python (Boto3) API Reference.

以下代码示例演示了如何使用 ListCertificates

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

class IoTWrapper: """Encapsulates AWS IoT actions.""" def __init__(self, iot_client, iot_data_client=None): """ :param iot_client: A Boto3 AWS IoT client. :param iot_data_client: A Boto3 AWS IoT Data Plane client. """ self.iot_client = iot_client self.iot_data_client = iot_data_client @classmethod def from_client(cls): iot_client = boto3.client("iot") iot_data_client = boto3.client("iot-data") return cls(iot_client, iot_data_client) def list_certificates(self): """ Lists AWS IoT certificates. :return: The list of certificates. """ try: certificates = [] paginator = self.iot_client.get_paginator("list_certificates") for page in paginator.paginate(): certificates.extend(page["certificates"]) logger.info("Retrieved %s certificates.", len(certificates)) return certificates except ClientError as err: if err.response["Error"]["Code"] == "ThrottlingException": logger.error("Request throttled. Please try again later.") else: logger.error( "Couldn't list certificates. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • For API details, see ListCertificates in the AWS SDK for Python (Boto3) API Reference.
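
Each entry returned by `list_certificates` is a dictionary that includes, among other fields, a `certificateId` and a `status`. As an illustration of how the returned list might be used, the sketch below keeps only the certificates whose status is `ACTIVE`. The helper name `active_certificate_ids` and the sample data are hypothetical and are not part of the AWS example.

```python
def active_certificate_ids(certificates):
    """Return the IDs of certificates whose status is ACTIVE.

    `certificates` is assumed to be the list returned by
    IoTWrapper.list_certificates().
    """
    return [
        cert["certificateId"]
        for cert in certificates
        if cert.get("status") == "ACTIVE"
    ]


# Made-up sample data in the same shape as the ListCertificates response.
sample = [
    {"certificateId": "cert-1", "status": "ACTIVE"},
    {"certificateId": "cert-2", "status": "INACTIVE"},
]
print(active_certificate_ids(sample))  # prints ['cert-1']
```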

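The following code example shows how to use ListThings.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.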

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def list_things(self):
        """
        Lists AWS IoT things.

        :return: The list of things.
        """
        try:
            things = []
            # The paginator follows continuation tokens across all result pages.
            paginator = self.iot_client.get_paginator("list_things")
            for page in paginator.paginate():
                things.extend(page["things"])
            logger.info("Retrieved %s things.", len(things))
            return things
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't list things. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
  • For API details, see ListThings in the AWS SDK for Python (Boto3) API Reference.
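
The wrapper method above returns every thing in the account. ListThings also accepts `attributeName` and `attributeValue` parameters, which can narrow the listing to things that carry a given attribute. The following is a minimal sketch under that assumption: `thing_names_with_attribute` is a hypothetical helper, and `iot_client` is assumed to be a client created with `boto3.client("iot")`.

```python
def thing_names_with_attribute(iot_client, attribute_name, attribute_value):
    """Return the names of things that have the given attribute value.

    Hypothetical helper for illustration. The filter parameters are passed
    through the paginator to each underlying ListThings request.
    """
    paginator = iot_client.get_paginator("list_things")
    names = []
    for page in paginator.paginate(
        attributeName=attribute_name, attributeValue=attribute_value
    ):
        names.extend(thing["thingName"] for thing in page.get("things", []))
    return names
```

A call such as `thing_names_with_attribute(client, "location", "lab")` would then return only the matching thing names; the attribute name and value here are placeholders.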

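The following code example shows how to use SearchIndex.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.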

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def search_index(self, query):
        """
        Searches the AWS IoT index.

        :param query: The search query.
        :return: The list of things found.
        """
        try:
            response = self.iot_client.search_index(queryString=query)
            logger.info("Found %s things.", len(response.get("things", [])))
        except ClientError as err:
            if err.response["Error"]["Code"] == "ThrottlingException":
                logger.error("Request throttled. Please try again later.")
            else:
                logger.error(
                    "Couldn't search index. Here's why: %s: %s",
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
            raise
        else:
            # Only the first page of results is returned.
            return response.get("things", [])
  • For API details, see SearchIndex in the AWS SDK for Python (Boto3) API Reference.
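
The wrapper method above returns only the things in the first response. SearchIndex responses can carry a `nextToken` when more results are available, so a search by thing name might be sketched as follows. The helper name `search_things_by_name` is hypothetical, `iot_client` is assumed to be a client created with `boto3.client("iot")`, and the query string assumes the `thingName:<value>` form of the fleet indexing query syntax.

```python
def search_things_by_name(iot_client, thing_name):
    """Return all indexed things whose name matches `thing_name`.

    Hypothetical helper for illustration. It repeats the search with the
    returned nextToken until no further pages remain.
    """
    kwargs = {"queryString": f"thingName:{thing_name}"}
    found = []
    while True:
        response = iot_client.search_index(**kwargs)
        found.extend(response.get("things", []))
        next_token = response.get("nextToken")
        if not next_token:
            return found
        kwargs["nextToken"] = next_token
```

The search only returns results when thing indexing is enabled for the account, and newly created things may take a short time to appear in the index.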

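The following code example shows how to use UpdateIndexingConfiguration.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.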

import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class IoTWrapper:
    """Encapsulates AWS IoT actions."""

    def __init__(self, iot_client, iot_data_client=None):
        """
        :param iot_client: A Boto3 AWS IoT client.
        :param iot_data_client: A Boto3 AWS IoT Data Plane client.
        """
        self.iot_client = iot_client
        self.iot_data_client = iot_data_client

    @classmethod
    def from_client(cls):
        iot_client = boto3.client("iot")
        iot_data_client = boto3.client("iot-data")
        return cls(iot_client, iot_data_client)

    def update_indexing_configuration(self):
        """
        Updates the AWS IoT indexing configuration to enable thing indexing.
        """
        try:
            # REGISTRY mode indexes registry data for each thing.
            self.iot_client.update_indexing_configuration(
                thingIndexingConfiguration={"thingIndexingMode": "REGISTRY"}
            )
            logger.info("Updated indexing configuration.")
        except ClientError as err:
            logger.error(
                "Couldn't update indexing configuration. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise