
Bring Your Own Store (BYOS)

The IndexerCluster provides a convenient way to get started with PostgreSQL, but you might want to use a different database or storage system. To do so, use the Indexer type directly instead of IndexerCluster, and implement the Store and Connection traits from sui-indexer-alt-framework-store-traits for your own types.


lib.rs in sui-indexer-alt-framework-store-traits

/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
#[async_trait]
pub trait Connection: Send {
    /// If no existing watermark record exists, initializes it with `default_next_checkpoint`.
    /// Returns the committer watermark `checkpoint_hi_inclusive`.
    async fn init_watermark(
        &mut self,
        pipeline_task: &str,
        default_next_checkpoint: u64,
    ) -> anyhow::Result<Option<u64>>;

    /// Given a `pipeline_task` representing either a pipeline name or a pipeline with an associated
    /// task (formatted as `{pipeline}{Store::DELIMITER}{task}`), return the committer watermark
    /// from the `Store`. The indexer fetches this value for each pipeline added to determine which
    /// checkpoint to resume processing from.
    async fn committer_watermark(
        &mut self,
        pipeline_task: &str,
    ) -> anyhow::Result<Option<CommitterWatermark>>;

    /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
    /// to determine the new `reader_lo` or inclusive lower bound of available data.
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>>;

    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
    /// minimizes the possibility for the pruner to delete data still expected by in-flight read
    /// requests.
    async fn pruner_watermark(
        &mut self,
        pipeline: &'static str,
        delay: Duration,
    ) -> anyhow::Result<Option<PrunerWatermark>>;

    /// Upsert the high watermark for the `pipeline_task` - representing either a pipeline name or a
    /// pipeline with an associated task (formatted as `{pipeline}{Store::DELIMITER}{task}`) - as
    /// long as it raises the watermark stored in the database. Returns a boolean indicating whether
    /// the watermark was actually updated or not.
    async fn set_committer_watermark(
        &mut self,
        pipeline_task: &str,
        watermark: CommitterWatermark,
    ) -> anyhow::Result<bool>;

    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
    /// will reference this as the inclusive lower bound of available data for the corresponding
    /// pipeline.
    ///
    /// If an update is to be made, some timestamp (i.e. `pruner_timestamp`) should also be set on
    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
    /// store. If this is not possible, then it should come from some other common source of time
    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
    /// before beginning to prune.
    ///
    /// Returns a boolean indicating whether the watermark was actually updated or not.
    async fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
    ) -> anyhow::Result<bool>;

    /// Update the pruner watermark. Returns true if the watermark was actually updated.
    async fn set_pruner_watermark(
        &mut self,
        pipeline: &'static str,
        pruner_hi: u64,
    ) -> anyhow::Result<bool>;
}

/// A storage-agnostic interface that provides database connections for both watermark management
/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
/// watermark operations through its associated `Connection` type. This store is also passed to the
/// pipeline handlers to perform arbitrary writes to the store.
#[async_trait]
pub trait Store: Send + Sync + 'static + Clone {
    type Connection<'c>: Connection
    where
        Self: 'c;

    /// Delimiter used to separate pipeline names from task identifiers when reading or writing the
    /// committer watermark.
    const DELIMITER: &'static str = "@";

    async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
}
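A committer watermark key is either a bare pipeline name or `{pipeline}{Store::DELIMITER}{task}`, and the delimiter defaults to `@`. A store that keys watermark rows by this string may want helpers to build and split it. The std-only sketch below is illustrative. `pipeline_task` and `split_pipeline_task` are hypothetical helpers, not part of the framework. Splitting on the first delimiter is an assumption about how task names are written.

```rust
/// Mirrors the default `Store::DELIMITER` shown above.
const DELIMITER: &str = "@";

/// Build a watermark key: `{pipeline}{DELIMITER}{task}`, or just `{pipeline}`
/// when there is no task. (Hypothetical helper, not a framework function.)
fn pipeline_task(pipeline: &str, task: Option<&str>) -> String {
    match task {
        Some(task) => format!("{}{}{}", pipeline, DELIMITER, task),
        None => pipeline.to_string(),
    }
}

/// Split a watermark key back into its pipeline name and optional task.
/// Splits on the first delimiter only (an assumption of this sketch).
fn split_pipeline_task(key: &str) -> (&str, Option<&str>) {
    match key.split_once(DELIMITER) {
        Some((pipeline, task)) => (pipeline, Some(task)),
        None => (key, None),
    }
}
```

For example, a hypothetical `backfill` task of the `tx_digests` pipeline would be stored under `tx_digests@backfill`.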

When to use BYOS:

  • Different database: MongoDB, CouchDB, or other non-PostgreSQL databases. This also applies if you prefer to use PostgreSQL but without the default Diesel ORM.
  • Custom requirements: Specialized storage logic, partitioning, or performance optimizations.

Core implementation requirements

To implement BYOS, you need to:

  1. Define your own store and connection structs.
  2. Implement the Store trait to hand out connections.
  3. Implement the Connection trait for watermark operations.
  4. Use the Indexer type directly instead of IndexerCluster.

Step 1: Define your store structure

use sui_indexer_alt_framework::store::{Store, Connection};
use async_trait::async_trait;

// `MyDatabasePool`, `MyConfig`, and `MyDatabaseConnection` are placeholders for
// the types your database client provides.
#[derive(Clone)]
pub struct MyCustomStore {
    // Your database connection details
    connection_pool: MyDatabasePool,
    config: MyConfig,
}

pub struct MyCustomConnection<'a> {
    // A connection instance, which may borrow from the pool
    conn: MyDatabaseConnection<'a>,
}

Step 2: Implement the Store trait

The Store trait manages the connection lifecycle:

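#[async_trait]
impl Store for MyCustomStore {
    type Connection<'c> = MyCustomConnection<'c>;

    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
        // Check out a connection from `self.connection_pool` (your implementation)
        todo!("Implement based on your storage system")
    }
}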
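`Connection<'c>` is a generic associated type, so a connection can borrow from the store for the lifetime `'c`. A pooled connection checked out of `self.connection_pool` would do exactly that. The ClickHouse example later in this guide takes the other route and returns an owned connection that holds a cloned client. The sketch below shows only the borrowing shape. It uses a synchronous stand-in trait named `SketchStore` and a hypothetical in-memory `MemStore`. Neither is the framework's async `Store` trait.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};

/// Synchronous stand-in for the framework's `Store`, keeping only the GAT shape:
/// the connection type may borrow from `&'c self`.
trait SketchStore {
    type Connection<'c>
    where
        Self: 'c;

    fn connect<'c>(&'c self) -> Self::Connection<'c>;
}

/// Hypothetical in-memory store: checkpoint watermarks keyed by pipeline name.
#[derive(Default)]
struct MemStore {
    rows: Mutex<HashMap<String, u64>>,
}

/// A "connection" here is a lock guard that borrows the store for `'c`.
struct MemConnection<'c> {
    rows: MutexGuard<'c, HashMap<String, u64>>,
}

impl SketchStore for MemStore {
    type Connection<'c> = MemConnection<'c>;

    fn connect<'c>(&'c self) -> MemConnection<'c> {
        // If another thread panicked while holding the lock, recover the data anyway.
        let rows = self.rows.lock().unwrap_or_else(|e| e.into_inner());
        MemConnection { rows }
    }
}

impl<'c> MemConnection<'c> {
    fn set(&mut self, pipeline: &str, checkpoint: u64) {
        self.rows.insert(pipeline.to_string(), checkpoint);
    }

    fn get(&self, pipeline: &str) -> Option<u64> {
        self.rows.get(pipeline).copied()
    }
}
```

The borrow checker ties each `MemConnection` to its `MemStore`, so a connection cannot outlive the store it came from.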

Step 3: Implement the Connection trait

The Connection trait handles watermark operations for pipeline coordination:

use sui_indexer_alt_framework::store::{CommitterWatermark, Connection, ReaderWatermark};
use async_trait::async_trait;

#[async_trait]
impl Connection for MyCustomConnection<'_> {
    // Get the highest checkpoint committed by a pipeline (or pipeline task)
    async fn committer_watermark(
        &mut self,
        pipeline_task: &str,
    ) -> anyhow::Result<Option<CommitterWatermark>> {
        // Query your database for watermark data
        todo!("Implement based on your storage system")
    }

    // Get the lowest available checkpoint for readers
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>> {
        // Implementation depends on your database schema
        todo!("Implement based on your storage system")
    }

    // Implement the other required methods: init_watermark, pruner_watermark,
    // set_committer_watermark, set_reader_watermark, and set_pruner_watermark.
}
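The trait docs spell out a few rules that the watermark methods share:

- `init_watermark` creates a row only when none exists.
- `set_committer_watermark` and `set_reader_watermark` only ever raise their value, and they return whether anything changed.
- Raising `reader_lo` also stamps `pruner_timestamp`.

The in-memory sketch below captures these rules synchronously so they are easy to test. `MemWatermarks` and `Row` are hypothetical names. `now_ms` is passed in rather than read from a clock; a real store should take the time from the database, as the trait docs recommend.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory watermark row.
#[derive(Clone, Debug, Default, PartialEq)]
struct Row {
    checkpoint_hi_inclusive: u64,
    reader_lo: u64,
    pruner_hi: u64,
    pruner_timestamp_ms: u64,
}

/// Hypothetical in-memory watermark table keyed by pipeline (or pipeline task).
#[derive(Default)]
struct MemWatermarks {
    rows: HashMap<String, Row>,
}

impl MemWatermarks {
    /// Create the row if missing, then return the stored `checkpoint_hi_inclusive`.
    /// Like the Postgres version, never create a row when `default_next_checkpoint`
    /// is 0, because `checkpoint_hi_inclusive` would have to be -1.
    fn init_watermark(&mut self, pipeline_task: &str, default_next_checkpoint: u64) -> Option<u64> {
        let hi = match default_next_checkpoint.checked_sub(1) {
            Some(hi) => hi,
            None => return self.rows.get(pipeline_task).map(|r| r.checkpoint_hi_inclusive),
        };
        let row = self.rows.entry(pipeline_task.to_string()).or_insert(Row {
            checkpoint_hi_inclusive: hi,
            reader_lo: default_next_checkpoint,
            pruner_hi: default_next_checkpoint,
            pruner_timestamp_ms: 0,
        });
        Some(row.checkpoint_hi_inclusive)
    }

    /// Upsert that only raises the committer watermark; returns whether it changed.
    fn set_committer_watermark(&mut self, pipeline_task: &str, checkpoint_hi_inclusive: u64) -> bool {
        match self.rows.get_mut(pipeline_task) {
            Some(row) if row.checkpoint_hi_inclusive >= checkpoint_hi_inclusive => false,
            Some(row) => {
                row.checkpoint_hi_inclusive = checkpoint_hi_inclusive;
                true
            }
            None => {
                let row = Row { checkpoint_hi_inclusive, ..Row::default() };
                self.rows.insert(pipeline_task.to_string(), row);
                true
            }
        }
    }

    /// Raise `reader_lo` on an existing row and stamp the time; returns whether it changed.
    fn set_reader_watermark(&mut self, pipeline: &str, reader_lo: u64, now_ms: u64) -> bool {
        match self.rows.get_mut(pipeline) {
            Some(row) if row.reader_lo < reader_lo => {
                row.reader_lo = reader_lo;
                row.pruner_timestamp_ms = now_ms;
                true
            }
            _ => false,
        }
    }
}
```

Returning `false` for a non-raising write tells the framework that the stored watermark is already ahead. Silently overwriting it could move the indexer backwards.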

For a complete reference, study the Connection implementation in sui-pg-db:

// Excerpt from sui-pg-db: imports, the `watermarks` schema, and `StoredWatermark` are defined
// elsewhere.
#[async_trait]
impl store::Connection for Connection<'_> {
    async fn init_watermark(
        &mut self,
        pipeline_task: &str,
        default_next_checkpoint: u64,
    ) -> anyhow::Result<Option<u64>> {
        let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
            // Do not create a watermark record with checkpoint_hi_inclusive = -1.
            return Ok(self
                .committer_watermark(pipeline_task)
                .await?
                .map(|w| w.checkpoint_hi_inclusive));
        };

        let stored_watermark = StoredWatermark {
            pipeline: pipeline_task.to_string(),
            epoch_hi_inclusive: 0,
            checkpoint_hi_inclusive: checkpoint_hi_inclusive as i64,
            tx_hi: 0,
            timestamp_ms_hi_inclusive: 0,
            reader_lo: default_next_checkpoint as i64,
            pruner_timestamp: Utc::now().naive_utc(),
            pruner_hi: default_next_checkpoint as i64,
        };

        use diesel::pg::upsert::excluded;
        let checkpoint_hi_inclusive: i64 = diesel::insert_into(watermarks::table)
            .values(&stored_watermark)
            // If an entry already exists, leave it unchanged.
            .on_conflict(watermarks::pipeline)
            // Use `do_update` instead of `do_nothing` so that `returning` yields the existing row.
            .do_update()
            // `do_update` needs at least one assignment, so set the pipeline to itself (a no-op).
            // `excluded` is a virtual table holding the row that was proposed for insertion.
            .set(watermarks::pipeline.eq(excluded(watermarks::pipeline)))
            .returning(watermarks::checkpoint_hi_inclusive)
            .get_result(self)
            .await?;

        Ok(Some(checkpoint_hi_inclusive as u64))
    }

    async fn committer_watermark(
        &mut self,
        pipeline_task: &str,
    ) -> anyhow::Result<Option<store::CommitterWatermark>> {
        let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
            .select((
                watermarks::epoch_hi_inclusive,
                watermarks::checkpoint_hi_inclusive,
                watermarks::tx_hi,
                watermarks::timestamp_ms_hi_inclusive,
            ))
            .filter(watermarks::pipeline.eq(pipeline_task))
            .first(self)
            .await
            .optional()?;

        if let Some(watermark) = watermark {
            Ok(Some(store::CommitterWatermark {
                epoch_hi_inclusive: watermark.0 as u64,
                checkpoint_hi_inclusive: watermark.1 as u64,
                tx_hi: watermark.2 as u64,
                timestamp_ms_hi_inclusive: watermark.3 as u64,
            }))
        } else {
            Ok(None)
        }
    }

    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<store::ReaderWatermark>> {
        let watermark: Option<(i64, i64)> = watermarks::table
            .select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
            .filter(watermarks::pipeline.eq(pipeline))
            .first(self)
            .await
            .optional()?;

        if let Some(watermark) = watermark {
            Ok(Some(store::ReaderWatermark {
                checkpoint_hi_inclusive: watermark.0 as u64,
                reader_lo: watermark.1 as u64,
            }))
        } else {
            Ok(None)
        }
    }

    async fn pruner_watermark(
        &mut self,
        pipeline: &'static str,
        delay: Duration,
    ) -> anyhow::Result<Option<store::PrunerWatermark>> {
        //     |---------- + delay ---------------------|
        //                             |--- wait_for ---|
        //     |-----------------------|----------------|
        //     ^                       ^
        //     pruner_timestamp        NOW()
        let wait_for = sql!(as BigInt,
            "CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
            delay.as_millis() as i64,
        );

        let watermark: Option<(i64, i64, i64)> = watermarks::table
            .select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
            .filter(watermarks::pipeline.eq(pipeline))
            .first(self)
            .await
            .optional()?;

        if let Some(watermark) = watermark {
            Ok(Some(store::PrunerWatermark {
                wait_for_ms: watermark.0,
                pruner_hi: watermark.1 as u64,
                reader_lo: watermark.2 as u64,
            }))
        } else {
            Ok(None)
        }
    }

    async fn set_committer_watermark(
        &mut self,
        pipeline_task: &str,
        watermark: store::CommitterWatermark,
    ) -> anyhow::Result<bool> {
        // Create a StoredWatermark directly from CommitterWatermark
        let stored_watermark = StoredWatermark {
            pipeline: pipeline_task.to_string(),
            epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
            checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
            tx_hi: watermark.tx_hi as i64,
            timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
            reader_lo: 0,
            pruner_timestamp: DateTime::UNIX_EPOCH.naive_utc(),
            pruner_hi: 0,
        };

        use diesel::query_dsl::methods::FilterDsl;
        Ok(diesel::insert_into(watermarks::table)
            .values(&stored_watermark)
            // If an entry already exists, only write the new `hi` values...
            .on_conflict(watermarks::pipeline)
            .do_update()
            .set((
                watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
                watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
                watermarks::tx_hi.eq(stored_watermark.tx_hi),
                watermarks::timestamp_ms_hi_inclusive
                    .eq(stored_watermark.timestamp_ms_hi_inclusive),
            ))
            // ...and only if doing so raises the stored `checkpoint_hi_inclusive`.
            .filter(
                watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
            )
            .execute(self)
            .await?
            > 0)
    }

    async fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
    ) -> anyhow::Result<bool> {
        Ok(diesel::update(watermarks::table)
            .set((
                watermarks::reader_lo.eq(reader_lo as i64),
                watermarks::pruner_timestamp.eq(diesel::dsl::now),
            ))
            .filter(watermarks::pipeline.eq(pipeline))
            .filter(watermarks::reader_lo.lt(reader_lo as i64))
            .execute(self)
            .await?
            > 0)
    }

    async fn set_pruner_watermark(
        &mut self,
        pipeline: &'static str,
        pruner_hi: u64,
    ) -> anyhow::Result<bool> {
        Ok(diesel::update(watermarks::table)
            .set(watermarks::pruner_hi.eq(pruner_hi as i64))
            .filter(watermarks::pipeline.eq(pipeline))
            .execute(self)
            .await?
            > 0)
    }
}
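The SQL in `pruner_watermark` evaluates `wait_for = delay + (pruner_timestamp - NOW())` on the database side. The trait docs define the prunable region as `[pruner_hi, reader_lo)`. Both are small enough to check by hand. The sketch below restates them in plain Rust with millisecond timestamps. The function names are hypothetical. This sketch reads a zero or negative `wait_for_ms` as "the delay has already elapsed".

```rust
use std::ops::Range;

/// Milliseconds the pruner still has to wait: `delay + (pruner_timestamp - now)`,
/// the same expression the Postgres query computes.
fn wait_for_ms(pruner_timestamp_ms: i64, now_ms: i64, delay_ms: i64) -> i64 {
    delay_ms + (pruner_timestamp_ms - now_ms)
}

/// Checkpoints the pruner may delete: `pruner_hi` (inclusive) up to `reader_lo`
/// (exclusive). Clamped so that `reader_lo <= pruner_hi` yields an empty range.
fn prunable(pruner_hi: u64, reader_lo: u64) -> Range<u64> {
    pruner_hi..reader_lo.max(pruner_hi)
}
```

Here is a worked example. `reader_lo` was raised at t = 10,000 ms and the delay is 5,000 ms. At t = 12,000 ms the pruner must wait another 3,000 ms. At t = 16,000 ms the result is -1,000 ms, so pruning may start.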

Step 4: Use manual indexer

Replace IndexerCluster with a manually constructed Indexer:

use anyhow::bail;
use sui_indexer_alt_framework::{
    Indexer, IndexerArgs,
    pipeline::concurrent::ConcurrentConfig,
    service::Error,
};
use sui_indexer_alt_framework::ingestion::{
    ClientArgs, IngestionConfig,
    ingestion_client::IngestionClientArgs,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize your custom store (`MyCustomStore::new` and `config` are placeholders)
    let store = MyCustomStore::new(config).await?;

    // Configure indexer manually
    let mut indexer = Indexer::new(
        store,
        IndexerArgs::default(),
        ClientArgs {
            ingestion: IngestionClientArgs {
                remote_store_url: Some("https://checkpoints.testnet.sui.io".to_string()),
                ..Default::default()
            },
            ..Default::default()
        },
        IngestionConfig::default(),
        None,                         // No metrics prefix
        &prometheus::Registry::new(), // Registry for the indexer's metrics
    )
    .await?;

    // Add your pipelines (`YourHandler` is a placeholder)
    indexer
        .concurrent_pipeline(YourHandler::default(), ConcurrentConfig::default())
        .await?;

    // Start the indexer and wait for it to finish
    match indexer.run().await?.main().await {
        Ok(()) | Err(Error::Terminated) => Ok(()),
        Err(Error::Aborted) => bail!("Indexer aborted due to an unexpected error"),
        Err(Error::Task(e)) => bail!(e),
    }
}

Example: ClickHouse implementation

For a complete working example of BYOS with ClickHouse (a high-performance columnar database for analytics), see the example project in the Sui repo.


ClickHouse example README

ClickHouse Sui Indexer

A simple example of how to build a custom Sui indexer that writes transaction data to ClickHouse.

Quick Start

1. Start ClickHouse

docker run -d --name clickhouse-dev -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse/clickhouse-server

2. Set up database user

docker exec clickhouse-dev clickhouse-client --query "CREATE USER IF NOT EXISTS dev IDENTIFIED WITH no_password"
docker exec clickhouse-dev clickhouse-client --query "GRANT CREATE, INSERT, SELECT, ALTER, UPDATE, DELETE ON *.* TO dev"

3. Run the indexer

cargo run -- --remote-store-url https://checkpoints.testnet.sui.io --last-checkpoint=10

That’s it! The indexer will:

  • Create the necessary tables automatically
  • Fetch checkpoints from the Sui testnet
  • Write transaction data to ClickHouse

Verify Data

Check that data was written:

docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT COUNT(*) FROM transactions"
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT * FROM transactions LIMIT 5"

Clean Up

Stop and remove the ClickHouse container:

docker stop clickhouse-dev && docker rm clickhouse-dev

What This Example Shows

  • Custom Store Implementation: How to implement the Store trait for ClickHouse
  • Concurrent Pipeline: Uses the concurrent pipeline for better pruning and watermark testing
  • Watermark Management: Tracking indexer progress with committer, reader, and pruner watermarks
  • Transaction Processing: Extracting and storing transaction digests from checkpoints
  • Simple Setup: Minimal configuration for local development

Architecture

Sui Network → Checkpoints → Concurrent Pipeline → ClickHouse Store → ClickHouse DB

The indexer uses a concurrent pipeline, which can process and commit checkpoints out of order and runs separate committer, reader watermark, and pruner tasks. This makes it well suited to testing watermark and pruning behavior.
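A concurrent pipeline can commit checkpoints out of order. Its committer watermark can therefore only move past a checkpoint once every checkpoint before it has been written. The sketch below shows that bookkeeping with a `BTreeSet`. `WatermarkTracker` is a hypothetical, simplified type. It is not the framework's internal implementation, which is not shown here and may differ.

```rust
use std::collections::BTreeSet;

/// Hypothetical tracker: checkpoints may be committed in any order, but the
/// watermark only advances across a gap-free prefix.
struct WatermarkTracker {
    /// First checkpoint this pipeline is expected to commit.
    first: u64,
    /// Lowest checkpoint not yet known to be committed.
    next: u64,
    /// Checkpoints committed beyond a gap, waiting for the gap to fill.
    pending: BTreeSet<u64>,
}

impl WatermarkTracker {
    fn new(first: u64) -> Self {
        WatermarkTracker { first, next: first, pending: BTreeSet::new() }
    }

    /// Record that `checkpoint` was committed; return the inclusive watermark,
    /// or `None` while nothing contiguous from `first` has been committed.
    fn commit(&mut self, checkpoint: u64) -> Option<u64> {
        // Ignore checkpoints already covered by the watermark (e.g., retries).
        if checkpoint >= self.next {
            self.pending.insert(checkpoint);
        }
        // Advance over every checkpoint that closes the gap.
        while self.pending.remove(&self.next) {
            self.next += 1;
        }
        if self.next > self.first {
            Some(self.next - 1)
        } else {
            None
        }
    }
}
```

In the following trace, checkpoint 2 arrives first, but the watermark stays unset until 0 arrives. It then jumps to 2 only after 1 fills the gap.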

This example demonstrates:

  • Custom store implementation using the ClickHouse Rust client.
  • Watermark persistence with ClickHouse-specific SQL syntax.
  • Transaction digest indexing similar to the built-in PostgreSQL handler.

The example includes 3 main components:

  1. store.rs - ClickHouseStore implementing Store and Connection traits.


    store.rs

    use anyhow::Result;
    use async_trait::async_trait;
    use chrono::Utc;
    use clickhouse::{Client, Row};
    use scoped_futures::ScopedBoxFuture;
    use serde::{Deserialize, Serialize};
    use std::time::Duration;
    use sui_indexer_alt_framework::store::{
        CommitterWatermark, Connection, PrunerWatermark, ReaderWatermark, Store, TransactionalStore,
    };
    use url::Url;

    #[derive(Clone)]
    pub struct ClickHouseStore {
        client: Client,
    }

    pub struct ClickHouseConnection {
        pub client: Client,
    }

    /// Row structure for watermark table operations.
    /// Field names must match the column names of the `watermarks` table.
    #[derive(Row, Serialize, Deserialize, Debug, Default)]
    struct WatermarkRow {
        pipeline: String,
        epoch_hi_inclusive: u64,
        checkpoint_hi_inclusive: u64,
        tx_hi: u64,
        timestamp_ms_hi_inclusive: u64,
        reader_lo: u64,
        pruner_hi: u64,
        pruner_timestamp: u64, // Unix timestamp in milliseconds
    }

    impl ClickHouseStore {
        pub fn new(url: Url) -> Self {
            let client = Client::default()
                .with_url(url.as_str())
                .with_user("dev") // Simple user for local development
                .with_compression(clickhouse::Compression::Lz4);
            Self { client }
        }

        /// Create tables if they don't exist
        pub async fn create_tables_if_not_exists(&self) -> Result<()> {
            // Create watermarks table for pipeline state management
            self.client
                .query(
                    "
                    CREATE TABLE IF NOT EXISTS watermarks
                    (
                        pipeline String,
                        epoch_hi_inclusive UInt64,
                        checkpoint_hi_inclusive UInt64,
                        tx_hi UInt64,
                        timestamp_ms_hi_inclusive UInt64,
                        reader_lo UInt64,
                        pruner_hi UInt64,
                        pruner_timestamp UInt64
                    )
                    ENGINE = MergeTree()
                    ORDER BY pipeline
                    ",
                )
                .execute()
                .await?;

            // Create transactions table for the actual indexing data
            self.client
                .query(
                    "
                    CREATE TABLE IF NOT EXISTS transactions
                    (
                        checkpoint_sequence_number UInt64,
                        transaction_digest String,
                        indexed_at DateTime64(3, 'UTC') DEFAULT now()
                    )
                    ENGINE = MergeTree()
                    ORDER BY checkpoint_sequence_number
                    ",
                )
                .execute()
                .await?;

            Ok(())
        }
    }

    #[async_trait]
    impl Store for ClickHouseStore {
        type Connection<'c> = ClickHouseConnection;

        async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>> {
            Ok(ClickHouseConnection {
                client: self.client.clone(),
            })
        }
    }

    #[async_trait]
    impl TransactionalStore for ClickHouseStore {
        async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
        where
            R: Send + 'a,
            F: Send + 'a,
            F: for<'r> FnOnce(
                &'r mut Self::Connection<'_>,
            ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
        {
            let mut conn = self.connect().await?;
            f(&mut conn).await
        }
    }

    #[async_trait]
    impl Connection for ClickHouseConnection {
        async fn init_watermark(
            &mut self,
            pipeline_task: &str,
            default_next_checkpoint: u64,
        ) -> anyhow::Result<Option<u64>> {
            let existing = self.committer_watermark(pipeline_task).await?;

            let Some(checkpoint_hi_inclusive) = default_next_checkpoint.checked_sub(1) else {
                return Ok(existing.map(|w| w.checkpoint_hi_inclusive));
            };

            if let Some(existing) = existing {
                return Ok(Some(existing.checkpoint_hi_inclusive));
            }

            let mut inserter = self.client.inserter("watermarks")?;
            inserter.write(&WatermarkRow {
                pipeline: pipeline_task.to_string(),
                epoch_hi_inclusive: 0,
                checkpoint_hi_inclusive,
                tx_hi: 0,
                timestamp_ms_hi_inclusive: 0,
                reader_lo: default_next_checkpoint,
                pruner_hi: default_next_checkpoint,
                pruner_timestamp: 0,
            })?;

            inserter.end().await?;
            Ok(Some(checkpoint_hi_inclusive))
        }

        async fn committer_watermark(&mut self, pipeline: &str) -> Result<Option<CommitterWatermark>> {
            let mut cursor = self
                .client
                .query(
                    "SELECT epoch_hi_inclusive, checkpoint_hi_inclusive, tx_hi, timestamp_ms_hi_inclusive
                    FROM watermarks
                    WHERE pipeline = ?
                    ORDER BY pruner_timestamp DESC
                    LIMIT 1"
                )
                .bind(pipeline)
                .fetch::<(u64, u64, u64, u64)>()?;

            let row: Option<(u64, u64, u64, u64)> = cursor.next().await?;
            Ok(row.map(
                |(epoch_hi, checkpoint_hi, tx_hi, timestamp_hi)| CommitterWatermark {
                    epoch_hi_inclusive: epoch_hi,
                    checkpoint_hi_inclusive: checkpoint_hi,
                    tx_hi,
                    timestamp_ms_hi_inclusive: timestamp_hi,
                },
            ))
        }

        async fn reader_watermark(
            &mut self,
            pipeline: &'static str,
        ) -> Result<Option<ReaderWatermark>> {
            let mut cursor = self
                .client
                .query(
                    "SELECT checkpoint_hi_inclusive, reader_lo
                    FROM watermarks
                    WHERE pipeline = ?
                    ORDER BY pruner_timestamp DESC
                    LIMIT 1",
                )
                .bind(pipeline)
                .fetch::<(u64, u64)>()?;

            let row: Option<(u64, u64)> = cursor.next().await?;
            Ok(row.map(|(checkpoint_hi, reader_lo)| ReaderWatermark {
                checkpoint_hi_inclusive: checkpoint_hi,
                reader_lo,
            }))
        }

        async fn pruner_watermark(
            &mut self,
            pipeline: &'static str,
            delay: Duration,
        ) -> Result<Option<PrunerWatermark>> {
            // Follow PostgreSQL pattern: calculate wait_for_ms on database side
            // We do this so that we can rely on the database to keep a consistent sense of time.
            // Using own clocks can potentially be subject to some clock skew.
            let delay_ms = delay.as_millis() as i64;
            let mut cursor = self
                .client
                .query(
                    "SELECT reader_lo, pruner_hi,
                    toInt64(? + (pruner_timestamp - toUnixTimestamp64Milli(now64()))) as wait_for_ms
                    FROM watermarks
                    WHERE pipeline = ?
                    ORDER BY pruner_timestamp DESC
                    LIMIT 1"
                )
                .bind(delay_ms)
                .bind(pipeline)
                .fetch::<(u64, u64, i64)>()?;

            let row: Option<(u64, u64, i64)> = cursor.next().await?;
            Ok(
                row.map(|(reader_lo, pruner_hi, wait_for_ms)| PrunerWatermark {
                    wait_for_ms,
                    reader_lo,
                    pruner_hi,
                }),
            )
        }

        async fn set_committer_watermark(
            &mut self,
            pipeline: &str,
            watermark: CommitterWatermark,
        ) -> Result<bool> {
            // Follow PostgreSQL pattern: check if row exists, then UPDATE or INSERT accordingly

            // First check if pipeline exists and get current checkpoint
            let mut cursor = self
                .client
                .query("SELECT checkpoint_hi_inclusive FROM watermarks WHERE pipeline = ? LIMIT 1")
                .bind(pipeline)
                .fetch::<u64>()?;

            let existing_checkpoint: Option<u64> = cursor.next().await?;

            if let Some(existing_checkpoint) = existing_checkpoint {
                // Row exists - only update if checkpoint advances
                if existing_checkpoint >= watermark.checkpoint_hi_inclusive {
                    return Ok(false);
                }

                // ClickHouse applies ALTER TABLE ... UPDATE as an asynchronous mutation by default.
                self.client
                    .query(
                        "ALTER TABLE watermarks
                        UPDATE
                            epoch_hi_inclusive = ?,
                            checkpoint_hi_inclusive = ?,
                            tx_hi = ?,
                            timestamp_ms_hi_inclusive = ?
                        WHERE pipeline = ?",
                    )
                    .bind(watermark.epoch_hi_inclusive)
                    .bind(watermark.checkpoint_hi_inclusive)
                    .bind(watermark.tx_hi)
                    .bind(watermark.timestamp_ms_hi_inclusive)
                    .bind(pipeline)
                    .execute()
                    .await?;
            } else {
                // No existing row - insert new one
                let mut inserter = self.client.inserter("watermarks")?;
                inserter.write(&WatermarkRow {
                    pipeline: pipeline.to_string(),
                    epoch_hi_inclusive: watermark.epoch_hi_inclusive,
                    checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
                    tx_hi: watermark.tx_hi,
                    timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive,
                    reader_lo: 0, // Will be updated by reader
                    pruner_hi: 0, // Will be updated by pruner
                    pruner_timestamp: Utc::now().timestamp_millis() as u64,
                })?;
                inserter.end().await?;
            }

            Ok(true)
        }

        async fn set_reader_watermark(
            &mut self,
            pipeline: &'static str,
            reader_lo: u64,
        ) -> Result<bool> {
            // Follow PostgreSQL pattern: simple UPDATE with timestamp update and advancement check.
            // ALTER TABLE ... UPDATE does not report how many rows changed, so this returns true
            // even when `reader_lo` was not raised.
            self.client
                .query(
                    "ALTER TABLE watermarks
                    UPDATE reader_lo = ?, pruner_timestamp = toUnixTimestamp64Milli(now64())
                    WHERE pipeline = ? AND reader_lo < ?",
                )
                .bind(reader_lo)
                .bind(pipeline)
                .bind(reader_lo)
                .execute()
                .await?;

            Ok(true)
        }

        async fn set_pruner_watermark(
            &mut self,
            pipeline: &'static str,
            pruner_hi: u64,
        ) -> Result<bool> {
            // Follow PostgreSQL pattern: simple UPDATE statement.
            // As above, the affected row count is not available, so this always returns true.
            self.client
                .query(
                    "ALTER TABLE watermarks
                    UPDATE pruner_hi = ?
                    WHERE pipeline = ?",
                )
                .bind(pruner_hi)
                .bind(pipeline)
                .execute()
                .await?;

            Ok(true)
        }
    }
  2. handlers.rs - TxDigest handler processing checkpoint data.


    handlers.rs

    use anyhow::Result;
    use clickhouse::Row;
    use serde::Serialize;
    use std::sync::Arc;

    use sui_indexer_alt_framework::{
        FieldCount,
        pipeline::{
            Processor,
            concurrent::{BatchStatus, Handler},
        },
        store::Store,
        types::full_checkpoint_content::Checkpoint,
    };

    use crate::store::ClickHouseStore;

    /// A row of the ClickHouse `transactions` table created in store.rs.
    /// Field names must match the table's column names; `indexed_at` is omitted
    /// so ClickHouse fills in its `DEFAULT now()` value.
    #[derive(Row, Serialize, Clone, Debug, FieldCount)]
    pub struct StoredTxDigest {
        pub checkpoint_sequence_number: u64,
        pub transaction_digest: String,
    }

    /// Handler that processes checkpoint data and extracts transaction digests.
    /// Named to align with sui-indexer-alt's TxDigests handler.
    #[derive(Clone, Default)]
    pub struct TxDigests;

    #[async_trait::async_trait]
    impl Processor for TxDigests {
        const NAME: &'static str = "tx_digests";
        type Value = StoredTxDigest;

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> Result<Vec<Self::Value>> {
            let Checkpoint {
                transactions,
                summary,
                ..
            } = checkpoint.as_ref();

            Ok(transactions
                .iter()
                .map(|tx| StoredTxDigest {
                    checkpoint_sequence_number: summary.sequence_number,
                    // Base58 string form of the digest, stored in the `String` column
                    transaction_digest: tx.transaction.digest().to_string(),
                })
                .collect())
        }
    }

    #[async_trait::async_trait]
    impl Handler for TxDigests {
        type Store = ClickHouseStore;
        type Batch = Vec<Self::Value>;

        fn batch(
            &self,
            batch: &mut Self::Batch,
            values: &mut std::vec::IntoIter<Self::Value>,
        ) -> BatchStatus {
            batch.extend(values);
            BatchStatus::Pending
        }

        async fn commit<'a>(
            &self,
            values: &Self::Batch,
            conn: &mut <Self::Store as Store>::Connection<'a>,
        ) -> Result<usize> {
            let row_count = values.len();
            if row_count == 0 {
                return Ok(0);
            }

            // Use ClickHouse inserter for efficient bulk inserts into the `transactions` table
            let mut inserter = conn.client.inserter("transactions")?;
            for tx_digest in values {
                inserter.write(tx_digest)?;
            }
            inserter.end().await?;

            Ok(row_count)
        }
    }
  3. main.rs - Manual indexer setup with ClickHouse backend.


    main.rs

    mod handlers;
    mod store;

    use anyhow::{Result, bail};
    use clap::Parser;
    use sui_indexer_alt_framework::{
        Indexer, IndexerArgs,
        ingestion::{ClientArgs, IngestionConfig},
        pipeline::concurrent::ConcurrentConfig,
        service::Error,
    };
    use url::Url;

    use handlers::TxDigests;
    use store::ClickHouseStore;

    #[derive(clap::Parser, Debug, Clone)]
    struct Args {
        #[clap(flatten)]
        pub indexer_args: IndexerArgs,

        #[clap(flatten)]
        pub client_args: ClientArgs,
    }

    #[tokio::main]
    async fn main() -> Result<()> {
        // Initialize crypto provider for HTTPS connections (needed for remote checkpoint fetching)
        rustls::crypto::ring::default_provider()
            .install_default()
            .expect("Failed to install crypto provider");

        // Parse command-line arguments
        let args = Args::parse();

        // ClickHouse connection (uses 'dev' user by default for local development)
        let clickhouse_url = "http://localhost:8123".parse::<Url>()?;

        println!("Connecting to ClickHouse at: {}", clickhouse_url);

        // Create our custom ClickHouse store
        let store = ClickHouseStore::new(clickhouse_url);

        // Ensure the database tables are created before starting the indexer
        store.create_tables_if_not_exists().await?;

        // Manually build the indexer with our custom ClickHouse store
        // This is the key difference from basic-sui-indexer which uses IndexerCluster::builder()
        let mut indexer = Indexer::new(
            store.clone(),
            args.indexer_args,
            args.client_args,
            IngestionConfig::default(),
            None,                // No metrics prefix
            &Default::default(), // Empty prometheus registry
        )
        .await?;

        // Register our concurrent pipeline handler (better for testing pruning)
        // This processes checkpoints with separate reader and pruner components
        indexer
            .concurrent_pipeline(
                TxDigests,
                // ConcurrentConfig default comes with no pruning.
                ConcurrentConfig::default(),
            )
            .await?;

        println!("Starting ClickHouse Sui indexer...");

        // Start the indexer and wait for it to complete
        match indexer.run().await?.main().await {
            Ok(()) | Err(Error::Terminated) => Ok(()),
            Err(Error::Aborted) => {
                bail!("Indexer aborted due to an unexpected error")
            }
            Err(Error::Task(e)) => {
                bail!(e)
            }
        }
    }

Deserializing Move events

When Move smart contracts execute on Sui, they can emit events using the sui::event module. These events are stored in checkpoints as BCS-serialized bytes that your indexer needs to deserialize to extract meaningful data.

Why deserialization is needed

Move contracts emit events like the following:

// Move smart contract
use sui::event;

public fun transfer_balance(...) {
    event::emit(BalanceEvent {
        balance_manager_id: id,
        asset: asset_id,
        amount: 100,
        deposit: true
    });
}

In checkpoint data, these events arrive as raw BCS bytes that need to be converted back to Rust structs for processing.
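BCS writes struct fields in declaration order, with no field names or type tags. Each field type has a fixed encoding:

- An `ID`/`ObjectID` is 32 raw bytes.
- A `u64` is 8 little-endian bytes.
- A `bool` is a single `0` or `1` byte.

That is why the Rust struct must mirror the Move field order exactly. The hand-written decoder below makes the byte layout of a `BalanceEvent` visible. It is illustrative only; in practice `bcs::from_bytes` does this work. It uses `[u8; 32]` in place of `ObjectID` so it needs no dependencies. `decode_balance_event` and `take_array` are hypothetical names.

```rust
/// Mirror of the Move `BalanceEvent`, with `[u8; 32]` standing in for `ObjectID`.
#[derive(Debug, PartialEq)]
struct BalanceEvent {
    balance_manager_id: [u8; 32],
    asset: [u8; 32],
    amount: u64,
    deposit: bool,
}

/// Split the next `N` bytes off the front of `bytes`, or fail if too few remain.
fn take_array<const N: usize>(bytes: &mut &[u8]) -> Result<[u8; N], String> {
    if bytes.len() < N {
        return Err(format!("need {} bytes, have {}", N, bytes.len()));
    }
    let (head, tail) = bytes.split_at(N);
    let mut out = [0u8; N];
    out.copy_from_slice(head);
    *bytes = tail;
    Ok(out)
}

/// Decode the fields in Move declaration order, then reject leftover bytes.
fn decode_balance_event(mut bytes: &[u8]) -> Result<BalanceEvent, String> {
    // Move `ID` / Rust `ObjectID`: 32 bytes, no length prefix.
    let balance_manager_id = take_array::<32>(&mut bytes)?;
    let asset = take_array::<32>(&mut bytes)?;
    // u64: 8 bytes, little-endian.
    let amount = u64::from_le_bytes(take_array::<8>(&mut bytes)?);
    // bool: one byte, 0 or 1.
    let deposit = match take_array::<1>(&mut bytes)?[0] {
        0 => false,
        1 => true,
        other => return Err(format!("invalid bool byte {}", other)),
    };
    if !bytes.is_empty() {
        return Err(format!("{} trailing bytes", bytes.len()));
    }
    Ok(BalanceEvent { balance_manager_id, asset, amount, deposit })
}
```

The whole event is 32 + 32 + 8 + 1 = 73 bytes. Swapping `balance_manager_id` and `asset` in the Rust struct would still decode without error, but it would silently assign each ID to the wrong field.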

Step-by-step deserialization

  1. Add BCS dependency.

    [dependencies]
    bcs = "0.1.6"
    serde = { version = "1.0", features = ["derive"] }
  2. Define the Event struct in Rust.

    Define the same structure in Rust as declared in Move. You can do this manually or use move-binding to auto-generate it from on-chain packages.

    use serde::Deserialize;
    use sui_indexer_alt_framework::types::base_types::ObjectID;

    #[derive(Deserialize, Debug)]
    struct BalanceEvent {
        balance_manager_id: ObjectID,
        asset: ObjectID,
        amount: u64,
        deposit: bool,
    }
    Important: field order and types must match the Move event exactly.

  3. Extract event bytes in your processor.

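    use std::sync::Arc;
    use sui_indexer_alt_framework::{
        pipeline::Processor,
        types::full_checkpoint_content::Checkpoint,
    };

    #[async_trait::async_trait]
    impl Processor for YourHandler {
        // `const NAME` and `type Value` omitted for brevity.

        async fn process(&self, checkpoint: &Arc<Checkpoint>) -> anyhow::Result<Vec<Self::Value>> {
            let mut results = Vec::new();

            for transaction in &checkpoint.transactions {
                // Transactions that emitted no events have `events: None`.
                let Some(events) = &transaction.events else {
                    continue;
                };

                for event in &events.data {
                    // Get the raw BCS bytes
                    let event_bytes = &event.contents;

                    // Deserialize to your Rust struct
                    if let Ok(balance_event) = bcs::from_bytes::<BalanceEvent>(event_bytes) {
                        // Convert `balance_event` into `Self::Value` and push it onto `results`.
                    }
                }
            }

            Ok(results)
        }
    }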
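`bcs::from_bytes` only checks that the bytes fit the target layout. An unrelated event with the same shape would therefore also decode as a `BalanceEvent`, so it helps to filter on the event's type before deserializing. In Sui's types, an event carries its Move struct type (a `StructTag` in `event.type_`), whose fields can be compared directly. The std-only sketch below performs the same check on the type's string form and assumes addresses are already normalized to a single format. `is_event_type`, the `0xabc` package address, and the `balance_manager` module name are hypothetical.

```rust
/// Returns true if `type_str` (for example `0xabc::balance_manager::BalanceEvent`)
/// names struct `name` in `module` of `package`. Generic parameters such as
/// `<0x2::sui::SUI>` are ignored.
fn is_event_type(type_str: &str, package: &str, module: &str, name: &str) -> bool {
    // Drop generic parameters before splitting on `::`.
    let base = type_str.split('<').next().unwrap_or(type_str);
    let mut parts = base.splitn(3, "::");
    match (parts.next(), parts.next(), parts.next()) {
        (Some(p), Some(m), Some(n)) => p == package && m == module && n == name,
        _ => false,
    }
}
```

In the processor above, this check would sit immediately before the `bcs::from_bytes` call. Checking the package as well as the module and struct name matters because anyone can publish a module with the same names.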