

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 設定 OIDC 身分驗證的 AWS Lambda 授權方
<a name="dicomweb-oidc-requirements"></a>

本指南假設您已設定您選擇的身分提供者 (IdP)，以提供與 HealthImaging OIDC 身分驗證功能要求相容的存取權杖。

## 1. 設定 DICOMWeb API 存取的 IAM 角色
<a name="dicomweb-oidc-iam-roles"></a>

在設定 Lambda 授權方之前，請為 HealthImaging 建立 IAM 角色，以在處理 DICOMWeb API 請求時擔任。授權方 Lambda 函數會在權杖驗證成功後傳回其中一個角色 ARN，允許 HealthImaging 以適當的許可執行請求。

1. 建立定義所需 DICOMWeb API 權限的 IAM 政策。如需可用的許可，請參閱 HealthImaging 文件的「[使用 DICOMweb](https://docs.aws.amazon.com/healthimaging/latest/devguide/using-dicomweb.html)」一節。

1. 建立符合下列條件的 IAM 角色：
   + 連接這些政策
   + 包含信任關係，允許 AWS HealthImaging 服務主體 (`medical-imaging.amazonaws.com`) 擔任這些角色。

以下是允許相關角色存取 HealthImaging DICOMWeb 唯讀 API 的政策範例：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "MedicalImagingDicomWebOperations",
            "Effect": "Allow",
            "Action": [
                "medical-imaging:SearchDICOMInstances",
                "medical-imaging:GetImageSetMetadata",
                "medical-imaging:GetDICOMSeriesMetadata",
                "medical-imaging:SearchDICOMStudies",
                "medical-imaging:GetDICOMBulkdata",
                "medical-imaging:SearchDICOMSeries",
                "medical-imaging:GetDICOMInstanceMetadata",
                "medical-imaging:GetDICOMInstance",
                "medical-imaging:GetDICOMInstanceFrames"
            ],
            "Resource": "arn:aws:medical-imaging:us-east-1:123456789012:datastore/datastore-123"
        }
    ]
}
```

------

以下是應該與角色建立關聯的信任關係政策範例 （應該）：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "OIDCRoleFederation",
            "Effect": "Allow",
            "Principal": {
                "Service": "medical-imaging.amazonaws.com"
        },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

您將在下一個步驟中建立的 Lambda 授權方可以評估字符宣告，並傳回適當角色的 ARN。然後，AWS HealthImaging 會模擬此角色，以執行具有對應許可的 DICOMWeb API 請求。

例如：
+ 具有「admin」宣告的權杖可能會傳回具有完整存取權之角色的 ARN
+ 具有「讀取器」宣告的字符可能會傳回具有唯讀存取權之角色的 ARN
+ 具有「department\$1A」宣告的權杖可能會傳回該部門存取層級特定角色的 ARN

此機制可讓您透過 IAM 角色，將 IdP 的授權模型映射至特定 AWS HealthImaging 許可。

## 2. 建立和設定 Lambda 授權方函數
<a name="dicomweb-oidc-configure-lambda"></a>

建立 Lambda 函數來驗證 JWT 字符，並根據字符宣告評估傳回適當的 IAM 角色 ARN。運作狀態影像服務會叫用此函數，並傳遞事件，其中包含在 HTTP 請求中找到的 HealthImaging 資料存放區 ID、DICOMWeb 操作和存取字符：

```
{
  "datastoreId": "{datastore id}",
  "operation": "{Healthimaging API name e.g. GetDICOMInstance}",
  "bearerToken": "{access token}"
}
```

Lambda 授權方函數必須傳回具有下列結構的 JSON 回應：

```
{
  "isTokenValid": {true or false},
  "roleArn": "{role arn or empty string meaning to deny the request explicitly}"
}
```

如需詳細資訊，請參閱實作範例。

**注意**  
由於 DICOMWeb 請求只有在 Lambda 授權方驗證存取權杖後才會得到回應，因此請務必盡快執行此函數，以提供最佳的 DICOMWeb API 回應時間。

若要讓 HealthImaging 服務有權叫用 lambda 授權方函數，它必須具有允許 HealthImaging 服務叫用它的資源政策。此資源政策可以在 Lambda 組態索引標籤的許可選單中建立，或使用 AWS CLI：

```
aws lambda add-permission \
    --function-name YourAuthorizerFunctionName \
    --statement-id HealthImagingInvoke \
    --action lambda:InvokeFunction \
    --principal medical-imaging.amazonaws.com
```

此資源政策允許 HealthImaging 服務在驗證 DICOMWeb API 請求時調用您的 Lambda 授權方。

**注意**  
稍後可以使用符合特定 HealthImaging 資料存放區的 ARN 的「ArnLike」條件來更新 lambda 資源政策。

以下是 lambda 資源政策的範例：

------
#### [ JSON ]

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Id": "default",
  "Statement": [
    {
      "Sid": "LambaAuthorizer-HealthImagingInvokePermission",
      "Effect": "Allow",
      "Principal": {
        "Service": "medical-imaging.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:us-east-1:123456789012::function:{LambdaAuthorizerFunctionName}",
      "Condition": {
        "ArnLike": {
          "AWS:SourceArn": "arn:aws:medical-imaging:us-east-1:123456789012:datastore/datastore-123"
        }
      }
    }
  ]
}
```

------

## 3. 使用 OIDC 身分驗證建立新的資料存放區
<a name="dicomweb-oidc-datastore"></a>

若要啟用 OIDC 身分驗證，您必須使用 AWS CLI 參數為 "lambda-authorizer-arn" 的 建立新的資料存放區。在未聯絡 AWS Support 的情況下，無法在現有的資料存放區上啟用 OIDC 身分驗證。

以下是如何在啟用 OIDC 身分驗證的情況下建立新的資料存放區的範例：

```
aws medical-imaging create-datastore \
    --datastore-name YourDatastoreName \
    --lambda-authorizer-arn YourAuthorizerFunctionArn
```

您可以使用 get-datastore 命令來檢查特定資料存放區是否已啟用 AWS CLI OIDC 身分驗證功能，並驗證屬性 "lambdaAuthorizerArn" 是否存在：

```
aws medical-imaging get-datastore --datastore-id YourDatastoreId
```

```
{
    "datastoreProperties": {
        "datastoreId": YourdatastoreId,
        "datastoreName": YourDatastoreName,
        "datastoreStatus": "ACTIVE",
        "lambdaAuthorizerArn": YourAuthorizerFunctionArn,
        "datastoreArn": YourDatastoreArn,
        "createdAt": "2025-09-30T14:16:04.015000-05:00",
        "updatedAt": "2025-09-30T14:16:04.015000-05:00"
    }
}
```

**注意**  
 AWS CLI 資料存放區建立命令的執行角色必須具有適當的許可，才能叫用 Lambda 授權方函數。這可減少惡意使用者可以透過資料存放區授權方組態執行未經授權 Lambda 函數的權限提升攻擊。

## 例外代碼
<a name="dicomweb-oidc-exceptions"></a>

如果身分驗證失敗，HealthImaging 會傳回下列 HTTP 錯誤回應代碼和內文訊息：


| 條件 | AHI 回應 | 
| --- | --- | 
| Lambda 授權方不存在或無效 | 424 授權方設定錯誤 | 
| 授權方因執行失敗而終止 | 424 授權方失敗 | 
| 任何其他未映射的授權方錯誤 | 424 授權方失敗 | 
| 授權方傳回無效/格式錯誤的回應 | 424 授權方設定錯誤 | 
| 授權方執行超過 1 秒 | 408 授權方逾時 | 
| 字符已過期或無效 | 403 無效的權杖或過期權杖 | 
| 由於授權方組態錯誤，AHI 無法聯合傳回的 IAM 角色 | 424 授權方設定錯誤 | 
| 授權方傳回空的角色 | 403 存取遭拒 | 
| 傳回的角色不可呼叫 (assume-role/trust misconfig) | 424 授權方設定錯誤 | 
| 請求率超過 DICOMweb Gateway 限制 | 429 太多請求 | 
| 資料存放區、傳回角色或授權方跨帳戶/跨區域 | 424 授權方跨帳戶/跨區域存取 | 

## 實作範例
<a name="dicomweb-oidc-implementation"></a>

此 Python 範例示範 lambda 授權方函數，可驗證來自 HealthImaging 事件的 AWS Cognito 存取字符，並傳回具有適當 DICOMWeb 權限的 IAM 角色 ARN。

Lambda 授權方實作兩種快取機制，以減少外部呼叫和回應延遲。JWKS (JSON Web 金鑰集） 會每小時擷取一次，並存放在函數的暫時資料夾中，允許後續函數叫用在本機讀取，而不是從公有網路擷取。您也會注意到 Token\$1cache 字典物件在此 Lambda 函數的全域內容中執行個體化。全域變數由重複使用相同暖 Lambda 內容的所有調用共用。因此，已成功驗證的字符可以存放在此字典中，並在下次執行這個相同的 Lambda 函數時快速查詢。快取方法代表一種通才方法，可以適應大多數身分提供者發出的存取權杖。如需 AWS Cognito 特定快取選項，請參閱 [AWS Cognito 文件](https://docs.aws.amazon.com/cognito/latest/developerguide/what-is-amazon-cognito.html)的[管理使用者集](https://docs.aws.amazon.com/cognito/latest/developerguide/managing-users.html)區區段和[快取區段](https://docs.aws.amazon.com/cognito/latest/developerguide/amazon-cognito-user-pools-using-tokens-caching-tokens.html)。

```
import json
import os
import time
import logging
from jose import jwk, jwt
from jose.exceptions import ExpiredSignatureError, JWTClaimsError, JWTError
import requests
import tempfile

# Configure logging
logger = logging.getLogger()
log_level = os.environ.get('LOG_LEVEL', 'WARNING').upper()
logger.setLevel(getattr(logging, log_level, logging.WARNING))

# Global token cache with TTL
token_cache = {}

# JWKS cache file path
JWKS_CACHE_FILE = os.path.join(tempfile.gettempdir(), 'jwks.json')
JWKS_CACHE_TTL = 3600  # 1 hour

# Load environment variables once
USER_POOL_ID = os.environ['USER_POOL_ID']
CLIENT_ID = os.environ['CLIENT_ID']
ROLE_ARN = os.environ.get('AHIDICOMWEB_READONLY_ROLE_ARN', '')

def cleanup_expired_tokens():
    """Remove expired tokens from cache"""
    now = int(time.time())
    expired_keys = [token for token, data in token_cache.items() if now > data['cache_expiry']]
    for token in expired_keys:
        del token_cache[token]

def get_cached_jwks():
    """Get JWKS from cache file if valid, otherwise return None """
    try:
        if os.path.exists(JWKS_CACHE_FILE):
            # Check if cache file is still valid
            cache_age = time.time() - os.path.getmtime(JWKS_CACHE_FILE)
            if cache_age < JWKS_CACHE_TTL:
                with open(JWKS_CACHE_FILE, 'r') as f:
                    jwks = json.load(f)
                    logger.debug(f'Using cached JWKS (age: {int(cache_age)}s)')
                    return jwks
            else:
                logger.debug(f'JWKS cache expired (age: {int(cache_age)}s)')
    except Exception as e:
        logger.debug(f'Error reading JWKS cache: {e}')
    
    return None

def cache_jwks(jwks):
    """Cache JWKS to file"""
    try:
        with open(JWKS_CACHE_FILE, 'w') as f:
            json.dump(jwks, f)
        logger.debug('JWKS cached successfully')
    except Exception as e:
        logger.debug(f'Error caching JWKS: {e}')

def fetch_jwks(jwks_url):
    """Fetch JWKS from URL and cache it"""
    logger.debug('Fetching JWKS from URL')
    jwks = requests.get(jwks_url, timeout=10).json()
    # Convert to dict for faster lookups
    jwks['keys_by_kid'] = {key['kid']: key for key in jwks['keys']}
    cache_jwks(jwks)
    return jwks

def is_token_cached(token):
    if token not in token_cache:
        return None
    
    cached = token_cache[token]
    now = int(time.time())
    
    if now > cached['cache_expiry']:
        del token_cache[token]
        return None
    
    return cached

def cache_token(token, payload):
    now = int(time.time())
    token_exp = payload.get('exp')
    cache_expiry = min(now + 60, token_exp)  # 1 minute or token expiry, whichever is sooner
    
    token_cache[token] = {
        'payload': payload,
        'cache_expiry': cache_expiry,
        'role_arn': ROLE_ARN
    }

def handler(event, context):
    cleanup_expired_tokens() # start be removing expired tokens from the cache
    try:
        # Extract token from bearerToken or authorizationToken field
        token = event.get('bearerToken')
        if not token:
            raise Exception('No token provided')
        
        # Check cache first
        cached = is_token_cached(token)
        if cached:
            logger.debug('Token found in cache, skipping verification')
            return {
                'isTokenValid': True,
                'roleArn': cached['role_arn']
            }
        
        # Get Cognito configuration
        region = context.invoked_function_arn.split(':')[3]
        
        # Get JWKS (cached or fresh)
        jwks_url = f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}/.well-known/jwks.json'
        jwks = get_cached_jwks()
        if not jwks:
            jwks = fetch_jwks(jwks_url)
        
        # Decode token header to get kid
        headers = jwt.get_unverified_headers(token)
        kid = headers['kid']
        
        # Find the correct key
        key = None
        for jwk_key in jwks['keys']:
            if jwk_key['kid'] == kid:
                key = jwk_key
                break
        
        if not key:
            # Key not found - try refreshing JWKS in case of key rotation
            logger.debug('Key not found in cached JWKS, fetching fresh JWKS')
            jwks = fetch_jwks(jwks_url)
            for jwk_key in jwks['keys']:
                if jwk_key['kid'] == kid:
                    key = jwk_key
                    break
        
        if not key:
            raise Exception('Public key not found')
        
        # Construct the public key
        public_key = jwk.construct(key)
        
        # Verify and decode the token (includes expiry validation)
        payload = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            audience=CLIENT_ID,
            issuer=f'https://cognito-idp.{region}.amazonaws.com/{USER_POOL_ID}'
        )
        
        logger.debug('Token validated successfully')
        logger.debug('User: %s', payload.get('username', 'unknown'))
        
        # Cache the validated token
        cache_token(token, payload)
        
        # Return authorization response
        return {
            'isTokenValid': True,
            'roleArn': ROLE_ARN
        }
        
    except ExpiredSignatureError:
        logger.debug('Token expired')
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
    except JWTClaimsError:
        logger.debug('Invalid token claims')
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
    except JWTError as e:
        logger.debug('JWT validation error: %s', e)
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
    except Exception as e:
        logger.debug('Authorization failed: %s', e)
        return {
            'isTokenValid': False,
            'roleArn': ''
        }
```