# Indexer Pipeline Architecture


The `sui-indexer-alt-framework` provides two distinct pipeline architectures. Understanding their differences helps you choose the right approach.

## Sequential versus concurrent pipelines

[Sequential pipelines](#sequential-pipeline-architecture) commit complete checkpoints in order. Each checkpoint is fully committed before the next one, ensuring straightforward, consistent reads.

[Concurrent pipelines](#concurrent-pipeline-architecture) commit out of order and can commit individual checkpoints partially. This lets you process multiple checkpoints simultaneously for higher throughput, but every read must check which data is fully committed to ensure consistency.

## When to use each pipeline

Both pipeline types can handle updates in place, aggregations, and complex business logic. Although sequential pipelines have lower throughput than concurrent pipelines, base your decision primarily on engineering complexity rather than on performance needs.

### Recommended: Sequential pipeline

Start here for most use cases: sequential pipelines are more straightforward to implement and maintain. Choose a sequential pipeline when:

- You want a straightforward implementation with direct commits and basic queries.
- Your team prefers predictable, easy-to-debug behavior.
- Current performance meets your requirements.
- You value operational simplicity.

### Concurrent pipeline

Consider implementing a concurrent pipeline when:

- Throughput is a primary requirement for your indexer.
- Sequential processing does not keep up with your data volume.
- Your team is willing to take on additional implementation complexity in exchange for the performance benefits.

Out-of-order commits introduce a few additional complexities to your pipeline:

- **Watermark-aware queries:** Every read must check which data the pipeline has fully committed. See [the watermark system](#watermark-system) section for details.
- **More complex application logic:** Your pipeline commits data in pieces rather than as complete checkpoints.
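A watermark-aware read boils down to a range check on the checkpoint that produced each row. The following is a minimal sketch, assuming a hypothetical in-memory `Row` type tagged with its checkpoint and a hypothetical `Watermark` snapshot that a reader would load from the database. In a real indexer this filter would more likely be a `WHERE` clause in your SQL query.

```rust
/// Hypothetical row written by a concurrent pipeline, tagged with its checkpoint.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub checkpoint: u64,
    pub value: &'static str,
}

/// Hypothetical snapshot of the pipeline's watermarks, as a reader would load them.
pub struct Watermark {
    pub reader_lo: u64,
    pub checkpoint_hi_inclusive: u64,
}

/// Keep only rows in the fully committed, unpruned range
/// `[reader_lo, checkpoint_hi_inclusive]`. Rows above the high watermark may come
/// from checkpoints that are only partially committed, so they are skipped.
pub fn consistent_read(rows: &[Row], wm: &Watermark) -> Vec<Row> {
    rows.iter()
        .filter(|r| r.checkpoint >= wm.reader_lo && r.checkpoint <= wm.checkpoint_hi_inclusive)
        .cloned()
        .collect()
}
```

Filtering on both bounds matters: rows below `reader_lo` might still physically exist while the pruner catches up, and rows above `checkpoint_hi_inclusive` might be incomplete.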

## Decision framework

If you are unsure of which pipeline to choose for your project, start with a sequential pipeline as it is easier to implement and debug. Then, measure performance under a realistic load. If the sequential pipeline cannot meet your project's requirements, switch to a concurrent pipeline.

While not an exhaustive list, some specific scenarios where a sequential pipeline might not meet requirements include:

- Individual checkpoints produce large amounts of data, or individual writes add latency, so your pipeline benefits from splitting each checkpoint's data into chunks and committing them out of order.
- You produce a lot of data that needs pruning. Sequential pipelines do not support pruning, so in this case you must use a concurrent pipeline.

Beyond choosing a pipeline type, you also need to consider scaling. If you index multiple kinds of data, consider using multiple pipelines, each tracking its own watermarks.

## Common pipeline components

Both sequential and concurrent pipelines share common components and concepts. Understanding these shared elements helps clarify how the two architectures differ.

### Processor component {#processor}

The `Processor` is the pipeline's parallel processing engine: it runs multiple tasks at the same time to maximize throughput. Its primary responsibility is to convert raw checkpoint data into database-ready rows using parallel workers.

The component spawns parallel worker tasks, and the `fanout` setting is the key configuration because it controls parallel processing capacity. By default, `fanout` is adaptive: it starts at 1 and scales up to the number of CPUs based on downstream channel pressure. You can find the implementation in `crates/sui-indexer-alt-framework/src/pipeline/processor.rs`.

Each worker calls your `Handler::process()` method independently.


Each of these workers can process different checkpoints simultaneously and in any order. The workers send their processed data to the `Collector` with checkpoint metadata.
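The fan-out pattern can be illustrated with standard-library threads and a channel. This is a minimal sketch, not the framework's implementation: `process_checkpoint` is a hypothetical stand-in for your `Handler::process()`, `Processed` is a hypothetical stand-in for the values plus checkpoint metadata sent to the `Collector`, and `fanout` is fixed rather than adaptive.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical worker output: the rows for one checkpoint plus its sequence number.
#[derive(Debug)]
pub struct Processed {
    pub checkpoint: u64,
    pub rows: Vec<String>,
}

/// Hypothetical stand-in for `Handler::process()`: turn one checkpoint into rows.
fn process_checkpoint(checkpoint: u64) -> Vec<String> {
    vec![format!("row-from-{checkpoint}")]
}

/// Spawn `fanout` workers that pull checkpoints from a shared queue and send results
/// downstream in whatever order they finish.
pub fn run_processor(checkpoints: Vec<u64>, fanout: usize) -> Vec<Processed> {
    let queue = Arc::new(Mutex::new(checkpoints.into_iter()));
    let (tx, rx) = mpsc::channel::<Processed>();

    let workers: Vec<_> = (0..fanout)
        .map(|_| {
            let queue = Arc::clone(&queue);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // Take the next checkpoint; the lock is released at the end of this statement.
                let next = queue.lock().unwrap().next();
                let Some(checkpoint) = next else { break };
                let rows = process_checkpoint(checkpoint);
                tx.send(Processed { checkpoint, rows }).unwrap();
            })
        })
        .collect();

    // Drop the original sender so `rx` ends once every worker has exited.
    drop(tx);
    for w in workers {
        w.join().unwrap();
    }
    // Arrival order reflects completion order, not checkpoint order.
    rx.into_iter().collect()
}
```

Because results arrive in completion order, the downstream stage is responsible for any reordering, which is exactly what the two pipeline types handle differently.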

The `Processor` component works identically in both sequential and concurrent pipelines. It receives checkpoint data from the `Broadcaster`, transforms it using your custom logic, and forwards the processed results to the next stage in the pipeline.

### Watermark concepts summary

Before diving into pipeline-specific architectures, understand the three types of watermarks used for coordination:

| Watermark | Purpose | Pipeline type |
|-----------|---------|---------------|
| `checkpoint_hi_inclusive` | Highest checkpoint where all data up to and including it is committed (no gaps) | Both (recovery and progress tracking) |
| `reader_lo` | Lowest checkpoint guaranteed to be available for queries | Concurrent pipelines with pruning enabled |
| `pruner_hi` | Exclusive upper bound of pruned data: every checkpoint below it has been deleted | Concurrent pipelines with pruning enabled |

These watermarks work together to enable safe out-of-order processing, automatic data cleanup, and recovery from failures.

## The watermark system {#watermark-system}

For each pipeline, the indexer tracks at minimum the highest checkpoint where all data up to that point is committed. The indexer tracks this through the `checkpoint_hi_inclusive` committer watermark. Both concurrent and sequential pipelines rely on `checkpoint_hi_inclusive` to understand where to resume processing on restarts.

If pruning is enabled, the pipeline also tracks `reader_lo` and `pruner_hi`, which define safe lower bounds for reading and pruning operations. These watermarks are particularly crucial for concurrent pipelines, which must maintain data integrity while processing out of order.

### Safe pruning

Together, the watermarks provide robust data lifecycle management:

1. **Guaranteed data availability:** Readers query only checkpoints at or above `reader_lo`, which the pipeline guarantees are still available.
1. **Automatic cleanup process:** The pruner periodically deletes checkpoints that fall outside the retention window, so storage does not grow indefinitely while the retention guarantee still holds. Pruning runs with a safety delay to avoid race conditions with in-flight queries.
1. **Balanced approach:** The system strikes a balance between safety and efficiency.
    - Storage efficiency: Old data gets automatically deleted.
    - Data availability: Complete data for the configured retention window is always available.
    - Safety guarantees: Readers never encounter gaps in the data.
    - Performance: Out-of-order processing maximizes throughput.

This watermark system is what makes concurrent pipelines both high-performance and reliable, enabling massive throughput while maintaining strong data availability guarantees and automatic storage management.

### Scenario 1: Basic watermark (no pruning)

With pruning disabled, the indexer reports only each pipeline's committer watermark, `checkpoint_hi_inclusive`. Consider the following timeline, in which several checkpoints are being processed and some commit out of order.

```sh
Checkpoint Processing Timeline:

[1000] [1001] [1002] [1003] [1004] [1005]
  ✓      ✓      ✗      ✓      ✗      ✗
         ^
  checkpoint_hi_inclusive = 1001

✓ = Committed (all data written)
✗ = Not Committed (processing or failed)
```

In this scenario, the `checkpoint_hi_inclusive` is at 1001, even though checkpoint 1003 is committed, because there is still a gap at 1002. The indexer reports the high watermark at 1001 to satisfy the guarantee that all data from start to `checkpoint_hi_inclusive` is available.

After checkpoint 1002 commits, `checkpoint_hi_inclusive` advances to 1003, so you can safely read all data up to and including checkpoint 1003.

```sh
[1000] [1001] [1002] [1003] [1004] [1005]
  ✓      ✓      ✓      ✓      ✗      ✗
[---- SAFE TO READ -------]
(start   →   checkpoint_hi_inclusive at 1003)
```
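The rule in this scenario, where the watermark moves only across a contiguous run of committed checkpoints, can be sketched as follows. This is an illustration, not the framework's code: `HiWatermark` is a hypothetical type, and it assumes the pipeline starts at a known `first_checkpoint`.

```rust
use std::collections::BTreeSet;

/// Hypothetical tracker for `checkpoint_hi_inclusive` under out-of-order commits.
pub struct HiWatermark {
    /// Highest checkpoint with no gaps below it; `None` until the first checkpoint commits.
    pub checkpoint_hi_inclusive: Option<u64>,
    /// The next checkpoint needed to extend the contiguous prefix.
    next: u64,
    /// Checkpoints that committed ahead of a gap.
    ahead: BTreeSet<u64>,
}

impl HiWatermark {
    pub fn new(first_checkpoint: u64) -> Self {
        Self { checkpoint_hi_inclusive: None, next: first_checkpoint, ahead: BTreeSet::new() }
    }

    /// Record that `checkpoint` is fully committed, then advance the watermark across
    /// any contiguous run that is now complete.
    pub fn commit(&mut self, checkpoint: u64) {
        if checkpoint >= self.next {
            self.ahead.insert(checkpoint);
        }
        while self.ahead.remove(&self.next) {
            self.checkpoint_hi_inclusive = Some(self.next);
            self.next += 1;
        }
    }
}
```

Replaying the timeline above, committing 1000, 1001, and 1003 leaves the watermark at 1001. Committing 1002 then moves it straight to 1003, because 1003 was already waiting in `ahead`.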

### Scenario 2: Pruning enabled

Pruning is enabled for pipelines configured with a retention policy. For example, if your table is growing too large and you want to keep only the last 4 checkpoints, set `retention = 4`. The indexer then periodically sets `reader_lo` to `checkpoint_hi_inclusive - retention + 1`, and a separate pruning task deletes data in the half-open range `[pruner_hi, reader_lo)`.

```sh
[998] [999] [1000] [1001] [1002] [1003] [1004] [1005] [1006]
 🗑️    🗑️     ✓      ✓      ✓      ✓      ✗      ✗      ✓
              ^                    ^
       reader_lo = 1000       checkpoint_hi_inclusive = 1003

🗑️ = Pruned (deleted)
✓ = Committed
✗ = Not Committed
```

Current watermarks:

- `checkpoint_hi_inclusive` = 1003:
    - All data from start to 1003 exists (no gaps).
    - Cannot advance past 1003 because 1004 is not yet committed (gap).

- `reader_lo` = 1000:
    - Lowest checkpoint the pipeline guarantees is available.
    - Calculated as `reader_lo = checkpoint_hi_inclusive - retention + 1`.
    - `reader_lo` = 1003 - 4 + 1 = 1000.

- `pruner_hi` = 1000:
    - Exclusive upper bound of pruned data: every checkpoint below 1000 has been deleted.
    - The pruner deleted checkpoints 998 and 999 to save space.
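The arithmetic above fits in two small functions. This is a sketch with hypothetical helper names, not framework APIs; saturating at 0 for a pipeline that has committed fewer than `retention` checkpoints is an assumption.

```rust
use std::ops::Range;

/// `reader_lo = checkpoint_hi_inclusive - retention + 1`, computed as
/// `(hi + 1) - retention` and saturating at 0 (assumed behavior for young pipelines).
pub fn reader_lo(checkpoint_hi_inclusive: u64, retention: u64) -> u64 {
    (checkpoint_hi_inclusive + 1).saturating_sub(retention)
}

/// Checkpoints the pruner may delete: the half-open range `[pruner_hi, reader_lo)`.
/// `pruner_hi` is exclusive, so everything below it is already gone.
pub fn prunable(pruner_hi: u64, reader_lo: u64) -> Range<u64> {
    pruner_hi..reader_lo.max(pruner_hi)
}
```

With `checkpoint_hi_inclusive = 1003` and `retention = 4`, `reader_lo(1003, 4)` gives 1000, and `prunable(998, 1000)` yields checkpoints 998 and 999, the two the pruner deleted in this scenario.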

The resulting zones:

```sh
[998] [999] [1000] [1001] [1002] [1003] [1004] [1005] [1006]
 🗑️    🗑️     ✓      ✓      ✓      ✓      ✗      ✗      ✓

[--PRUNED--][--- Safe Reading Zone ---] [--- Processing ---]
```

### How watermarks progress over time

**Step 1:** Checkpoint 1004 completes.

```sh
[999] [1000] [1001] [1002] [1003] [1004] [1005] [1006] [1007]
 🗑️     ✓      ✓      ✓      ✓      ✓      ✗      ✓      ✗
        ^                           ^
 reader_lo = 1000           checkpoint_hi_inclusive = 1004 (advanced by 1)
 pruner_hi = 1000
```

With checkpoint 1004 now committed, `checkpoint_hi_inclusive` advances from 1003 to 1004 because no gaps exist up to 1004. The `reader_lo` and `pruner_hi` values have not changed yet.

**Step 2:** Reader watermark updates periodically.

```sh
[999] [1000] [1001] [1002] [1003] [1004] [1005] [1006] [1007]
 🗑️     ✓      ✓      ✓      ✓      ✓      ✗      ✓      ✗
               ^                   ^
        reader_lo = 1001    checkpoint_hi_inclusive = 1004
        (1004 - 4 + 1 = 1001)

pruner_hi = 1000 (unchanged as pruner hasn't run yet)
```

A separate reader watermark task, which runs periodically at a configurable interval, advances `reader_lo` to 1001 (calculated as `1004 - 4 + 1 = 1001`) based on the retention policy. The pruner has not run yet, so `pruner_hi` remains at 1000.

**Step 3:** Pruner runs after safety delay.

```sh
[999] [1000] [1001] [1002] [1003] [1004] [1005] [1006] [1007]
 🗑️     🗑️     ✓      ✓      ✓      ✓      ✗      ✓      ✗
               ^                   ^
        reader_lo = 1001    checkpoint_hi_inclusive = 1004
        pruner_hi = 1001
```

When `pruner_hi` (1000) < `reader_lo` (1001), the pruner detects checkpoints outside the retention window, deletes all data below `reader_lo` (checkpoint 1000), and advances `pruner_hi` to `reader_lo` (1001).
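Steps 2 and 3 can be modeled as two independent operations on a watermark snapshot. This minimal sketch uses a hypothetical `Watermarks` type and leaves out the safety delay and the actual row deletion; a real pruner would delete the rows for each checkpoint before advancing `pruner_hi`. That `reader_lo` never moves backward is an assumption of the sketch.

```rust
/// Hypothetical snapshot of one pipeline's watermarks.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Watermarks {
    pub checkpoint_hi_inclusive: u64,
    pub reader_lo: u64,
    /// Exclusive: every checkpoint below `pruner_hi` has been deleted.
    pub pruner_hi: u64,
}

impl Watermarks {
    /// Step 2: recompute `reader_lo` from the retention policy (assumed monotonic).
    pub fn update_reader_lo(&mut self, retention: u64) {
        let lo = (self.checkpoint_hi_inclusive + 1).saturating_sub(retention);
        self.reader_lo = self.reader_lo.max(lo);
    }

    /// Step 3: delete `[pruner_hi, reader_lo)` and advance `pruner_hi` to `reader_lo`.
    /// Returns the checkpoints that would be deleted (empty if nothing is prunable).
    pub fn prune(&mut self) -> Vec<u64> {
        let deleted: Vec<u64> = (self.pruner_hi..self.reader_lo).collect();
        self.pruner_hi = self.pruner_hi.max(self.reader_lo);
        deleted
    }
}
```

Starting from step 1's state (`1004`, `1000`, `1000`), `update_reader_lo(4)` moves `reader_lo` to 1001 without touching `pruner_hi`, and a later `prune()` deletes checkpoint 1000. Because the two operations run separately, `reader_lo` can lead `pruner_hi` for a while, which is why checkpoints below `reader_lo` can briefly remain in storage.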

:::info
Checkpoints older than `reader_lo` might still temporarily exist because:
- The pruner intentionally delays deletion to protect in-flight queries.
- The pruner has not finished its cleanup yet.
:::

## Sequential pipeline architecture {#sequential-pipeline-architecture}

Sequential pipelines provide a more straightforward yet powerful architecture for indexing that prioritizes ordered processing. While they sacrifice some throughput compared to concurrent pipelines, they offer stronger guarantees and are often easier to reason about.

### Architecture overview

The sequential pipeline is significantly simpler than the concurrent pipeline's six-component architecture: it has a `Processor`, a `Collector`, and a `Committer`.

![Sequential pipeline diagram](../images/architecture_sequential-pipeline_v1.png)

The `Broadcaster` and `Processor` components use the same backpressure mechanisms, adaptive parallel processing, and `process()` implementations as in the concurrent pipeline. The [Common pipeline components](#processor) section describes the `Processor` in detail.

The `Collector` orders out-of-order checkpoints, assembles them into batches through your `batch()` logic, and hands those batches to the `Committer`, which writes them to the database in strict checkpoint order through your `commit()` logic. The two stages run as independent tasks connected by a bounded channel (`pipeline_depth`), so the `Collector` can prepare the next batch while the `Committer` flushes the current one. Concurrent pipelines share the same `Collector` + `Committer` split but additionally have `CommitterWatermark`, `ReaderWatermark`, and `Pruner` components. None of those extra components are required in the sequential pipeline because commits and watermark updates happen in a single transaction and there is no pruning.
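The hand-off between the two stages can be illustrated with a bounded standard-library channel whose capacity plays the role of `pipeline_depth`. This is a sketch under simplifying assumptions: batches are plain vectors of checkpoint numbers, the committer records them instead of writing to a database, and `run_sequential_stages` is a hypothetical helper.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical Collector -> Committer hand-off over a bounded channel of capacity
/// `pipeline_depth`. Returns the batches in the order the committer handled them.
pub fn run_sequential_stages(batches: Vec<Vec<u64>>, pipeline_depth: usize) -> Vec<Vec<u64>> {
    let (tx, rx) = mpsc::sync_channel::<Vec<u64>>(pipeline_depth);

    // Collector: stages batches ahead of the committer and blocks on `send`
    // once `pipeline_depth` batches are already waiting.
    let collector = thread::spawn(move || {
        for batch in batches {
            tx.send(batch).unwrap();
        }
    });

    // Committer: handles one batch at a time, in the order received.
    let mut committed = Vec::new();
    for batch in rx {
        committed.push(batch); // stand-in for a single transactional write
    }
    collector.join().unwrap();
    committed
}
```

The blocking `send` is what turns a slow committer into backpressure on the collector, while the channel capacity lets the collector prepare the next batch without waiting for the current write to finish.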

### Sequential pipeline components {#sequential-components}

Sequential pipelines have two pipeline-specific components in addition to the shared [Processor](#processor): the [`Collector`](#seq-collector) and the [`Committer`](#seq-committer).

#### `Collector` {#seq-collector}

The sequential `Collector` receives out-of-order processed data from the `Processor`, orders it by checkpoint sequence, and assembles batches using your `batch()` logic. The `Collector` dispatches a batch when either `collect_interval_ms` elapses or `MIN_EAGER_ROWS` have accumulated and the next expected checkpoint has arrived. The `Collector` also bounds how many checkpoints contribute to a single batch (`MAX_BATCH_CHECKPOINTS`). `MAX_PENDING_ROWS` is a soft cap: when it is exceeded, the `Collector` stops eagerly draining its input channel and flushes what it can. Receiving is never hard-gated, because a missing predecessor for the next commit might still be sitting in the input channel, and blocking receives would risk deadlock.

`batch()`: Data merging logic.

```rust
// Excerpt from the sequential `Handler` trait. Checkpoints reach `batch()` in
// checkpoint order; merge each checkpoint's `values` into the pending `batch`.
fn batch(&self, batch: &mut Self::Batch, values: std::vec::IntoIter<Self::Value>);
```
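The ordering and batching steps can be sketched as a reorder buffer that only drains the contiguous prefix starting at the next expected checkpoint. This is an illustration, not the framework's `Collector`: `ReorderBuffer` and `Batch` are hypothetical, `batch_values` stands in for your `batch()` implementation, and the `collect_interval_ms` and `MIN_EAGER_ROWS` dispatch triggers are left out.

```rust
use std::collections::BTreeMap;

/// Hypothetical user batch type.
#[derive(Default, Debug, PartialEq)]
pub struct Batch {
    pub rows: Vec<String>,
    pub checkpoints: Vec<u64>,
}

/// Hypothetical stand-in for `batch()`: merge one checkpoint's values into the batch.
fn batch_values(batch: &mut Batch, values: std::vec::IntoIter<String>) {
    batch.rows.extend(values);
}

pub struct ReorderBuffer {
    next: u64,
    pending: BTreeMap<u64, Vec<String>>,
    max_batch_checkpoints: usize,
}

impl ReorderBuffer {
    pub fn new(first_checkpoint: u64, max_batch_checkpoints: usize) -> Self {
        Self { next: first_checkpoint, pending: BTreeMap::new(), max_batch_checkpoints }
    }

    /// Buffer processed values for a checkpoint, which may arrive in any order.
    pub fn receive(&mut self, checkpoint: u64, values: Vec<String>) {
        self.pending.insert(checkpoint, values);
    }

    /// Drain the contiguous prefix starting at `next`, up to `max_batch_checkpoints`,
    /// into one batch in checkpoint order. Returns `None` if `next` has not arrived.
    pub fn take_batch(&mut self) -> Option<Batch> {
        let mut batch = Batch::default();
        while batch.checkpoints.len() < self.max_batch_checkpoints {
            let Some(values) = self.pending.remove(&self.next) else { break };
            batch_values(&mut batch, values.into_iter());
            batch.checkpoints.push(self.next);
            self.next += 1;
        }
        (!batch.checkpoints.is_empty()).then_some(batch)
    }
}
```

For example, if checkpoint 11 arrives before 10, `take_batch()` returns `None` until 10 shows up, which is why the `Collector` cannot simply stop receiving when its buffer grows.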

#### `Committer` {#seq-committer}

The sequential `Committer` receives fully assembled batches from the `Collector` and writes them to the database one at a time (strict ordering is required so that watermarks advance monotonically). Each commit runs inside a single transaction that includes both the row updates and the watermark update, so commits and watermark advances are atomic. On commit failure, the `Committer` retries the same batch under exponential backoff.

`commit()`: Database write logic.

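```rust
// Excerpt from the sequential `Handler` trait.
/// Take a batch of values and commit them to the database, returning the number of rows
/// affected.
async fn commit<'a>(
    &self,
    batch: &Self::Batch,
    conn: &mut <Self::Store as Store>::Connection<'a>,
) -> anyhow::Result<usize>;
```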
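The atomic commit and retry behavior can be sketched against an in-memory stand-in for the database. `MemStore`, `Batch`, and `commit_with_retry` are hypothetical, the `failures` parameter exists only to simulate transient errors, and the 1 ms initial delay and 100 ms cap are assumed values rather than the framework's.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical batch: rows to write plus the highest checkpoint they cover.
pub struct Batch {
    pub rows: Vec<String>,
    pub last_checkpoint: u64,
}

/// Hypothetical in-memory store holding rows and the pipeline's watermark.
#[derive(Default)]
pub struct MemStore {
    pub rows: Vec<String>,
    pub checkpoint_hi_inclusive: Option<u64>,
}

impl MemStore {
    /// Simulated transaction: rows and watermark are applied together or not at all.
    /// `fail` makes the transaction abort before anything is written.
    fn transact(&mut self, batch: &Batch, fail: bool) -> Result<usize, String> {
        if fail {
            return Err("transient database error".to_string());
        }
        self.rows.extend(batch.rows.iter().cloned());
        self.checkpoint_hi_inclusive = Some(batch.last_checkpoint);
        Ok(batch.rows.len())
    }
}

/// Commit one batch, retrying the same batch with exponential backoff.
/// Returns the number of rows written and the number of attempts made.
pub fn commit_with_retry(store: &mut MemStore, batch: &Batch, failures: u32) -> (usize, u32) {
    let mut delay = Duration::from_millis(1); // assumed initial delay
    let max_delay = Duration::from_millis(100); // assumed cap
    let mut attempts = 0;
    loop {
        attempts += 1;
        match store.transact(batch, attempts <= failures) {
            Ok(rows) => return (rows, attempts),
            Err(_) => {
                thread::sleep(delay);
                delay = (delay * 2).min(max_delay);
            }
        }
    }
}
```

Because a failed attempt writes neither rows nor the watermark, retrying the same batch cannot duplicate data or leave the watermark ahead of the rows.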

### Sequential pipeline backpressure mechanisms

Sequential pipelines use two layers of backpressure to prevent memory overflow and ordering-related deadlocks:

![Backpressure sequential pipeline architecture](../images/architecture_sequential-backpressure_v1.png)

#### Channel-based backpressure

Sequential pipelines use the same bounded-channel backpressure model as concurrent pipelines:

- **Broadcaster → Processor:** bounded channel with `subscriber_channel_size` slots. Send blocks when full, and the broadcaster's adaptive controller reads the channel's `len / capacity` and cuts ingest concurrency as the subscriber falls behind.
- **Processor → Collector:** `processor_channel_size` slots (defaults to `num_cpus / 2`). Drives the processor's adaptive `fanout` controller.
- **Collector → Committer:** `pipeline_depth` slots (defaults to `max(num_cpus / 2, 4)`). Lets the collector stage the next batch while the committer flushes the current one. When full, the collector blocks on send.

Downstream pressure propagates backward: the committer slows, the collector-to-committer channel fills, the collector stops draining its reorder buffer, the processor-to-collector channel fills and collapses `fanout` to the minimum, and finally the broadcaster-to-processor channel fills and the broadcaster cuts `ingest_concurrency`. The collector itself holds an unbounded in-memory buffer of pending checkpoints so that out-of-order arrivals can still form a contiguous prefix to commit.

### Performance tuning

Sequential pipelines have a simpler configuration, but a few parameters still need careful tuning:

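```rust
use sui_indexer_alt_framework::config::ConcurrencyConfig;
use sui_indexer_alt_framework::pipeline::{self, sequential::SequentialConfig, CommitterConfig};

let config = SequentialConfig {
    committer: CommitterConfig {
        // Not applicable to sequential pipelines
        write_concurrency: 1,

        // Batch collection frequency in ms (default: 500)
        collect_interval_ms: 1000,

        // Keep the defaults for the remaining committer settings.
        ..Default::default()
    },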

    // Adaptive concurrency (default). Starts at 1 and scales up to num_cpus.
    fanout: None,
    // Or use fixed concurrency:
    // fanout: Some(ConcurrencyConfig::Fixed { value: 20 }),
    // Or customize adaptive bounds:
    // fanout: Some(ConcurrencyConfig::Adaptive {
    //     initial: 5,
    //     min: 1,
    //     max: 32,
    // }),

    min_eager_rows: None,
    max_pending_rows: None,
    max_batch_checkpoints: None,
    processor_channel_size: None, // defaults to num_cpus / 2
    pipeline_depth: None,         // defaults to max(num_cpus / 2, 4)

    // Per-pipeline overrides for the ingestion layer's defaults.
    ingestion: pipeline::IngestionConfig {
        // None falls back to the built-in default (max(num_cpus / 2, 4)).
        subscriber_channel_size: None,
    },
};
```

- `collect_interval_ms`: Higher values allow more checkpoints per batch, improving efficiency.
- `write_concurrency`: Not applicable to sequential pipelines (always single-threaded writes).
- `fanout`: By default, processor concurrency is adaptive: it starts at 1 and scales up to the number of CPUs based on downstream channel pressure. The controller monitors the fill fraction of the processor-to-collector channel and adjusts concurrency using a dead band between 60% and 85% fill. You can override this with fixed concurrency (`ConcurrencyConfig::Fixed`) or customize the adaptive bounds (`ConcurrencyConfig::Adaptive`). The default max of `num_cpus` is for CPU-bound processors. If your processor performs IO (for example, fetching data from an external service), you might want a higher max. The adaptive controller also exposes a `dead_band` parameter to override the fill-fraction thresholds, but the defaults should work well for most workloads.
- `ingestion.subscriber_channel_size`: Capacity of the bounded broadcaster-to-processor channel. Defaults to `max(num_cpus / 2, 4)` when `None`. A pipeline that occasionally sees bursts can raise its own capacity, but note that a larger value makes this pipeline appear "less full" to the shared controller, so it triggers throttling later than its peers.
- `processor_channel_size`: Controls the size of the channel between the processor and the collector. Defaults to `num_cpus / 2`. This channel is also the signal that drives the adaptive concurrency controller.
- `pipeline_depth`: Controls the size of the channel between the collector and the committer. Defaults to `max(num_cpus / 2, 4)`. Larger values let the collector stay further ahead of a slow committer (absorbing bursts); `1` means one batch can be staged while another is committing.
- `max_pending_rows`: Soft cap on how many rows the collector buffers before yielding to the flush phase. Unlike the concurrent pipeline's `MAX_PENDING_ROWS`, this is not a hard backpressure gate. Receive is never blocked on it, because a missing predecessor for the next commit might still be sitting in the input channel. It only bounds receive-to-flush latency in the happy path.
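The dead-band idea behind the adaptive `fanout` controller can be sketched in a few lines. Only the dead band (60% to 85% fill) and the `min`/`max` bounds come from the description above; the halving and +1 step rules are assumptions made for illustration, and `AdaptiveFanout` is a hypothetical type, not the framework's controller.

```rust
/// Hypothetical dead-band concurrency controller. Above the upper threshold the
/// downstream channel is backing up, so concurrency drops; below the lower threshold
/// there is headroom, so it grows; inside the band it holds steady.
pub struct AdaptiveFanout {
    pub current: usize,
    pub min: usize,
    pub max: usize,
    /// (lower, upper) fill fractions, for example (0.60, 0.85).
    pub dead_band: (f64, f64),
}

impl AdaptiveFanout {
    /// `len` and `capacity` describe the processor-to-collector channel.
    pub fn observe(&mut self, len: usize, capacity: usize) -> usize {
        let fill = len as f64 / capacity.max(1) as f64;
        let (lo, hi) = self.dead_band;
        if fill > hi {
            // Back off quickly under pressure (halving is an assumption).
            self.current = (self.current / 2).max(self.min);
        } else if fill < lo {
            // Probe upward one worker at a time (also an assumption).
            self.current = (self.current + 1).min(self.max);
        }
        self.current
    }
}
```

The band between the two thresholds keeps the controller from oscillating when the channel hovers around a single threshold.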

## Concurrent pipeline architecture {#concurrent-pipeline-architecture}

Concurrent pipelines transform raw checkpoint data into indexed database records through a sophisticated multi-stage architecture designed for maximum throughput while maintaining data integrity. The watermark system covered in the [watermark system section](#watermark-system) is fundamental to how every component coordinates.

### Architecture overview

![Concurrent Pipeline Diagram](../images/architecture_concurrent-pipeline_v1.png)

Key design principles:

- **Watermark coordination:** Safe out-of-order processing with consistency guarantees.
- **Handler abstraction:** Where your business logic plugs into the framework.
- **Automatic storage management:** The framework handles watermark tracking and data cleanup within the `Watermark` database.

### Concurrent pipeline components {#concurrent-components}

Concurrent pipelines have five pipeline-specific components in addition to the shared [Processor](#processor):

1. [`Collector`](#con-collector)
1. [`Committer`](#con-committer)
1. [`CommitterWatermark`](#con-commit-watermark)
1. [`ReaderWatermark`](#con-reader-watermark)
1. [`Pruner`](#con-pruner)

#### `Collector` {#con-collector}

The primary responsibility of the `Collector` is to buffer processed data and create user-configurable batches for database writes.

The `Collector` receives out-of-order processed data from multiple `Processor` workers. It buffers data until it reaches the optimal batch size (`MIN_EAGER_ROWS`) or until a timeout elapses, which preserves forward progress for quiet pipelines.
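The two flush triggers can be expressed as a single predicate. This is a minimal sketch: `should_flush` is a hypothetical helper, and the way the framework actually tracks pending rows and timers might differ.

```rust
use std::time::{Duration, Instant};

/// Hypothetical flush rule for a concurrent collector: flush once enough rows are
/// pending (`min_eager_rows`), or once `collect_interval` has elapsed since the last
/// flush, so that quiet pipelines still make progress.
pub fn should_flush(
    pending_rows: usize,
    last_flush: Instant,
    now: Instant,
    min_eager_rows: usize,
    collect_interval: Duration,
) -> bool {
    pending_rows >= min_eager_rows || now.duration_since(last_flush) >= collect_interval
}
```

The timeout branch matters for low-traffic pipelines: without it, a pipeline that rarely accumulates `MIN_EAGER_ROWS` would hold its data indefinitely.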

---
*Content truncated for size. See the full page on the documentation site.*
