Amazon Web Services ブログ

Apache Iceberg on AWS Glue Data Catalog における同時書き込み競合の管理

本記事は、2025/4/8 に公開された Manage concurrent write conflicts in Apache Iceberg on the AWS Glue Data Catalog を翻訳したものです。翻訳は Solutions Architect の深見が担当しました。

現代的なデータアーキテクチャにおいて、Apache Iceberg はデータレイクの人気のあるテーブルフォーマットとして台頭しており、ACID トランザクションと同時書き込みサポートなどの重要な機能を備えています。これらの機能は強力ですが、本番環境で効果的に実装するには、慎重な検討を要する独自の課題があります。
一般的なシナリオを考えてみましょう。ストリーミングパイプラインが継続的に Iceberg テーブルにデータを書き込み、スケジュールされたメンテナンスジョブがコンパクション操作を実行しています。Iceberg には同時書き込みを処理するための組み込みメカニズムがありますが、ストリーミング更新とコンパクション操作の間のような特定の競合シナリオでは、トランザクションが失敗し、特別な処理パターンが必要になる可能性があります。
この記事では、Iceberg テーブルで信頼性の高い同時書き込み処理メカニズムを実装する方法を示します。Iceberg の同時実行モデルを探り、一般的な競合シナリオを検討し、自動再試行メカニズムと、独自の競合解決ロジックが必要な状況の実用的な実装パターンを提供して、耐障害性の高いデータパイプラインを構築します。また、AWS Glue Data Catalog テーブル最適化による自動コンパクションと併用した設計パターンについても説明します。

一般的な競合シナリオ

多くの組織がデータパイプラインで直面する、データ競合が最も頻繁に発生する具体的な運用シナリオについて、この節で説明します。

重複したパーティションへの UPDATE/DELETE の同時実行

複数のプロセスが同時に同じパーティションを変更しようとすると、データの競合が発生する可能性があります。例えば、ここで両方の操作が同じパーティションのcustomer_id を対象にした場合、重複するデータセットを変更することで競合が発生する可能性があります。両方の操作は customer_id に基づいて同じパーティションを対象としているため、重複するデータセットを変更しているので競合が発生する可能性があります。このような競合は、大規模なデータクリーンアップ操作で特に一般的です。

コンパクション vs ストリーミング書き込み

テーブルのメンテナンス操作中に、典型的な競合シナリオが発生する可能性があります。リアルタイムのイベントデータを取り込むストリーミングパイプラインと、ファイルサイズを最適化するためにスケジュールされたコンパクションジョブが同時に実行されている場合を考えてみましょう。ストリーミングプロセスが新しいレコードをパーティションに書き込んでいる間に、コンパクションジョブが同じパーティション内の既存ファイルを結合しようとしているかもしれません。このシナリオは、特に Glue Data Catalog の自動最適化機能を利用している際に一般的に起こりうるシナリオです。自動コンパクションが継続的なデータ取り込みと同時に実行される可能性があるためです。

MERGE の同時実行操作

MERGE 操作は、データの読み取りと書き込みの両方を含むため、特に競合が発生しやすくなります。例えば、ある時間単位のジョブがソースシステムからの顧客プロファイル更新をマージしている一方で、別のジョブが別のシステムからの設定更新をマージしている場合、両方のジョブが同じ顧客レコードを変更しようとすると、それぞれの操作が異なる現在のデータ状態に基づいて変更を行うため、競合が発生する可能性があります。

一般的なテーブルの同時更新

複数のトランザクションが同時に発生すると、他のトランザクションの干渉により、一部のトランザクションがカタログにコミットできない可能性があります。Iceberg にはこのシナリオを処理するメカニズムがあり、多くの場合、同時トランザクションに対応できます。ただし、更新の基準となるメタデータバージョンが確立された後にメタデータが最新化されると、コミットが失敗する可能性があります。このシナリオは、Iceberg テーブルのあらゆる種類の更新に適用されます。

Iceberg の同時実行モデルと競合の種類

特定の実装パターンに入る前に、Iceberg がテーブルアーキテクチャとトランザクションモデルを通じて同時書き込みをどのように管理するかを理解することが不可欠です。Iceberg は、テーブルの状態とデータを管理するために階層アーキテクチャを使用しています。

  • カタログレイヤー – 現在のテーブルメタデータファイルへのポインターを維持し、テーブル状態の単一の情報源として機能します。Glue Data Catalog は Iceberg カタログと同様の機能を提供します。
  • メタデータレイヤー – テーブルの履歴、スキーマの進化、スナップショット情報を追跡するメタデータファイルが含まれています。これらのファイルは Amazon Simple Storage Service (Amazon S3) に格納されています。
  • データレイヤー – 実際のデータファイルと削除ファイル (Merge-on-Read 操作用) が格納されています。これらのファイルも Amazon S3 に格納されています。

次の図はこのアーキテクチャを示しています。

このアーキテクチャは Iceberg の楽観的同時実行制御の基本であり、複数のライターが同時に操作を進めることができ、競合はコミット時に検出されます。

書き込みトランザクションの流れ

Iceberg での典型的な書き込みトランザクションは、次のキーステップに従います。

  1. 現在の状態を読み取ります。OVERWRITE、MERGE、DELETE などの多くの操作では、クエリエンジンがどのファイルまたは行が関連するかを知る必要があるため、現在のテーブルスナップショットを読み取ります。INSERT などの操作ではこの手順はオプションです。
  2. トランザクションが行う変更を確定し、新しいデータファイルを書き込みます。
  3. テーブルの最新のメタデータを読み込み、更新の基準となるメタデータバージョンを判断します。
  4. ステップ 2 で準備した変更が、ステップ 3 の最新のテーブルデータと互換性があるかどうかを確認します。互換性がないことが検出された場合、トランザクションは停止する必要があります。
  5. 新しいメタデータファイルを生成します。
  6. メタデータファイルをカタログにコミットします。コミットに失敗した場合は、ステップ 3 から再試行します。再試行回数は設定によって異なります。

次の図はこのワークフローを示しています。
Iceberg write transaction flow
競合が発生する可能性がある重要な 2 つのポイントは次のとおりです。

  • データ更新の競合 – データの競合をチェックする際の検証時 (ステップ 4)
  • カタログコミットの競合 – カタログポインタを更新しようとするコミット時 (ステップ 6)

Iceberg テーブルを扱う際、発生しうる競合の種類とその処理方法を理解することは、信頼性の高いデータパイプラインを構築する上で非常に重要です。主な 2 種類の競合とその特徴について説明しましょう。

カタログコミットの競合

カタログコミット競合は、複数のライターが同時にテーブルメタデータを更新しようとしたときに発生します。コミット競合が発生すると、Iceberg はテーブルの書き込みプロパティに基づいて自動的に操作を再試行します。再試行プロセスはメタデータのコミットのみを繰り返すため、安全で効率的です。再試行に失敗すると、トランザクションは CommitFailedException で失敗します。
次の図では、2 つのトランザクションが同時に実行されています。トランザクション 1 は、Iceberg カタログ内のテーブルの最新のスナップショットを 0 から 1 に正常に更新しました。一方、トランザクション 2 はスナップショット 0 から 1 への更新を試みましたが、カタログへの変更をコミットしようとしたときに、最新のスナップショットがすでにトランザクション 1 によって 1 に変更されていたことがわかりました。その結果、トランザクション 2 はステップ 3 から再試行する必要がありました。
Catalog commit conflicts1
これらの競合は一時的なものであり、再試行によって自動的に解決できます。オプションで、コミットの再試行動作を制御する書き込みプロパティを構成できます。より詳細な構成については、Iceberg ドキュメントの 書き込みプロパティ を参照してください。
現在の状態を読み取るときに使用されるメタデータ (ステップ 1) と、更新のベースメタデータとして使用されるスナップショット (ステップ 3) は異なる可能性があります。ステップ 1 とステップ 3 の間に別のトランザクションが最新のスナップショットを更新しても、現在のトランザクションはデータ競合チェック (ステップ 4) に合格すれば、カタログに変更をコミットできます。つまり、変更の計算とデータファイルの書き込み (ステップ 1 から 2) に長い時間がかかり、その間に他のトランザクションが変更を加えた場合でも、トランザクションはカタログへのコミットを試行できます。これは、Iceberg の賢明な同時実行制御メカニズムを示しています。
次の図はこのワークフローを示しています。
Catalog commit conflicts2

データ更新の競合

データ更新の競合は、より複雑で、同時実行されるトランザクションが重複するデータを変更しようとしたときに発生します。書き込みトランザクション中、クエリエンジンはトランザクション分離ルールに従って、書き込まれるスナップショットと最新のスナップショットの整合性をチェックします。不整合が検出された場合、トランザクションは ValidationException で失敗します。
次の図では、2 つのトランザクションが idnamesalary 列を含む従業員テーブルで同時に実行されています。トランザクション 1 は、スナップショット 0 に基づいてレコードを更新しようとし、この変更を正常にコミットしてスナップショットのバージョンを 1 に更新しました。一方、トランザクション 2 も同じレコードをスナップショット 0 に基づいて更新しようとしました。トランザクション 2 が最初にデータをスキャンした時点では、最新のスナップショットは 0 でしたが、その後トランザクション 1 によって 1 に更新されていました。データ競合チェック中に、トランザクション 2 はその変更がスナップショット 1 と競合していることを検出し、トランザクションが失敗しました。
data conflict
この競合シナリオでは、更新の元となるテーブルの状態が変更されているため、トランザクションを再施行した場合に全体のデータ整合性が維持されるかどうかが不確かになるため、Iceberg のライブラリでは自動的に再試行できません。この種の競合は、個別のユースケースと要件に基づいて対処する必要があります。
次の表は、各書き込みパターン別に競合が発生する可能性の有無をまとめたものです。

書き込みパターン カタログコミット競合
(自動再試行可能)
データ競合
(再試行なし)
INSERT (AppendFiles) Yes Never
UPDATE/DELETE with Copy-on-Write または Merge-on-Read (OverwriteFiles) Yes Yes
Compaction (RewriteFiles) Yes Yes

Iceberg テーブルの分離レベル
Iceberg テーブルは、Serializable 分離Snapshot 分離の 2 つの分離レベルをサポートしています。どちらも、テーブルの一貫したビューを読み取り、リーダーがコミットされたデータのみを参照することを保証します。Serializable 分離は、同時実行操作が逐次的に実行されたかのように処理されることを保証します。Snapshot 分離は保証が弱くなりますが、同時に多数の書き込みクライアントが存在する環境でのパフォーマンスが向上します。Snapshot 分離では、同時実行トランザクションが条件に一致する可能性のあるレコードを含む新しいファイルを追加した場合でも、データ競合チェックが合格する可能性があります。
デフォルトでは、Iceberg テーブルはSerializable 分離を使用します。テーブルプロパティを使用して、特定の操作の分離レベルを構成できます。

tbl_properties = {
    'write.delete.isolation-level' = 'serializable',
    'write.update.isolation-level' = 'serializable',
    'write.merge.isolation-level' = 'serializable'
}

ユースケースに基づいて適切な分離レベルを選択する必要があります。最も一般的な競合シナリオの一つであるストリーミングでの書き込みとコンパクション操作の同時実行では、スナップショット分離を選んだとしても、競合を緩和する観点でシリアライザブル分離に対する追加のメリットはない点に留意してください。より詳細な設定については、IsolationLevel を参照してください。

実装パターン

Iceberg で堅牢な同時書き込み処理を実装するには、競合の種類とユースケースに応じて異なる戦略が必要です。このセクションでは、一般的なシナリオを処理するための実証済みのパターンを共有します。

カタログコミット競合の管理

カタログコミットの競合は、テーブルプロパティを通じて比較的簡単に処理できます。以下の構成は、ワークロードのパターンと要件に応じて調整できる、初期のベースライン設定として機能します。
頻繁な同時書き込み (ストリーミングによる書き込みなど) の場合:

tbl_properties = {
    'commit.retry.num-retries': '10',
    'commit.retry.min-wait-ms': '100',
    'commit.retry.max-wait-ms': '10000',
    'commit.retry.total-timeout-ms': '1800000'
}

メンテナンス操作 (コンパクション等) の場合:

tbl_properties = {
    'commit.retry.num-retries': '4',
    'commit.retry.min-wait-ms': '1000',
    'commit.retry.max-wait-ms': '60000',
    'commit.retry.total-timeout-ms': '1800000'
}

データ更新の競合管理

データ更新の競合は自動的に再試行できないため、適切なエラー処理を伴うカスタムの再試行メカニズムを実装する必要があります。一般的なシナリオは、ストリームでの UPSERT 取り込みと同時実行のコンパクション操作が競合する場合です。このような場合、ストリーム取り込みジョブは通常、データを処理するために再試行を実装する必要があります。適切なエラー処理がないと、ジョブは ValidationException で失敗します。
Iceberg ストリーミングジョブでのデータ競合に対するエラー処理の実用的な実装を示す 2 つのサンプルスクリプトを紹介します。このコードは、適切な Java と Python を相互に利用する際に不可欠な Py4JJavaError 処理を通じて、ValidationException を捕捉しています。また、エクスポネンシャルバックオフとジッター戦略を実装し、各再試行間隔に 0 〜 25% のランダムな遅延を追加しています。たとえば、指数関数的バックオフ時間の基準が 4 秒の場合、実際の再試行の遅延は 4 〜 5 秒の間になり、即座の再試行の嵐を防ぎながら、合理的な待ち時間を維持できます。
このサンプルでは、一意の識別子として 'value' を使用し、その範囲を人為的に制限することで、同じレコードに対する頻繁な MERGE 操作のシナリオを作成しています。モジュロ演算 (value % 20) を適用することで、すべての値を 0 〜 19 の範囲に制限しています。これにより、複数の更新が同じレコードを対象とすることになります。たとえば、元のストリームに値 0、20、40、60 が含まれている場合、すべて 0 にマッピングされるため、同じレコードに対して複数の MERGE 操作が行われます。次に、groupBy と max 集約を使用して、一般的な UPSERT パターンをシミュレートします。ここでは、各値の最新のレコードを保持します。変換されたデータは一時ビューに格納され、MERGE ステートメントのソーステーブルとして機能します。これにより、'value' を一致条件として使用して UPDATE 操作を実行できます。このセットアップにより、同時実行トランザクションが同じレコードを変更しようとしたときに発生する ValidationExceptions に対するリトライメカニズムの動作を確認できます。
最初のサンプルでは、20 秒のトリガー間隔でデータを生成するレートソースを使用する Spark Structured Streaming を使用して、同時操作によるデータ競合が発生したときの再試行メカニズムの動作を示します。<database_name>は実際のデータベース名、<table_name>は実際のテーブル名、amzn-s3-demo-bucket は 実際の S3 バケット名にそれぞれ置き換えてください。

import time 
 import random 
 from pyspark.sql import SparkSession 
 from py4j.protocol import Py4JJavaError 
 from pyspark.sql.functions import max as max_ 

 CATALOG = "glue_catalog"
 DATABASE = ""
 TABLE = "<table>"
 BUCKET = "amzn-s3-demo-bucket"

 spark = SparkSession.builder \ 
    .appName("IcebergUpsertExample") \ 
    .config(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \ 
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ 
    .config(f"spark.sql.catalog.{CATALOG}.io-impl","org.apache.iceberg.aws.s3.S3FileIO") \ 
    .config("spark.sql.defaultCatalog", CATALOG) \ 
    .config(f"spark.sql.catalog.{CATALOG}.type", "glue") \ 
    .getOrCreate()
    
 spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE}.{TABLE} (
        timestamp TIMESTAMP,
        value LONG 
    )
    USING iceberg 
    LOCATION 's3://{BUCKET}/warehouse'
""")

 def backoff(attempt):
    """Exponential backoff with jitter"""
    exp_backoff = min(2 ** attempt, 60)
    jitter = random.uniform(0, 0.25 * exp_backoff)
    return exp_backoff + jitter 

 def is_validation_exception(java_exception):
    """Check if exception is ValidationException"""
    cause = java_exception 
    while cause is not None:
        if "org.apache.iceberg.exceptions.ValidationException" in str(cause.getClass().getName()):
            return True 
        cause = cause.getCause()
    return False 

 def upsert_with_retry(microBatchDF, batchId):
    max_retries = 5 
    attempt = 0 
    
    # Use a narrower key range to intentionally increase updates for the same value in MERGE 
    transformedDF = microBatchDF \ 
        .selectExpr("timestamp", "value % 20 AS value") \ 
        .groupBy("value") \ 
        .agg(max_("timestamp").alias("timestamp"))
        
    view_name = f"incoming_data_{batchId}"
    transformedDF.createOrReplaceGlobalTempView(view_name)
    
    while attempt &lt; max_retries:
        try:
            spark.sql(f"""
                MERGE INTO {DATABASE}.{TABLE} AS t 
                USING global_temp.{view_name} AS i 
                ON t.value = i.value 
                WHEN MATCHED THEN 
                  UPDATE SET 
                    t.timestamp = i.timestamp,
                    t.value     = i.value 
                WHEN NOT MATCHED THEN 
                  INSERT (timestamp, value)
                  VALUES (i.timestamp, i.value)
            """)
            
            print(f"[SUCCESS] Batch {batchId} processed successfully")
            return 
            
        except Py4JJavaError as e:
            if is_validation_exception(e.java_exception):
                attempt += 1 
                if attempt &lt; max_retries:
                    delay = backoff(attempt)
                    print(f"[RETRY] Batch {batchId} failed with ValidationException. "
                          f"Retrying in {delay} seconds. Attempt {attempt}/{max_retries}")
                    time.sleep(delay)
                else:
                    print(f"[FAILED] Batch {batchId} failed after {max_retries} attempts")
                    raise 

# Sample streaming query setup 
 df = spark.readStream \ 
    .format("rate") \ 
    .option("rowsPerSecond", 10) \ 
    .load()

# Start streaming query 
 query = df.writeStream \ 
    .trigger(processingTime="20 seconds") \ 
    .option("checkpointLocation", f"s3://{BUCKET}/checkpointLocation") \ 
    .foreachBatch(upsert_with_retry) \ 
    .start()

 query.awaitTermination()

2 つ目のサンプルは、AWS Glue Streaming ジョブで利用可能な GlueContext.forEachBatch を使用しています。リトライメカニズムの実装パターンは同じですが、主な違いは GlueContext を使った初期設定と、ストリーミング DataFrame を作成する方法です。このサンプルでは spark.readStream をレートソースと共に使用していますが、実際の AWS Glue Streaming ジョブでは、通常 glueContext.create_data_frame.from_catalog を使用して、Amazon Kinesis や Kafka などのソースからストリーミング DataFrame を作成します。詳細については、AWS Glue ストリーミング 接続を参照してください。

<database_name>は実際のデータベース名、<table_name>は実際のテーブル名、amzn-s3-demo-bucket は 実際の S3 バケット名にそれぞれ置き換えてください。

import time
import random
from py4j.protocol import Py4JJavaError
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import max as max_

CATALOG = "glue_catalog"
DATABASE = "<database_name>"
TABLE = "<table_name>"
BUCKET = "amzn-s3-demo-bucket"

spark = SparkSession.builder \
    .appName("IcebergUpsertExample") \
    .config(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config(f"spark.sql.catalog.{CATALOG}.io-impl","org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.defaultCatalog", CATALOG) \
    .config(f"spark.sql.catalog.{CATALOG}.type", "glue") \
    .getOrCreate()

sc = spark.sparkContext
glueContext = GlueContext(sc)

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {DATABASE}.{TABLE} (
        timestamp TIMESTAMP,
        value LONG
    )
    USING iceberg
    LOCATION 's3://{BUCKET}/warehouse'
""")

def backoff(attempt):
    exp_backoff = min(2 ** attempt, 60)
    jitter = random.uniform(0, 0.25 * exp_backoff)
    return exp_backoff + jitter

def is_validation_exception(java_exception):
    cause = java_exception
    while cause is not None:
        if "org.apache.iceberg.exceptions.ValidationException" in str(cause.getClass().getName()):
            return True
        cause = cause.getCause()
    return False

def upsert_with_retry(batch_df, batchId):
    max_retries = 5
    attempt = 0
    transformedDF = batch_df.selectExpr("timestamp", "value % 20 AS value") \
                           .groupBy("value") \
                           .agg(max_("timestamp").alias("timestamp"))
                           
    view_name = f"incoming_data_{batchId}"
    transformedDF.createOrReplaceGlobalTempView(view_name)
    
    while attempt < max_retries:
        try:
            spark.sql(f"""
                MERGE INTO {DATABASE}.{TABLE} AS t
                USING global_temp.{view_name} AS i
                ON t.value = i.value
                WHEN MATCHED THEN
                  UPDATE SET
                    t.timestamp = i.timestamp,
                    t.value     = i.value
                WHEN NOT MATCHED THEN
                  INSERT (timestamp, value)
                  VALUES (i.timestamp, i.value)
            """)
            print(f"[SUCCESS] Batch {batchId} processed successfully")
            return
        except Py4JJavaError as e:
            if is_validation_exception(e.java_exception):
                attempt += 1
                if attempt < max_retries:
                    delay = backoff(attempt)
                    print(f"[RETRY] Batch {batchId} failed with ValidationException. "
                          f"Retrying in {delay} seconds. Attempt {attempt}/{max_retries}")
                    time.sleep(delay)
                else:
                    print(f"[FAILED] Batch {batchId} failed after {max_retries} attempts")
                    raise

# Sample streaming query setup
streaming_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# In actual Glue Streaming jobs, you would typically create a streaming DataFrame like this:
"""
streaming_df = glueContext.create_data_frame.from_catalog(
    database = "database",
    table_name = "table_name",
    transformation_ctx = "streaming_df",
    additional_options = {
        "startingPosition": "TRIM_HORIZON",
        "inferSchema": "false"
    }
)
"""

glueContext.forEachBatch(
    frame=streaming_df,
    batch_function=upsert_with_retry,
    options={
        "windowSize": "20 seconds",
        "checkpointLocation": f"s3://{BUCKET}/checkpointLocation"
    }
)

操作のスコープを絞り、競合の可能性を最小化する

メンテナンス操作 (コンパクションや更新など) を実行する際は、他の操作との重複を最小限に抑えるため、スコープを絞ることをお勧めします。たとえば、日付でパーティション分割されたテーブルで、ストリーミングジョブが最新の日付のデータを継続的に上書きする場合を考えてみましょう。以下は、テーブル全体をコンパクションするために rewrite_data_files プロシージャを実行するサンプルスクリプトです。

# Example of broad scope compaction
spark.sql("""
   CALL catalog_name.system.rewrite_data_files(
       table => 'db.table_name'
   )
""")

where 句でデータ分割フィルターを使用してコンパクション範囲を狭めることで、ストリーミングデータ取り込みとコンパクション操作の競合を回避できます。ストリーミングジョブは最新のパーティションで動作を続けられる一方で、コンパクションは過去のデータを処理できます。

# Narrow down the scope by partition
spark.sql("""
    CALL catalog_name.system.rewrite_data_files(
        table => 'db.table_name',
        where => 'date_partition < current_date'
    )
""")

結論

Iceberg での同時書き込みを適切に管理するには、テーブルのアーキテクチャと様々な競合シナリオを理解する必要があります。この投稿では、本番環境で信頼できる競合処理メカニズムを実装する方法を探りました。
最も重要な概念は、カタログコミット競合とデータ競合の違いを覚えておくことです。カタログコミット競合は自動再試行とテーブルプロパティの設定で対処できますが、データ競合には独自の処理ロジックを慎重に実装する必要があります。これは、rewrite_data_fileswhere 句を使用してスコープを絞ることで競合の可能性を大幅に低減できるため、コンパクション (compaction) などのメンテナンス操作を実装する際に特に重要になります。
ストリーミングパイプラインでの成功の鍵は、競合の種類を区別し、適切に対応するためのエラー処理を実装することにあります。これには、テーブルプロパティを通じて適切な再試行設定を構成し、ワークロードの特性に合わせたバックオフ戦略を実装することが含まれます。適切なタイミングでメンテナンス操作を組み合わせることで、同時書き込みを確実に処理できる、耐障害性の高いデータパイプラインを構築できます。
これらのパターンを適用し、Iceberg の同時実行モデルの基本的なメカニズムを理解することで、データの整合性と信頼性を維持しながら、同時書き込みシナリオを効果的に処理できるロバストなデータパイプラインを構築できます。


著者について

疋田 宗太郎: アナリティクスソリューションアーキテクトです。幅広い業界の顧客に対し、アナリティクスプラットフォームの構築と運用をより効果的に行えるようサポートしています。特に、ビッグデータ技術とオープンソースソフトウェアに情熱を持っています。

関山 宜孝: AWS Glue チームのプリンシパル ビッグデータ アーキテクトです。東京を拠点に活動しています。顧客を支援するためのソフトウェアアーティファクトの構築を担当しています。余暇時間には、ロードバイクで走ることを楽しんでいます。