

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 中的並行 適用於 Rust 的 AWS SDK
<a name="concurrency"></a>

 適用於 Rust 的 AWS SDK 不提供並行控制，但使用者有許多實作自己的選項。

## 條款
<a name="conc-terms"></a>

與此主題相關的術語很容易混淆，有些術語已經成為同義詞，即使它們最初代表不同的概念。在本指南中，我們將定義以下內容：
+  **任務**：您的程式將執行到完成的某些「工作單位」，或嘗試執行到完成。
+  **循序運算**：依序執行數個任務時。
+  **並行運算**：在重疊時段執行多個任務時。
+  **並行**：電腦以任意順序完成多個任務的能力。
+  **多工作業**：電腦同時執行數個任務的能力。
+  **競賽條件**：當程式的行為根據任務啟動的時間或處理任務所需的時間而變更。
+  **爭用**：對共用資源的存取發生衝突。當兩個或多個任務想要同時存取資源時，該資源是「爭用中」。
+  **Deadlock**：無法再進行進度的狀態。這通常是因為兩個任務想要取得彼此的資源，但在另一個任務的資源可用之前，這兩個任務都不會釋出其資源。死鎖會導致程式部分或完全沒有回應。

## 簡單範例
<a name="conc-simple"></a>

我們的第一個範例是循序程式。在稍後的範例中，我們將使用並行技術來變更此程式碼。稍後的範例會重複使用相同的`build_client_and_list_objects_to_download()`方法，並在 中進行變更`main()`。執行下列命令，將相依性新增至您的專案：
+ `cargo add aws-sdk-s3`
+ `cargo add aws-config tokio --features tokio/full`

下列範例任務是下載 Amazon Simple Storage Service 儲存貯體中的所有檔案：

1.  首先列出所有檔案。將金鑰儲存在清單中。

1.  逐一查看清單，依序下載每個檔案 

```
use aws_sdk_s3::{Client, Error};
const EXAMPLE_BUCKET: &str = "amzn-s3-demo-bucket";  // Update to name of bucket you own.

// This initialization function won't be reproduced in
// examples following this one, in order to save space.
async fn build_client_and_list_objects_to_download() -> (Client, Vec<String>) {
    let cfg = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let client = Client::new(&cfg);
    let objects_to_download: Vec<_> = client
        .list_objects_v2()
        .bucket(EXAMPLE_BUCKET)
        .send()
        .await
        .expect("listing objects succeeds")
        .contents()
        .into_iter()
        .flat_map(aws_sdk_s3::types::Object::key)
        .map(ToString::to_string)
        .collect();
         
    (client, objects_to_download)
}
```

```
#[tokio::main]
async fn main() {
    let (client, objects_to_download) =
        build_client_and_list_objects_to_download().await;
    
    for object in objects_to_download {
        let res = client
            .get_object()
            .key(&object)
            .bucket(EXAMPLE_BUCKET)
            .send()
            .await
            .expect("get_object succeeds");
        let body = res.body.collect().await.expect("reading body succeeds").into_bytes();
        std::fs::write(object, body).expect("write succeeds");
    }
}
```

**注意**  
 在這些範例中，我們不會處理錯誤，而且我們假設範例儲存貯體沒有具有類似檔案路徑之金鑰的物件。因此，我們不會涵蓋建立巢狀目錄。

由於現代電腦的架構，我們可以重寫此程式以提高效率。我們會在稍後的範例中執行此作業，但首先，讓我們進一步了解一些概念。

## 擁有權和可變性
<a name="conc-ownership"></a>

Rust 中的每個值都有單一擁有者。當擁有者超出範圍時，也會捨棄其擁有的所有值。擁有者可以提供一或多個不可變的值參考**或**單一可變參考。Rust 編譯器負責確保沒有任何參考超過其擁有者。

當多個任務需要可變存取相同的資源時，需要額外的規劃和設計。在循序運算中，每個任務都可以在不爭用的情況下可變存取相同的資源，因為它們會依序執行。不過，在並行運算中，任務可以同時以任何順序執行。因此，我們必須進一步向編譯器證明無法進行多個可變參考 （或至少在發生時當機）。

Rust 標準程式庫提供許多工具來協助我們達成此目標。如需這些主題的詳細資訊，請參閱《Rust 程式設計語言書》中的[變數和可變動性和](https://doc.rust-lang.org/book/ch03-01-variables-and-mutability.html)[了解擁有權](https://doc.rust-lang.org/book/ch04-00-understanding-ownership.html)。

## 更多詞彙！
<a name="conc-moreTerms"></a>

以下是「同步物件」的清單。總之，它們是說服編譯器所需的工具，我們的並行程式不會破壞擁有權規則。

 [https://doc.rust-lang.org/std/sync/index.html](https://doc.rust-lang.org/std/sync/index.html)：
+ [https://doc.rust-lang.org/std/sync/struct.Arc.html](https://doc.rust-lang.org/std/sync/struct.Arc.html)：***A** tomically **R** eference-**C** ounted* 指標。在 中包裝資料時`Arc`，可以自由共用資料，而無需擔心任何特定擁有者提早捨棄該值。在此意義上，值的擁有權會變成「共用」。內的值`Arc`不能可變，但可能具有[內部可變性](https://doc.rust-lang.org/reference/interior-mutability.html)。
+ [https://doc.rust-lang.org/std/sync/struct.Barrier.html](https://doc.rust-lang.org/std/sync/struct.Barrier.html)：確保多個執行緒會等待彼此到達程式中的某個點，然後再一起繼續執行。
+ [https://doc.rust-lang.org/std/sync/struct.Condvar.html](https://doc.rust-lang.org/std/sync/struct.Condvar.html)：***Cond** ition **Var** iable*，可在等待事件發生時封鎖執行緒。
+ [https://doc.rust-lang.org/std/sync/struct.Mutex.html](https://doc.rust-lang.org/std/sync/struct.Mutex.html)：***一種 Mut** ual **Ex** clusion* 機制，可確保一次最多有一個執行緒能夠存取一些資料。一般而言，絕不應將`Mutex`鎖定保留在程式碼中的某個`.await`點。

 [https://docs.rs/tokio/latest/tokio/sync/index.html](https://docs.rs/tokio/latest/tokio/sync/index.html)：

雖然 AWS SDKs 旨在與 `async`-runtime-agnostic 無關，但我們建議在特定情況下使用`tokio`同步物件。
+ [https://docs.rs/tokio/latest/tokio/sync/struct.Mutex.html](https://docs.rs/tokio/latest/tokio/sync/struct.Mutex.html)：類似於標準程式庫的 `Mutex`，但成本略高。與標準 不同`Mutex`，這個可以保留在程式碼中的某個`.await`點。
+ [https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html](https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html)：變數，用於控制多個任務對常見資源的存取。

## 重寫我們的範例以提高效率 （單執行緒並行）
<a name="conc_singleThread"></a>

在下列修改範例中，我們使用 [https://docs.rs/futures-util/latest/futures_util/future/fn.join_all.html](https://docs.rs/futures-util/latest/futures_util/future/fn.join_all.html) 來同時執行**所有**`get_object`請求。執行下列命令，將新的相依性新增至您的專案：
+ `cargo add futures-util`

```
#[tokio::main]
async fn main() {
    let (client, objects_to_download) =
        build_client_and_list_objects_to_download().await;
        
    let get_object_futures = objects_to_download.into_iter().map(|object| {
        let req = client
            .get_object()
            .key(&object)
            .bucket(EXAMPLE_BUCKET);

        async {
            let res = req
                .send()
                .await
                .expect("get_object succeeds");
            let body = res.body.collect().await.expect("body succeeds").into_bytes();
           // Note that we MUST use the async runtime's preferred way
           // of writing files. Otherwise, this call would block,
           // potentially causing a deadlock.
            tokio::fs::write(object, body).await.expect("write succeeds");
        }
    });

    futures_util::future::join_all(get_object_futures).await;
}
```

 這是受益於並行的最簡單方法，但它也有幾個問題，一開始可能並不明顯：

1.  我們會同時建立所有請求輸入。如果我們沒有足夠的記憶體來保留所有`get_object`請求輸入，則會遇到「out-of-memory」配置錯誤。

1.  我們同時建立和等待所有未來。如果我們一次嘗試下載太多，Amazon S3 會調節請求。

若要修正這兩個問題，我們必須限制我們一次傳送的請求數量。我們會使用`tokio`[旗號](https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html)來執行此操作：

```
use std::sync::Arc;
use tokio::sync::Semaphore;
const CONCURRENCY_LIMIT: usize = 50; 

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (client, objects_to_download) =
        build_client_and_list_objects_to_download().await;
    let concurrency_semaphore = Arc::new(Semaphore::new(CONCURRENCY_LIMIT));

    let get_object_futures = objects_to_download.into_iter().map(|object| {
        // Since each future needs to acquire a permit, we need to clone
        // the Arc'd semaphore before passing it in.
        let semaphore = concurrency_semaphore.clone();
        // We also need to clone the client so each task has its own handle.
        let client = client.clone();
        async move {
            let permit = semaphore
                .acquire()
                .await
                .expect("we'll get a permit if we wait long enough");
            let res = client
                .get_object()
                .key(&object)
                .bucket(EXAMPLE_BUCKET)
                .send()
                .await
                .expect("get_object succeeds");
            let body = res.body.collect().await.expect("body succeeds").into_bytes();
            tokio::fs::write(object, body).await.expect("write succeeds");
            std::mem::drop(permit);
        }
    });

    futures_util::future::join_all(get_object_futures).await;
}
```

我們已將請求建立移至 `async`區塊，以修正潛在的記憶體用量問題。如此一來，在傳送請求之前，系統不會建立請求。

**注意**  
 如果您有記憶體，一次建立所有請求輸入並將其保留在記憶體中，直到準備好傳送為止，可能會更有效率。若要嘗試這麼做，請將請求輸入建立移至`async`區塊之外。

 我們也修正了同時傳送太多請求的問題，方法是限制傳送至 的請求`CONCURRENCY_LIMIT`。

**注意**  
 每個專案的正確值`CONCURRENCY_LIMIT`都不同。建構和傳送自己的請求時，請嘗試盡可能將其設定為最高，而不會遇到調節錯誤。雖然可以根據服務傳回的成功與調節回應比例動態更新並行限制，但由於其複雜性，因此超出本指南的範圍。

## 重寫我們的範例以提高效率 （多執行緒並行）
<a name="conc-multiThread"></a>

 在前兩個範例中，我們同時執行了請求。雖然這比同步執行它們更有效率，但我們可以透過使用多執行緒讓事情更有效率。若要使用 執行此操作`tokio`，我們需要將其產生為個別任務。

**注意**  
 此範例要求您使用多執行緒`tokio`執行時間。此執行時間位於 `rt-multi-thread`功能後方。當然，您需要在多核心機器上執行程式。

執行下列命令，將新的相依性新增至您的專案：
+ `cargo add tokio --features=rt-multi-thread`

```
// Set this based on the amount of cores your target machine has.
const THREADS: usize = 8; 

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let (client, objects_to_download) =
        build_client_and_list_objects_to_download().await;
    let concurrency_semaphore = Arc::new(Semaphore::new(THREADS));

    let get_object_task_handles = objects_to_download.into_iter().map(|object| {
        // Since each future needs to acquire a permit, we need to clone
        // the Arc'd semaphore before passing it in.
        let semaphore = concurrency_semaphore.clone();
        // We also need to clone the client so each task has its own handle.
        let client = client.clone();
        
        // Note this difference! We're using `tokio::task::spawn` to
        // immediately begin running these requests.
        tokio::task::spawn(async move {
            let permit = semaphore
                .acquire()
                .await
                .expect("we'll get a permit if we wait long enough");
            let res = client
                .get_object()
                .key(&object)
                .bucket(EXAMPLE_BUCKET)
                .send()
                .await
                .expect("get_object succeeds");
            let body = res.body.collect().await.expect("body succeeds").into_bytes();
            tokio::fs::write(object, body).await.expect("write succeeds");
            std::mem::drop(permit);
        })
    });

    futures_util::future::join_all(get_object_task_handles).await;
}
```

將工作劃分為任務可能很複雜。執行 I/O (*輸入/輸出*) 通常會封鎖。執行期可能無法平衡長時間執行任務與短期執行任務的需求。無論您選擇哪個執行時間，請務必閱讀他們的建議，以最有效率的方式將您的工作劃分為任務。如需`tokio`執行時間建議，請參閱[模組 `tokio::task`](https://docs.rs/tokio/latest/tokio/task/index.html)。

## 偵錯多執行緒應用程式
<a name="conc-debug"></a>

可依任何順序同時執行的任務。因此，並行程式的日誌很難讀取。在適用於 Rust 的 開發套件中，建議使用 `tracing` 記錄系統。無論日誌何時執行，它都可以將日誌與其特定任務分組。如需準則，請參閱[在適用於 Rust 的 AWS SDK 中設定和使用記錄](logging.md)。

識別鎖定任務的實用工具是 [https://github.com/tokio-rs/console](https://github.com/tokio-rs/console)，這是非同步 Rust 程式的診斷和偵錯工具。透過檢測和執行您的程式，然後執行`tokio-console`應用程式，您可以查看程式正在執行之任務的即時檢視。此檢視包含有用的資訊，例如任務等待取得共用資源所花費的時間，或輪詢的時間。