

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 为 OIDC 身份验证设置一个 AWS Lambda 授权机构
<a name="dicomweb-oidc-requirements"></a>

本指南假设您已经将所选的身份提供商 (IdP) 配置为提供与 HealthImaging OIDC 身份验证功能要求兼容的访问令牌。

## 1. 配置用于 DICOMWeb API 访问的 IAM 角色
<a name="dicomweb-oidc-iam-roles"></a>

在配置 Lambda 授权方之前，请创建 HealthImaging 要在处理 DICOMWeb API 请求时代入的 IAM 角色。授权方 Lambda 函数在成功进行令牌验证后返回其中一个角色 ARN， HealthImaging 允许使用适当的权限执行请求。

1. 创建定义所需的 DICOMWeb API 权限的 IAM 策略。有关可用权限，请参阅 HealthImaging 文档的 DICOMweb “[使用](https://docs.aws.amazon.com/healthimaging/latest/devguide/using-dicomweb.html)” 部分。

1. 创建可执行以下操作的 IAM 角色：
   + 附上这些政策
   + 包括允许 AWS HealthImaging 服务委托人 (`medical-imaging.amazonaws.com`) 担任这些角色的信任关系。

以下是允许关联角色访问 HealthImaging DICOMWeb 只读 API 的策略示例：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "MedicalImagingDicomWebOperations",
            "Effect": "Allow",
            "Action": [
                "medical-imaging:SearchDICOMInstances",
                "medical-imaging:GetImageSetMetadata",
                "medical-imaging:GetDICOMSeriesMetadata",
                "medical-imaging:SearchDICOMStudies",
                "medical-imaging:GetDICOMBulkdata",
                "medical-imaging:SearchDICOMSeries",
                "medical-imaging:GetDICOMInstanceMetadata",
                "medical-imaging:GetDICOMInstance",
                "medical-imaging:GetDICOMInstanceFrames"
            ],
            "Resource": "arn:aws:medical-imaging:us-east-1:123456789012:datastore/datastore-123"
        }
    ]
}
```

------

以下是应与角色关联的信任关系策略的示例：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "OIDCRoleFederation",
            "Effect": "Allow",
            "Principal": {
                "Service": "medical-imaging.amazonaws.com"
        },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

您将在下一步中创建的 Lambda 授权者可以评估令牌声明并返回相应角色的 ARN。然后，AWS HealthImaging 将模拟此角色以相应的权限执行 DICOMWeb API 请求。

例如：
+ 具有 “管理员” 声明的令牌可能会返回具有完全访问权限的角色的 ARN
+ 具有 “读者” 声明的令牌可能会为具有只读访问权限的角色返回 ARN
+ 具有 “department\$1A” 声明的令牌可能会返回特定于该部门访问级别的角色的 ARN

此机制允许您通过 IAM 角色将 IdP 的授权模型映射到特定的 AWS HealthImaging 权限。

## 2. 创建和配置 Lambda 授权器函数
<a name="dicomweb-oidc-configure-lambda"></a>

创建一个 Lambda 函数，该函数将验证 JWT 令牌，并根据令牌声明评估返回相应的 IAM 角色 ARN。此函数由运行状况映像服务调用，并传递了一个包含 HealthImaging 数据存储 ID、 DICOMWeb 操作和在 HTTP 请求中找到的访问令牌的事件：

```
{
  "datastoreId": "{datastore id}",
  "operation": "{Healthimaging API name e.g. GetDICOMInstance}",
  "bearerToken": "{access token}"
}
```

Lambda 授权方函数必须返回具有以下结构的 JSON 响应：

```
{
  "isTokenValid": {true or false},
  "roleArn": "{role arn or empty string meaning to deny the request explicitly}"
}
```

您可以参考实现示例以了解更多信息。

**注意**  
由于只有在 lambda 授权者验证访问令牌后才会回复 DICOMWeb 请求，因此必须尽可能快地执行此函数，以提供最佳 DICOMWeb API 响应时间。

要授权 HealthImaging 服务调用 lambda 授权器函数，它必须具有允许 HealthImaging 服务调用该函数的资源策略。此资源策略可以在 lambda 配置选项卡的权限菜单中创建，也可以使用 AWS CLI：

```
aws lambda add-permission \
    --function-name YourAuthorizerFunctionName \
    --statement-id HealthImagingInvoke \
    --action lambda:InvokeFunction \
    --principal medical-imaging.amazonaws.com
```

此资源策略允许 HealthImaging 服务在对 API 请求进行身份验证 DICOMWeb 时调用您的 Lambda 授权方。

**注意**  
稍后可以更新 lambda 资源策略，条件与ArnLike特定数据存储的 ARN 相匹配。 HealthImaging 

以下是 lambda 资源策略的示例：

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Id": "default",
  "Statement": [
    {
      "Sid": "LambaAuthorizer-HealthImagingInvokePermission",
      "Effect": "Allow",
      "Principal": {
        "Service": "medical-imaging.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:us-east-1:123456789012::function:{LambdaAuthorizerFunctionName}",
      "Condition": {
        "ArnLike": {
          "AWS:SourceArn": "arn:aws:medical-imaging:us-east-1:123456789012:datastore/datastore-123"
        }
      }
    }
  ]
}
```

------

## 3. 使用 OIDC 身份验证创建新的数据存储
<a name="dicomweb-oidc-datastore"></a>

要启用 OIDC 身份验证，必须使用参数为 “ AWS CLI ” 的新数据存储库。lambda-authorizer-arn如果不联系 Support，则无法在现有数据存储上启用 OIDC 身份验证。 AWS 

以下是如何创建启用 OIDC 身份验证的新数据存储的示例：

```
aws medical-imaging create-datastore \
    --datastore-name YourDatastoreName \
    --lambda-authorizer-arn YourAuthorizerFunctionArn
```

您可以使用 AWS CLI get-datastore 命令检查特定数据存储是否启用了 OIDC 身份验证功能，并验证属性 “” 是否存在：lambdaAuthorizerArn

```
aws medical-imaging get-datastore --datastore-id YourDatastoreId
```

```
{
    "datastoreProperties": {
        "datastoreId": YourdatastoreId,
        "datastoreName": YourDatastoreName,
        "datastoreStatus": "ACTIVE",
        "lambdaAuthorizerArn": YourAuthorizerFunctionArn,
        "datastoreArn": YourDatastoreArn,
        "createdAt": "2025-09-30T14:16:04.015000-05:00",
        "updatedAt": "2025-09-30T14:16:04.015000-05:00"
    }
}
```

**注意**  
 AWS CLI 数据存储库创建命令的执行角色必须具有相应的权限才能调用 Lambda 授权器函数。这可以缓解权限升级攻击，在这种攻击中，恶意用户可以通过数据存储授权器配置执行未经授权的 Lambda 函数。

## 异常代码
<a name="dicomweb-oidc-exceptions"></a>

如果身份验证失败， HealthImaging 则返回以下 HTTP 错误响应代码和正文消息：


| 条件 | AHI 回应 | 
| --- | --- | 
| Lambda 授权器不存在或无效 | 424 授权器配置错误 | 
| 由于执行失败，授权方终止 | 424 授权器失败 | 
| 任何其他未映射的授权方错误 | 424 授权器失败 | 
| 授权方返回的响应无效/格式不正确 | 424 授权器配置错误 | 
| Authorizer 跑了 1 秒以上 | 408 授权器超时 | 
| 令牌已过期或因其他原因无效 | 403 令牌无效或已过期 | 
| 由于授权方配置错误，AHI 无法联合返回的 IAM 角色 | 424 授权器配置错误 | 
| 授权方返回了一个空角色 | 403 访问被拒绝 | 
| 返回的角色不可调用（假设角色/信任错误配置） | 424 授权器配置错误 | 
| 请求速率超过 DICOMweb 网关限制 | 429 请求太多 | 
| 跨区域数据存储、返回角色或授权者 Account/Cross  | 424 授权者跨区域访问 Account/Cross  | 

## 实现示例
<a name="dicomweb-oidc-implementation"></a>

此 Python 示例演示了一个 lambda 授权器函数，该函数验证 HealthImaging 来自事件的 C AWS ognito 访问令牌并返回具有适当权限的 IAM 角色 ARN。 DICOMWeb 

Lambda 授权方实现了两种缓存机制，以减少外部调用和响应延迟。JWKS（JSON Web Key Set）每小时提取一次，并存储在函数的临时文件夹中，允许后续函数调用在本地读取它，而不是从公共网络获取。您还会注意到 token\$1cache 字典对象是在此 Lambda 函数的全局上下文中实例化的。所有重用相同预热 Lambda 上下文的调用都共享全局变量。因此，成功验证的令牌可以存储在此字典中，并在下次执行相同的 Lambda 函数时快速查找。缓存方法代表了一种通用的方法，可以适合大多数身份提供商发放的访问令牌。[有关 AWS Cognito 特定的缓存选项，请参阅 Cognito AWS 文档的[管理用户池](https://docs.aws.amazon.com/cognito/latest/developerguide/managing-users.html)部分和[缓存部分](https://docs.aws.amazon.com/cognito/latest/developerguide/amazon-cognito-user-pools-using-tokens-caching-tokens.html)。](https://docs.aws.amazon.com/cognito/latest/developerguide/what-is-amazon-cognito.html)

```
import json
import os
import time
import logging
from jose import jwk, jwt
from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError
import requests
import tempfile

# Configure logging
logger = logging.getLogger()
log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper()
logger.setLevel(getattr(logging, log_level, logging.WARNING))

# Global token cache with TTL
token_cache = {}

# JWKS cache file path
JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json')
JWKS_CACHE_TTL = 3600  # 1 hour

# Load environment variables once
USER_POOL_ID = os.environ['USER_POOL_ID']
CLIENT_ID = os.environ['CLIENT_ID']
ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '')

def cleanup_expired_tokens():
    """Remove expired tokens from cache"""
    now = int(time.time())
    expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']]
    for token in expired_keys:
        del token_cache[token]

def get_cached_jwks():
    """Get JWKS from cache file if valid, otherwise return None """
    try:
        if os.path.exists(JWKS_CACHE_FILE):
            # Check if cache file is still valid
            cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE)
            if cache_age < JWKS_CACHE_TTL:
                with open(JWKS_CACHE_FILE, 'r') as f:
                    jwks = json.load(f)
                    logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)')
                    return jwks
            else:
                logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)')
    except Exception as e:
        logger.debug(f'Error reading JWKS cache: {e}')
    
    return None

def cache_jwks(jwks):
    """Cache JWKS to file"""
    try:
        with open(JWKS_CACHE_FILE, 'w') as f:
            json.dump(jwks, f)
        logger.debug('JWKS cached successfully')
    except Exception as e:
        logger.debug(f'Error caching JWKS: {e}')

def fetch_jwks(jwks_url):
    """Fetch JWKS from URL and cache it"""
    logger.debug('Fetching JWKS from URL')
    jwks = requests.get(jwks_url, timeout=10).json()
    # Convert to dict for faster lookups
    jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']}
    cache_jwks(jwks)
    return jwks

def is_token_cached(token):
    if token not in token_cache:
        return None
    
    cached = token_cache[token]
    now = int(time.time())
    
    if now > cached['cache_expiry']:
        del token_cache[token]
        return None
    
    return cached

def cache_token(token, payload):
    now = int(time.time())
    token_exp = payload.get('exp')
    cache_expiry = min(now + 60, token_exp)  # 1 minute or token expiry, whichever is sooner
    
    token_cache[token] = {
        'payload': payload,
        'cache_expiry': cache_expiry,
        'role_arn': ROLE_ARN
    }

def handler(event, context):
    cleanup_expired_tokens() # start be removing expired tokens from the cache
    try:
        # Extract token from bearerToken or authorizationToken field
        token = event.get('bearerToken')
        if not token:
            raise Exception('No token provided')
        
        # Check cache first
        cached = is_token_cached(token)
        if cached:
            logger.debug('Token found in cache, skipping verification')
            return {
                'isTokenValid': True,
                'roleArn': cached['role_arn']
            }
        
        # Get Cognito configuration
        region = context.invoked_function_arn.split(':')[3]
        
        # Get JWKS (cached or fresh)
        jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json'
        jwks = get_cached_jwks()
        if not jwks:
            jwks = fetch_jwks(jwks_url)
        
        # Decode token header to get kid
        headers = jwt.get_unverified_headers(token)
        kid = headers['kid']
        
        # Find the correct key
        key = None
        for jwk_key in jwks['keys']:
            if jwk_key['kid'] == kid:
                key = jwk_key
                break
        
        if not key:
            # Key not found - try refreshing JWKS in case of key rotation
            logger.debug('Key not found in cached JWKS, fetching fresh JWKS')
            jwks = fetch_jwks(jwks_url)
            for jwk_key in jwks['keys']:
                if jwk_key['kid'] == kid:
                    key = jwk_key
                    break
        
        if not key:
            raise Exception('Public key not found')
        
        # Construct the public key
        public_key = jwk.construct(key)
        
        # Verify and decode the token (includes expiry validation)
        payload = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            audience=CLIENT_ID,
            issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}'
        )
        
        logger.debug('Token validated successfully')
        logger.debug('User: %s', payload.get('username', 'unknown'))
        
        # Cache the validated token
        cache_token(token, payload)
        
        # Return authorization response
        return {
            'isTokenValid': True,
            'roleArn': ROLE_ARN
        }
        
    except ExpiredSignatureError:
        logger.debug('Token expired')
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
    except JWTClaimsError:
        logger.debug('Invalid token claims')
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
    except JWTError as e:
        logger.debug('JWT validation error: %s', e)
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
    except Exception as e:
        logger.debug('Authorization failed: %s', e)
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
```