Indexer Data and Integration

Building a custom indexer on Sui lets you take full control of data ingestion, storage, and processing. You can choose from multiple checkpoint data sources, such as a remote store, local files, or direct RPC API access, depending on whether you're indexing Mainnet data, testing against known checkpoints, or working on a local network.

For storage, you can either use the built-in IndexerCluster with PostgreSQL or implement the Store and Connection traits yourself to integrate a different database or storage backend. Once the store is connected, you can wire up a manual indexer, add your custom pipelines, and handle watermark coordination to keep data in sync.

Finally, you need to deserialize Move events from raw BCS bytes into Rust structs, using bcs and serde, so that your pipelines can work with strongly-typed data. This gives you a reproducible, end-to-end setup that you can tune for performance, reliability, and custom analytics.

Checkpoint data sources

The sui-indexer-alt-framework supports three data sources for accessing Sui blockchain data. For every data source, the framework polls for new checkpoints. You configure the data source through command-line arguments.

Remote store

The most direct source is a remote checkpoint store, which the framework polls for new checkpoints. Sui provides the following endpoints:

  • Testnet: https://checkpoints.testnet.sui.io
  • Mainnet: https://checkpoints.mainnet.sui.io

$ cargo run -- --remote-store-url https://checkpoints.testnet.sui.io

Local path

Local checkpoint files are useful for development and testing scenarios:

$ cargo run -- --local-ingestion-path /path/to/checkpoints

Common use cases:

  • Unit and integration testing with known checkpoint data
  • Development workflows with reproducible datasets

RPC API

RPC API access queries full nodes directly using gRPC:

$ cargo run -- --rpc-api-url https://fullnode.testnet.sui.io:443

Endpoint format: https://fullnode.NETWORK.sui.io:443 where NETWORK is one of the available networks:

  • testnet
  • devnet
  • mainnet

When to use RPC API:

  • Networks without an official remote store (devnet, localnet, custom networks)
  • Development against local Sui networks
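
As a rough illustration of how the three sources relate, the sketch below models them as an enum and maps each variant to the command-line flag shown above. `DataSource`, `to_args`, and `fullnode_url` are hypothetical names for this example and are not part of sui-indexer-alt-framework.

```rust
use std::path::PathBuf;

/// Hypothetical model of the three checkpoint sources (not a framework type).
#[derive(Debug, Clone, PartialEq)]
enum DataSource {
    /// Remote checkpoint store, for example https://checkpoints.testnet.sui.io
    RemoteStore(String),
    /// Directory that contains checkpoint files
    LocalPath(PathBuf),
    /// Full node gRPC endpoint, for example https://fullnode.testnet.sui.io:443
    RpcApi(String),
}

impl DataSource {
    /// Build the flag and value pair that follows `cargo run --`.
    fn to_args(&self) -> Vec<String> {
        match self {
            DataSource::RemoteStore(url) => {
                vec!["--remote-store-url".to_string(), url.clone()]
            }
            DataSource::LocalPath(path) => vec![
                "--local-ingestion-path".to_string(),
                path.display().to_string(),
            ],
            DataSource::RpcApi(url) => vec!["--rpc-api-url".to_string(), url.clone()],
        }
    }
}

/// Build the full node endpoint for a named network, following the documented format.
fn fullnode_url(network: &str) -> String {
    format!("https://fullnode.{}.sui.io:443", network)
}
```

For example, `DataSource::RpcApi(fullnode_url("devnet")).to_args()` yields the `--rpc-api-url` flag followed by the devnet endpoint. Only one source is selected per run in this sketch.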

Bring your own store (BYOS)

The IndexerCluster provides a convenient way to get started with PostgreSQL, but you might want to use a different database or storage system. To do so, use the manual Indexer type and implement the Store and Connection traits from sui-indexer-alt-framework-store-traits.

lib.rs in sui-indexer-alt-framework-store-traits

/// Represents a database connection that can be used by the indexer framework to manage watermark
/// operations, agnostic of the underlying store implementation.
#[async_trait]
pub trait Connection: Send {
    /// Given a pipeline, return the committer watermark from the `Store`. This is used by the
    /// indexer on startup to determine which checkpoint to resume processing from.
    async fn committer_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<CommitterWatermark>>;

    /// Given a pipeline, return the reader watermark from the database. This is used by the indexer
    /// to determine the new `reader_lo` or inclusive lower bound of available data.
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>>;

    /// Get the bounds for the region that the pruner is allowed to prune, and the time in
    /// milliseconds the pruner must wait before it can begin pruning data for the given `pipeline`.
    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
    /// `reader_lo` (exclusive) after waiting until `pruner_timestamp + delay` has passed. This
    /// minimizes the possibility for the pruner to delete data still expected by inflight read
    /// requests.
    async fn pruner_watermark(
        &mut self,
        pipeline: &'static str,
        delay: Duration,
    ) -> anyhow::Result<Option<PrunerWatermark>>;

    /// Upsert the high watermark as long as it raises the watermark stored in the database. Returns
    /// a boolean indicating whether the watermark was actually updated or not.
    async fn set_committer_watermark(
        &mut self,
        pipeline: &'static str,
        watermark: CommitterWatermark,
    ) -> anyhow::Result<bool>;

    /// Update the `reader_lo` of an existing watermark entry only if it raises `reader_lo`. Readers
    /// will reference this as the inclusive lower bound of available data for the corresponding
    /// pipeline.
    ///
    /// If an update is to be made, some timestamp (i.e `pruner_timestamp`) should also be set on
    /// the watermark entry to the current time. Ideally, this would be from the perspective of the
    /// store. If this is not possible, then it should come from some other common source of time
    /// between the indexer and its readers. This timestamp is critical to the indexer's operations,
    /// as it determines when the pruner can safely begin pruning data. When `pruner_watermark` is
    /// called by the indexer, it will retrieve this timestamp to determine how much longer to wait
    /// before beginning to prune.
    ///
    /// Returns a boolean indicating whether the watermark was actually updated or not.
    async fn set_reader_watermark(
        &mut self,
        pipeline: &'static str,
        reader_lo: u64,
    ) -> anyhow::Result<bool>;

    /// Update the pruner watermark, returns true if the watermark was actually updated
    async fn set_pruner_watermark(
        &mut self,
        pipeline: &'static str,
        pruner_hi: u64,
    ) -> anyhow::Result<bool>;
}

/// A storage-agnostic interface that provides database connections for both watermark management
/// and arbitrary writes. The indexer framework accepts this `Store` implementation to manage
/// watermarks operations through its associated `Connection` type. This store is also passed to the
/// pipeline handlers to perform arbitrary writes to the store.
#[async_trait]
pub trait Store: Send + Sync + 'static + Clone {
    type Connection<'c>: Connection
    where
        Self: 'c;

    async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>, anyhow::Error>;
}

When to use BYOS:

  • Different database: MongoDB, CouchDB, or other non-PostgreSQL databases. This also applies if you prefer to use PostgreSQL but without the default Diesel ORM.
  • Custom requirements: Specialized storage logic, partitioning, or performance optimizations.

Core implementation requirements

To implement BYOS, you need to:

  1. Define the structs for your store and its connections.
  2. Implement the Store trait for connection management.
  3. Implement the Connection trait for watermark operations.
  4. Use manual Indexer instead of IndexerCluster.

Step 1: Define your store structure

use sui_indexer_alt_framework::store::{Store, Connection};
use async_trait::async_trait;

// `MyDatabasePool`, `MyConfig`, and `MyDatabaseConnection` are placeholders
// for the types provided by your database client.

#[derive(Clone)]
pub struct MyCustomStore {
    // Your database connection details
    connection_pool: MyDatabasePool,
    config: MyConfig,
}

pub struct MyCustomConnection<'a> {
    // A connection instance
    conn: MyDatabaseConnection<'a>,
}

Step 2: Implement the Store trait

The Store trait manages connection lifecycle:

#[async_trait]
impl Store for MyCustomStore {
    type Connection<'c> = MyCustomConnection<'c>;

    async fn connect<'c>(&'c self) -> anyhow::Result<Self::Connection<'c>> {
        // Check out or create a connection from your pool here
        todo!("Implement based on your storage system")
    }
}

Step 3: Implement the Connection trait

The Connection trait handles watermark operations for pipeline coordination:

#[async_trait]
impl Connection for MyCustomConnection<'_> {
    // Get the highest checkpoint processed by a pipeline
    async fn committer_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<CommitterWatermark>> {
        // Query your database for watermark data
        todo!("Implement based on your storage system")
    }

    // Get the lowest available checkpoint for readers
    async fn reader_watermark(
        &mut self,
        pipeline: &'static str,
    ) -> anyhow::Result<Option<ReaderWatermark>> {
        // Implementation depends on your database schema
        todo!("Implement based on your storage system")
    }

    // Implement the remaining required methods: pruner_watermark,
    // set_committer_watermark, set_reader_watermark, and set_pruner_watermark.
}
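
Before wiring these methods to a real database, it can help to see the update rules from the trait documentation in isolation. The following is a minimal in-memory sketch, assuming a single process and no persistence. `Row`, `MemoryWatermarks`, and `prunable_range` are hypothetical names, the methods are synchronous for brevity, the committer watermark is reduced to its checkpoint number, and the `pruner_timestamp` handling is left out.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::ops::Range;

/// Hypothetical, simplified watermark row (not a framework type).
#[derive(Debug, Default)]
struct Row {
    checkpoint_hi_inclusive: u64,
    reader_lo: u64,
    pruner_hi: u64,
}

/// Hypothetical in-memory stand-in for a watermarks table.
#[derive(Debug, Default)]
struct MemoryWatermarks {
    rows: HashMap<&'static str, Row>,
}

impl MemoryWatermarks {
    /// Upsert: create the row, or raise `checkpoint_hi_inclusive`. Never lowers it.
    fn set_committer_watermark(&mut self, pipeline: &'static str, checkpoint_hi: u64) -> bool {
        match self.rows.entry(pipeline) {
            Entry::Vacant(slot) => {
                slot.insert(Row {
                    checkpoint_hi_inclusive: checkpoint_hi,
                    ..Row::default()
                });
                true
            }
            Entry::Occupied(mut slot) => {
                let row = slot.get_mut();
                if row.checkpoint_hi_inclusive < checkpoint_hi {
                    row.checkpoint_hi_inclusive = checkpoint_hi;
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Update only: raise `reader_lo` on an existing row.
    fn set_reader_watermark(&mut self, pipeline: &'static str, reader_lo: u64) -> bool {
        if let Some(row) = self.rows.get_mut(pipeline) {
            if row.reader_lo < reader_lo {
                row.reader_lo = reader_lo;
                return true;
            }
        }
        false
    }

    /// Update only: record how far the pruner has progressed.
    fn set_pruner_watermark(&mut self, pipeline: &'static str, pruner_hi: u64) -> bool {
        match self.rows.get_mut(pipeline) {
            Some(row) => {
                row.pruner_hi = pruner_hi;
                true
            }
            None => false,
        }
    }

    /// Region the pruner may delete: `pruner_hi` (inclusive) to `reader_lo` (exclusive).
    fn prunable_range(&self, pipeline: &'static str) -> Option<Range<u64>> {
        self.rows.get(pipeline).map(|row| row.pruner_hi..row.reader_lo)
    }
}
```

The boolean return values matter: each setter reports whether the stored value actually changed, which is what the trait documentation asks for. A real implementation would also need to make each check-and-update atomic in the database.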

For a complete reference, study the sui-pg-db implementation of Connection:

#[async_trait]
impl store::Connection for Connection<'_> {
async fn committer_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<store::CommitterWatermark>> {
let watermark: Option<(i64, i64, i64, i64)> = watermarks::table
.select((
watermarks::epoch_hi_inclusive,
watermarks::checkpoint_hi_inclusive,
watermarks::tx_hi,
watermarks::timestamp_ms_hi_inclusive,
))
.filter(watermarks::pipeline.eq(pipeline))
.first(self)
.await
.optional()?;

if let Some(watermark) = watermark {
Ok(Some(store::CommitterWatermark {
epoch_hi_inclusive: watermark.0 as u64,
checkpoint_hi_inclusive: watermark.1 as u64,
tx_hi: watermark.2 as u64,
timestamp_ms_hi_inclusive: watermark.3 as u64,
}))
} else {
Ok(None)
}
}

async fn reader_watermark(
&mut self,
pipeline: &'static str,
) -> anyhow::Result<Option<store::ReaderWatermark>> {
let watermark: Option<(i64, i64)> = watermarks::table
.select((watermarks::checkpoint_hi_inclusive, watermarks::reader_lo))
.filter(watermarks::pipeline.eq(pipeline))
.first(self)
.await
.optional()?;

if let Some(watermark) = watermark {
Ok(Some(store::ReaderWatermark {
checkpoint_hi_inclusive: watermark.0 as u64,
reader_lo: watermark.1 as u64,
}))
} else {
Ok(None)
}
}

async fn pruner_watermark(
&mut self,
pipeline: &'static str,
delay: Duration,
) -> anyhow::Result<Option<store::PrunerWatermark>> {
// |---------- + delay ---------------------|
//                         |--- wait_for ---|
// |-----------------------|----------------|
// ^                       ^
// pruner_timestamp        NOW()
let wait_for = sql!(as BigInt,
"CAST({BigInt} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
delay.as_millis() as i64,
);

let watermark: Option<(i64, i64, i64)> = watermarks::table
.select((wait_for, watermarks::pruner_hi, watermarks::reader_lo))
.filter(watermarks::pipeline.eq(pipeline))
.first(self)
.await
.optional()?;

if let Some(watermark) = watermark {
Ok(Some(store::PrunerWatermark {
wait_for_ms: watermark.0,
pruner_hi: watermark.1 as u64,
reader_lo: watermark.2 as u64,
}))
} else {
Ok(None)
}
}

async fn set_committer_watermark(
&mut self,
pipeline: &'static str,
watermark: store::CommitterWatermark,
) -> anyhow::Result<bool> {
// Create a StoredWatermark directly from CommitterWatermark
let stored_watermark = StoredWatermark {
pipeline: pipeline.to_string(),
epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
tx_hi: watermark.tx_hi as i64,
timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive as i64,
reader_lo: 0,
pruner_timestamp: NaiveDateTime::UNIX_EPOCH,
pruner_hi: 0,
};

use diesel::query_dsl::methods::FilterDsl;
Ok(diesel::insert_into(watermarks::table)
.values(&stored_watermark)
// There is an existing entry, so only write the new `hi` values
.on_conflict(watermarks::pipeline)
.do_update()
.set((
watermarks::epoch_hi_inclusive.eq(stored_watermark.epoch_hi_inclusive),
watermarks::checkpoint_hi_inclusive.eq(stored_watermark.checkpoint_hi_inclusive),
watermarks::tx_hi.eq(stored_watermark.tx_hi),
watermarks::timestamp_ms_hi_inclusive
.eq(stored_watermark.timestamp_ms_hi_inclusive),
))
.filter(
watermarks::checkpoint_hi_inclusive.lt(stored_watermark.checkpoint_hi_inclusive),
)
.execute(self)
.await?
> 0)
}

async fn set_reader_watermark(
&mut self,
pipeline: &'static str,
reader_lo: u64,
) -> anyhow::Result<bool> {
Ok(diesel::update(watermarks::table)
.set((
watermarks::reader_lo.eq(reader_lo as i64),
watermarks::pruner_timestamp.eq(diesel::dsl::now),
))
.filter(watermarks::pipeline.eq(pipeline))
.filter(watermarks::reader_lo.lt(reader_lo as i64))
.execute(self)
.await?
> 0)
}

async fn set_pruner_watermark(
&mut self,
pipeline: &'static str,
pruner_hi: u64,
) -> anyhow::Result<bool> {
Ok(diesel::update(watermarks::table)
.set(watermarks::pruner_hi.eq(pruner_hi as i64))
.filter(watermarks::pipeline.eq(pipeline))
.execute(self)
.await?
> 0)
}
}
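
The `wait_for` expression in `pruner_watermark` is the least obvious part of the reference implementation. As a plain Rust restatement of the same arithmetic, the sketch below takes both timestamps as explicit millisecond values. `wait_for_ms` is a hypothetical helper, not a framework function. The SQL version evaluates the expression inside the database so that a single clock is used.

```rust
use std::time::Duration;

/// Milliseconds the pruner still has to wait: `pruner_timestamp + delay - now`.
/// A result of zero or less means the delay has already passed.
/// Hypothetical helper for illustration only.
fn wait_for_ms(pruner_timestamp_ms: i64, now_ms: i64, delay: Duration) -> i64 {
    delay.as_millis() as i64 + (pruner_timestamp_ms - now_ms)
}
```

With a delay of one second and a `pruner_timestamp` set 400 ms ago, the helper returns 600. Once more than a second has elapsed, the result is zero or negative.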

Step 4: Use manual indexer

Replace IndexerCluster with manual Indexer:

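use sui_indexer_alt_framework::{Indexer, IndexerArgs};
use sui_indexer_alt_framework::ingestion::{ClientArgs, IngestionConfig};
use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Initialize your custom store (`config` is a placeholder for your own settings)
    let store = MyCustomStore::new(config).await?;

    // Configure indexer manually. The exact argument list of `Indexer::new` depends on
    // the framework version; the ClickHouse example on this page also passes a metrics prefix.
    let mut indexer = Indexer::new(
        store,
        IndexerArgs::default(),
        ClientArgs {
            remote_store_url: Some("https://checkpoints.testnet.sui.io".to_string()),
            local_ingestion_path: None,
            rpc_api_url: None,
            rpc_username: None,
            rpc_password: None,
        },
        IngestionConfig::default(),
        &prometheus::Registry::new(),
        tokio_util::sync::CancellationToken::new(),
    ).await?;

    // Add your pipelines (`YourHandler` is a placeholder for your own handler type)
    indexer.concurrent_pipeline(
        YourHandler::default(),
        ConcurrentConfig::default(),
    ).await?;

    // Start the indexer, then wait for the returned handle until the indexer stops
    let handle = indexer.run().await?;
    handle.await?;
    Ok(())
}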

Example: ClickHouse implementation

For a complete working example of BYOS with ClickHouse (a high-performance columnar database for analytics), see the example project in the Sui repo.

ClickHouse example README

ClickHouse Sui Indexer

A simple example of how to build a custom Sui indexer that writes transaction data to ClickHouse.

Quick Start

1. Start ClickHouse

docker run -d --name clickhouse-dev -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse/clickhouse-server

2. Set up database user

docker exec clickhouse-dev clickhouse-client --query "CREATE USER IF NOT EXISTS dev IDENTIFIED WITH no_password"
docker exec clickhouse-dev clickhouse-client --query "GRANT CREATE, INSERT, SELECT, ALTER, UPDATE, DELETE ON *.* TO dev"

3. Run the indexer

cargo run -- --remote-store-url https://checkpoints.testnet.sui.io --last-checkpoint=10

That’s it! The indexer will:

  • Create the necessary tables automatically
  • Fetch checkpoints from the Sui testnet
  • Write transaction data to ClickHouse

Verify Data

Check that data was written:

docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT COUNT(*) FROM transactions"
docker exec clickhouse-dev clickhouse-client --user=dev --query "SELECT * FROM transactions LIMIT 5"

Clean Up

Stop and remove the ClickHouse container:

docker stop clickhouse-dev && docker rm clickhouse-dev

What This Example Shows

  • Custom Store Implementation: How to implement the Store trait for ClickHouse
  • Concurrent Pipeline: Uses the concurrent pipeline for better pruning and watermark testing
  • Watermark Management: Tracking indexer progress with committer, reader, and pruner watermarks
  • Transaction Processing: Extracting and storing transaction digests from checkpoints
  • Simple Setup: Minimal configuration for local development

Architecture

Sui Network → Checkpoints → Concurrent Pipeline → ClickHouse Store → ClickHouse DB

The indexer uses a concurrent pipeline that processes checkpoints out-of-order with separate reader, committer, and pruner components. This is ideal for testing watermark functionality and pruning behavior.
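
To make the interaction between out-of-order processing and a high watermark concrete, the sketch below shows one common way to track the highest checkpoint below which everything has finished. It is an illustration only and is not the framework's committer code. `ContiguousTracker` is a hypothetical name.

```rust
use std::collections::BTreeSet;

/// Hypothetical tracker for a contiguous high watermark (illustration only).
struct ContiguousTracker {
    /// Next checkpoint the watermark is waiting for.
    next: u64,
    /// Checkpoints that finished ahead of `next`.
    pending: BTreeSet<u64>,
}

impl ContiguousTracker {
    fn new(first_checkpoint: u64) -> Self {
        Self {
            next: first_checkpoint,
            pending: BTreeSet::new(),
        }
    }

    /// Record a finished checkpoint. Returns the new inclusive high watermark
    /// if it moved, or `None` if there is still a gap below `checkpoint`.
    fn complete(&mut self, checkpoint: u64) -> Option<u64> {
        if checkpoint < self.next {
            // Already covered by the watermark
            return None;
        }
        self.pending.insert(checkpoint);

        let before = self.next;
        // Advance over every checkpoint that is now contiguous
        while self.pending.remove(&self.next) {
            self.next += 1;
        }

        if self.next > before {
            Some(self.next - 1)
        } else {
            None
        }
    }
}
```

If checkpoint 2 finishes before checkpoints 0 and 1, the watermark stays put until the gap closes, then jumps straight to 2. This is why a watermark can lag behind the newest data that has been written.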

This example demonstrates:

  • Custom store implementation using the ClickHouse Rust client.
  • Watermark persistence with ClickHouse-specific SQL syntax.
  • Transaction digest indexing similar to the built-in PostgreSQL handler.

The example includes three main components:

  1. store.rs - ClickHouseStore implementing Store and Connection traits.

    store.rs

    use anyhow::Result;
    use async_trait::async_trait;
    use chrono::Utc;
    use clickhouse::{Client, Row};
    use scoped_futures::ScopedBoxFuture;
    use serde::{Deserialize, Serialize};
    use std::time::Duration;
    use sui_indexer_alt_framework_store_traits::{
    CommitterWatermark, Connection, PrunerWatermark, ReaderWatermark, Store, TransactionalStore,
    };
    use url::Url;

    #[derive(Clone)]
    pub struct ClickHouseStore {
    client: Client,
    }

    pub struct ClickHouseConnection {
    pub client: Client,
    }

    /// Row structure for watermark table operations
    #[derive(Row, Serialize, Deserialize, Debug, Default)]
    struct WatermarkRow {
    pipeline: String,
    epoch_hi_inclusive: u64,
    checkpoint_hi_inclusive: u64,
    tx_hi: u64,
    timestamp_ms_hi_inclusive: u64,
    reader_lo: u64,
    pruner_hi: u64,
    pruner_timestamp: u64, // Unix timestamp in milliseconds
    }

    impl ClickHouseStore {
    pub fn new(url: Url) -> Self {
    let client = Client::default()
    .with_url(url.as_str())
    .with_user("dev") // Simple user for local development
    .with_compression(clickhouse::Compression::Lz4);
    Self { client }
    }

    /// Create tables if they don't exist
    pub async fn create_tables_if_not_exists(&self) -> Result<()> {
    // Create watermarks table for pipeline state management
    self.client
    .query(
    "
    CREATE TABLE IF NOT EXISTS watermarks
    (
    pipeline String,
    epoch_hi_inclusive UInt64,
    checkpoint_hi_inclusive UInt64,
    tx_hi UInt64,
    timestamp_ms_hi_inclusive UInt64,
    reader_lo UInt64,
    pruner_hi UInt64,
    pruner_timestamp UInt64
    )
    ENGINE = MergeTree()
    ORDER BY pipeline
    ",
    )
    .execute()
    .await?;

    // Create transactions table for the actual indexing data
    self.client
    .query(
    "
    CREATE TABLE IF NOT EXISTS transactions
    (
    checkpoint_sequence_number UInt64,
    transaction_digest String,
    indexed_at DateTime64(3, 'UTC') DEFAULT now()
    )
    ENGINE = MergeTree()
    ORDER BY checkpoint_sequence_number
    ",
    )
    .execute()
    .await?;

    Ok(())
    }
    }

    #[async_trait]
    impl Store for ClickHouseStore {
    type Connection<'c> = ClickHouseConnection;

    async fn connect<'c>(&'c self) -> Result<Self::Connection<'c>> {
    Ok(ClickHouseConnection {
    client: self.client.clone(),
    })
    }
    }

    #[async_trait]
    impl TransactionalStore for ClickHouseStore {
    async fn transaction<'a, R, F>(&self, f: F) -> anyhow::Result<R>
    where
    R: Send + 'a,
    F: Send + 'a,
    F: for<'r> FnOnce(
    &'r mut Self::Connection<'_>,
    ) -> ScopedBoxFuture<'a, 'r, anyhow::Result<R>>,
    {
    let mut conn = self.connect().await?;
    f(&mut conn).await
    }
    }

    #[async_trait]
    impl Connection for ClickHouseConnection {
    async fn committer_watermark(
    &mut self,
    pipeline: &'static str,
    ) -> Result<Option<CommitterWatermark>> {
    let mut cursor = self
    .client
    .query(
    "SELECT epoch_hi_inclusive, checkpoint_hi_inclusive, tx_hi, timestamp_ms_hi_inclusive
    FROM watermarks
    WHERE pipeline = ?
    ORDER BY pruner_timestamp DESC
    LIMIT 1"
    )
    .bind(pipeline)
    .fetch::<(u64, u64, u64, u64)>()?;

    let row: Option<(u64, u64, u64, u64)> = cursor.next().await?;
    Ok(row.map(
    |(epoch_hi, checkpoint_hi, tx_hi, timestamp_hi)| CommitterWatermark {
    epoch_hi_inclusive: epoch_hi,
    checkpoint_hi_inclusive: checkpoint_hi,
    tx_hi,
    timestamp_ms_hi_inclusive: timestamp_hi,
    },
    ))
    }

    async fn reader_watermark(
    &mut self,
    pipeline: &'static str,
    ) -> Result<Option<ReaderWatermark>> {
    let mut cursor = self
    .client
    .query(
    "SELECT checkpoint_hi_inclusive, reader_lo
    FROM watermarks
    WHERE pipeline = ?
    ORDER BY pruner_timestamp DESC
    LIMIT 1",
    )
    .bind(pipeline)
    .fetch::<(u64, u64)>()?;

    let row: Option<(u64, u64)> = cursor.next().await?;
    Ok(row.map(|(checkpoint_hi, reader_lo)| ReaderWatermark {
    checkpoint_hi_inclusive: checkpoint_hi,
    reader_lo,
    }))
    }

    async fn pruner_watermark(
    &mut self,
    pipeline: &'static str,
    delay: Duration,
    ) -> Result<Option<PrunerWatermark>> {
    // Follow PostgreSQL pattern: calculate wait_for_ms on database side
    // We do this so that we can rely on the database to keep a consistent sense of time.
    // Using own clocks can potentially be subject to some clock skew.
    let delay_ms = delay.as_millis() as i64;
    let mut cursor = self
    .client
    .query(
    "SELECT reader_lo, pruner_hi,
    toInt64(? + (pruner_timestamp - toUnixTimestamp64Milli(now64()))) as wait_for_ms
    FROM watermarks
    WHERE pipeline = ?
    ORDER BY pruner_timestamp DESC
    LIMIT 1"
    )
    .bind(delay_ms)
    .bind(pipeline)
    .fetch::<(u64, u64, i64)>()?;

    let row: Option<(u64, u64, i64)> = cursor.next().await?;
    Ok(
    row.map(|(reader_lo, pruner_hi, wait_for_ms)| PrunerWatermark {
    wait_for_ms,
    reader_lo,
    pruner_hi,
    }),
    )
    }

    async fn set_committer_watermark(
    &mut self,
    pipeline: &'static str,
    watermark: CommitterWatermark,
    ) -> Result<bool> {
    // Follow PostgreSQL pattern: check if row exists, then UPDATE or INSERT accordingly

    // First check if pipeline exists and get current checkpoint
    let mut cursor = self
    .client
    .query("SELECT checkpoint_hi_inclusive FROM watermarks WHERE pipeline = ? LIMIT 1")
    .bind(pipeline)
    .fetch::<u64>()?;

    let existing_checkpoint: Option<u64> = cursor.next().await?;

    if let Some(existing_checkpoint) = existing_checkpoint {
    // Row exists - only update if checkpoint advances
    if existing_checkpoint < watermark.checkpoint_hi_inclusive {
    self.client
    .query(
    "ALTER TABLE watermarks
    UPDATE
    epoch_hi_inclusive = ?,
    checkpoint_hi_inclusive = ?,
    tx_hi = ?,
    timestamp_ms_hi_inclusive = ?
    WHERE pipeline = ?",
    )
    .bind(watermark.epoch_hi_inclusive)
    .bind(watermark.checkpoint_hi_inclusive)
    .bind(watermark.tx_hi)
    .bind(watermark.timestamp_ms_hi_inclusive)
    .bind(pipeline)
    .execute()
    .await?;
    }
    } else {
    // No existing row - insert new one
    let mut inserter = self.client.inserter("watermarks")?;
    inserter.write(&WatermarkRow {
    pipeline: pipeline.to_string(),
    epoch_hi_inclusive: watermark.epoch_hi_inclusive,
    checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive,
    tx_hi: watermark.tx_hi,
    timestamp_ms_hi_inclusive: watermark.timestamp_ms_hi_inclusive,
    reader_lo: 0, // Will be updated by reader
    pruner_hi: 0, // Will be updated by pruner
    pruner_timestamp: Utc::now().timestamp_millis() as u64,
    })?;
    inserter.end().await?;
    }

    Ok(true)
    }

    async fn set_reader_watermark(
    &mut self,
    pipeline: &'static str,
    reader_lo: u64,
    ) -> Result<bool> {
    // Follow PostgreSQL pattern: simple UPDATE with timestamp update and advancement check
    self.client
    .query(
    "ALTER TABLE watermarks
    UPDATE reader_lo = ?, pruner_timestamp = toUnixTimestamp64Milli(now64())
    WHERE pipeline = ? AND reader_lo < ?",
    )
    .bind(reader_lo)
    .bind(pipeline)
    .bind(reader_lo)
    .execute()
    .await?;

    Ok(true)
    }

    async fn set_pruner_watermark(
    &mut self,
    pipeline: &'static str,
    pruner_hi: u64,
    ) -> Result<bool> {
    // Follow PostgreSQL pattern: simple UPDATE statement
    self.client
    .query(
    "ALTER TABLE watermarks
    UPDATE pruner_hi = ?
    WHERE pipeline = ?",
    )
    .bind(pruner_hi)
    .bind(pipeline)
    .execute()
    .await?;

    Ok(true)
    }
    }
  2. handlers.rs - TxDigest handler processing checkpoint data.

    handlers.rs

    use anyhow::Result;
    use async_trait::async_trait;
    use clickhouse::Row;
    use serde::Serialize;
    use std::sync::Arc;

    use sui_indexer_alt_framework::{
    pipeline::{concurrent::Handler, Processor},
    FieldCount,
    };
    use sui_indexer_alt_framework_store_traits::Store;
    use sui_types::full_checkpoint_content::CheckpointData;

    use crate::store::ClickHouseStore;

    /// Structure representing a transaction digest record in ClickHouse
    /// Aligned with sui-indexer-alt's StoredTxDigest structure
    #[derive(Row, Serialize, Clone, Debug, FieldCount)]
    pub struct StoredTxDigest {
    pub tx_sequence_number: i64,
    pub tx_digest: Vec<u8>,
    }

    /// Handler that processes checkpoint data and extracts transaction digests
    /// Named to align with sui-indexer-alt's TxDigests handler
    #[derive(Clone, Default)]
    pub struct TxDigests;

    impl Processor for TxDigests {
    const NAME: &'static str = "tx_digests";
    type Value = StoredTxDigest;

    fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
    let CheckpointData {
    transactions,
    checkpoint_summary,
    ..
    } = checkpoint.as_ref();

    let first_tx = checkpoint_summary.network_total_transactions as usize - transactions.len();

    Ok(transactions
    .iter()
    .enumerate()
    .map(|(i, tx)| StoredTxDigest {
    tx_sequence_number: (first_tx + i) as i64,
    tx_digest: tx.transaction.digest().inner().to_vec(),
    })
    .collect())
    }
    }

    #[async_trait]
    impl Handler for TxDigests {
    type Store = ClickHouseStore;

    async fn commit<'a>(
    values: &[Self::Value],
    conn: &mut <Self::Store as Store>::Connection<'a>,
    ) -> Result<usize> {
    let row_count = values.len();
    if row_count == 0 {
    return Ok(0);
    }

    // Use ClickHouse inserter for efficient bulk inserts
    let mut inserter = conn.client.inserter("tx_digests")?;
    for tx_digest in values {
    inserter.write(tx_digest)?;
    }
    inserter.end().await?;

    Ok(row_count)
    }
    }
  3. main.rs - Manual indexer setup with ClickHouse backend.

    main.rs

    mod handlers;
    mod store;

    use anyhow::Result;
    use clap::Parser;
    use sui_indexer_alt_framework::{
    ingestion::{ClientArgs, IngestionConfig},
    pipeline::concurrent::ConcurrentConfig,
    Indexer, IndexerArgs,
    };
    use url::Url;

    use handlers::TxDigests;
    use store::ClickHouseStore;

    #[derive(clap::Parser, Debug, Clone)]
    struct Args {
    #[clap(flatten)]
    pub indexer_args: IndexerArgs,

    #[clap(flatten)]
    pub client_args: ClientArgs,
    }

    #[tokio::main]
    async fn main() -> Result<()> {
    // Initialize crypto provider for HTTPS connections (needed for remote checkpoint fetching)
    rustls::crypto::ring::default_provider()
    .install_default()
    .expect("Failed to install crypto provider");

    // Parse command-line arguments
    let args = Args::parse();

    // ClickHouse connection (uses 'dev' user by default for local development)
    let clickhouse_url = "http://localhost:8123".parse::<Url>()?;

    println!("Connecting to ClickHouse at: {}", clickhouse_url);

    // Create our custom ClickHouse store
    let store = ClickHouseStore::new(clickhouse_url);

    // Ensure the database tables are created before starting the indexer
    store.create_tables_if_not_exists().await?;

    // Manually build the indexer with our custom ClickHouse store
    // This is the key difference from basic-sui-indexer which uses IndexerCluster::builder()
    let mut indexer = Indexer::new(
    store.clone(),
    args.indexer_args,
    args.client_args,
    IngestionConfig::default(),
    None, // No metrics prefix
    &Default::default(), // Empty prometheus registry
    tokio_util::sync::CancellationToken::new(),
    )
    .await?;

    // Register our concurrent pipeline handler (better for testing pruning)
    // This processes checkpoints with separate reader and pruner components
    indexer
    .concurrent_pipeline(
    TxDigests,
    // ConcurrentConfig default comes with no pruning.
    ConcurrentConfig::default(),
    )
    .await?;

    println!("Starting ClickHouse Sui indexer...");

    // Start the indexer and wait for it to complete
    let handle = indexer.run().await?;

    // This will run until the indexer is stopped (e.g., by Ctrl+C)
    handle.await?;

    println!("Indexer stopped");
    Ok(())
    }
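
The `process` function in handlers.rs derives each transaction's sequence number from the checkpoint summary: `network_total_transactions` counts all transactions up to and including the checkpoint, so subtracting the number of transactions in the checkpoint gives the sequence number of its first transaction. The standalone sketch below restates that arithmetic with plain integers. `tx_sequence_numbers` is a hypothetical helper and is not part of the example project.

```rust
use std::ops::Range;

/// Sequence numbers of the transactions in one checkpoint.
/// Returns `None` if the inputs are inconsistent (more transactions in the
/// checkpoint than on the whole network). Hypothetical helper for illustration.
fn tx_sequence_numbers(network_total_transactions: u64, tx_count: usize) -> Option<Range<u64>> {
    let first_tx = network_total_transactions.checked_sub(tx_count as u64)?;
    Some(first_tx..network_total_transactions)
}
```

For a checkpoint with 3 transactions and a network total of 1,000, the sequence numbers are 997, 998, and 999. Unlike the handler, this sketch uses `checked_sub` so that inconsistent inputs produce `None` instead of an arithmetic overflow.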

Deserializing Move events

When Move smart contracts execute on Sui, they can emit events using the sui::event module. These events are stored in checkpoints as BCS-serialized bytes that your indexer needs to deserialize to extract meaningful data.

Why deserialization is needed

Move contracts emit events like this:

// Move smart contract
use sui::event;

public fun transfer_balance(...) {
    event::emit(BalanceEvent {
        balance_manager_id: id,
        asset: asset_id,
        amount: 100,
        deposit: true
    });
}

But in checkpoint data, these events arrive as raw BCS bytes that need to be converted back to Rust structs for processing.

Step-by-step deserialization

  1. Add BCS dependency

    [dependencies]
    bcs = "0.1.6"
    serde = { version = "1.0", features = ["derive"] }
  2. Define the Event struct in Rust

    Define the same structure in Rust as declared in Move. You can do this manually or use move-binding to auto-generate it from on-chain packages.

    use serde::Deserialize;
    use sui_indexer_alt_framework::types::base_types::ObjectID;

    #[derive(Deserialize, Debug)]
    struct BalanceEvent {
        balance_manager_id: ObjectID,
        asset: ObjectID,
        amount: u64,
        deposit: bool,
    }

    Important: Field order and types must match the Move event exactly.

  3. Extract event bytes in your processor

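    impl Processor for YourHandler {
        // `NAME` and `Value` are omitted here for brevity

        fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>> {
            let mut results = Vec::new();

            for transaction in &checkpoint.transactions {
                // `events` is optional; the individual events are in its `data` field
                let Some(events) = &transaction.events else {
                    continue;
                };

                for event in &events.data {
                    // Get the raw BCS bytes
                    let event_bytes = &event.contents;

                    // Deserialize to your Rust struct. Bytes from an unrelated event type can
                    // also decode successfully, so consider checking `event.type_` first.
                    if let Ok(balance_event) = bcs::from_bytes::<BalanceEvent>(event_bytes) {
                        // Convert `balance_event` into your `Self::Value` and push it to `results`
                    }
                }
            }

            Ok(results)
        }
    }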
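
To see why field order and types matter, it helps to look at the byte layout that `bcs::from_bytes` expects. BCS writes struct fields back to back in declaration order with no field names: a fixed-size 32-byte ID as 32 raw bytes, a `u64` as 8 little-endian bytes, and a `bool` as a single byte. The std-only sketch below decodes that layout by hand, assuming `ObjectID` serializes as 32 raw bytes. `RawBalanceEvent` and `decode_balance_event` are hypothetical names for illustration; real code should keep using `bcs` with `serde`.

```rust
use std::convert::TryInto;

/// Hypothetical mirror of `BalanceEvent` that uses raw 32-byte IDs.
#[derive(Debug, PartialEq)]
struct RawBalanceEvent {
    balance_manager_id: [u8; 32],
    asset: [u8; 32],
    amount: u64,
    deposit: bool,
}

/// Decode the fixed 73-byte layout: 32 + 32 + 8 + 1.
fn decode_balance_event(bytes: &[u8]) -> Option<RawBalanceEvent> {
    if bytes.len() != 73 {
        return None;
    }

    // Fields are read in declaration order, with no separators or names
    let balance_manager_id: [u8; 32] = bytes[0..32].try_into().ok()?;
    let asset: [u8; 32] = bytes[32..64].try_into().ok()?;
    let amount = u64::from_le_bytes(bytes[64..72].try_into().ok()?);
    let deposit = match bytes[72] {
        0 => false,
        1 => true,
        _ => return None, // BCS only allows 0 or 1 for a bool
    };

    Some(RawBalanceEvent {
        balance_manager_id,
        asset,
        amount,
        deposit,
    })
}
```

Because nothing in the bytes identifies a field, a Rust struct that declared `amount` before `asset` would read 8 bytes of the asset ID as the amount, and decoding would either fail or silently produce wrong values.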