GraphQL for Sui RPC (Beta)
GraphQL provides a flexible way to query the Sui network. This page covers the core concepts for working with GraphQL on Sui RPC, including request headers, query composition with variables and fragments, pagination strategies, query scope, and service limits.
For practical examples, see Querying Sui RPC with GraphQL. For comprehensive GraphQL fundamentals, consult the introductory documentation from GraphQL and GitHub.
JSON-RPC is deprecated. Migrate to either gRPC or GraphQL RPC by July 2026.
Refer to the list of RPC or data providers that have enabled gRPC on their full nodes or offer GraphQL RPC. Contact a provider directly to request access. If your RPC or data provider doesn't yet support these data access methods, ask them to enable support or contact the Sui Foundation team on Discord, Telegram, or Slack for help.
The GraphQL RPC Service reads data from the General-purpose Indexer's Postgres-compatible database, the Archival Store and Service, and a full node. GraphQL RPC is an alternative to the gRPC API. The General-purpose Indexer is a scalable implementation of the custom indexing framework, which ingests data using the remote checkpoint store and full node RPCs. You can configure it to load different types of Sui network data into Postgres tables in parallel, improving data ingestion performance. You can also configure pruning for different tables to balance performance and cost.
High-level release timeline
The target times indicated are tentative and subject to updates based on project progress and your feedback.
| Tentative time | Milestone | Description |
|---|---|---|
| ✔️ September 2025 | Beta release of GraphQL RPC Server and General-purpose Indexer. | You can start validating the setup of the General-purpose Indexer and testing the GraphQL RPC Server against indexed Sui data. You can also start migrating your application in non-production environments and share feedback on the improvements you want to see. |
| ✔️ September-October 2025 | Deprecation of JSON-RPC. | JSON-RPC is deprecated at this point and the migration notice period starts. |
| February-March 2026 | GA release of GraphQL RPC Server and General-purpose Indexer. | Begin migration and cutover of your application in the production environment. |
| July 2026 | End of migration timeline. | JSON-RPC is fully deactivated at this point. This timeline assumes a migration notice period of about 7 months. |
Refer to Access Sui Data for an overview of options to access Sui network data.
GraphQL RPC is currently in beta. Refer to the high-level timeline for releases.
Components
The key components of the GraphQL and General-purpose Indexer stack include the following:
- General-purpose Indexer: Ingests and transforms Sui checkpoint data using configurable and parallel pipelines, then writes it into a Postgres-compatible database. It can be configured to use the Sui remote checkpoint store and a full node as its sources.
- Postgres-compatible database: Stores indexed data for GraphQL queries. It is tested using GCP AlloyDB, but you can run any Postgres-compatible database. Test alternative databases and share feedback on performance, cost, and operational characteristics.
- GraphQL service: Serves structured queries over indexed data. It follows the GraphQL specification, and the supported schema is documented in the GraphQL API reference.
- Archival Service: Enables point lookups for historical data from a key-value store. If unavailable, the GraphQL service falls back to the Postgres-compatible database for lookups, which might be limited by that database's retention policy. See Archival Store and Service for more information.
- Consistent Store: Answers queries about the latest state of the network within the last hour (objects owned by addresses, objects by type, balances by address and type). Consistency is guaranteed by pinning queries to a specific (recent) checkpoint.
- Full node: Enables transaction execution and simulation.
When to use
Use GraphQL RPC with the General-purpose Indexer as a flexible and ergonomic data API to build rich dashboards, explorers, and data-driven apps. The API is powered by an indexer created using the custom indexing framework.
Use GraphQL if your application:
- Requires historical data with configurable retention or filtered access to data, such as all transactions sent by an address.
- Needs to display structured results in a frontend, such as wallets and dashboards.
- Benefits from flexible, composable queries that reduce overfetching.
- Relies on multiple data entities, such as transactions, objects, or events, in a single request, or in a consistent fashion when spread over multiple requests, as if the responses came from a snapshot at some checkpoint.
How GraphQL RPC and General-purpose Indexer fit into the application stack
If you are using the deprecated JSON-RPC in your application, you can migrate to GraphQL RPC either by self-operating the combined stack of General-purpose Indexer, Postgres-compatible database, and GraphQL RPC server, or by using it as a service from an RPC provider or indexer operator.
You can run or use the GraphQL and Indexer data stack in the following configurations.
Fully managed service
As a developer, you can access GraphQL as a service from an indexer operator or data provider who runs and operates the full stack behind the scenes. Reach out to your data provider and ask if they already offer or plan to offer this service.
Partially self-managed
As a developer, you can:
- Run the Indexer pipelines and GraphQL service, while using the Archival Service and a full node from an RPC provider or indexer operator.
- Configure and manage a Postgres-compatible database (local Postgres, AlloyDB, and so on) as the primary data store.
- Deploy the self-managed components on cloud infrastructure or bare metal.
Fully self-managed
As a developer, indexer operator, or RPC provider, you can:
- Run the complete stack: Indexer pipelines, GraphQL service, Postgres-compatible database, Archival Service, Consistent Store, and full node on cloud infrastructure or bare metal.
- Serve GraphQL to your own applications or to other builders and third-party services.
Working with the GraphQL service
The GraphQL service exposes a query surface built on standard GraphQL concepts. It allows pagination, filtering, and consistent snapshot queries. The service also supports runtime configuration for schema, query cost limits, and logging. The GraphQL schema is defined in the GraphQL reference, where you can explore supported types and fields, use the GraphiQL IDE to test queries, and read documentation on the up-to-date schema.
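Because the service follows the GraphQL specification, the schema can also be discovered at runtime through standard introspection, for example from the GraphiQL IDE or any GraphQL client:

```graphql
# Standard GraphQL introspection query: lists the top-level query fields
# the service exposes, along with their documentation strings.
query IntrospectTopLevel {
  __schema {
    queryType {
      fields {
        name
        description
      }
    }
  }
}
```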
The GraphQL service is deployed as a single binary implementing a stateless, horizontally scalable service. Queries are served with data from one or more of: a Postgres-compatible database (filters over historical data), the Archival Service (point lookups), the Consistent Store (live data), or a full node (execution and simulation), as needed. Access to these stores must be configured when the service starts up, otherwise the service might fail to respond correctly to requests. More details on how to set up, configure, and run the service are available in its README.
Requests to GraphQL are subject to various limits, to ensure resources are shared fairly between clients. Each limit is configurable, and the values configured for an instance can be queried through Query.serviceConfig. Requests that exceed these limits return an error. The following limits are in effect:
- Request size: Requests must not exceed a certain size in bytes. The limit is split across a transaction payload limit, which applies to all values and variable bindings that are parameters to transaction signing, execution, and simulation fields (default: 175KB), and a query payload limit, which applies to all other parts of the query (default: 5KB).
- Request timeout: Time spent on each request is bounded, with different bounds for execution (default: 74s) and regular reads (default: 40s).
- Query input nodes and depth: The query cannot be too complex, meaning it cannot contain too many input nodes or field names (default: 300) or be too deeply nested (default: 20).
- Output nodes: The service estimates the maximum number of output nodes the query might produce, assuming every requested field is present, every paginated field returns full pages, and every multi-get finds all requested keys. This estimate must be bounded (default: 1,000,000).
- Page and multi-get size: Each paginated field (default: 50) and multi-get (default: 200) is subject to a maximum size. Certain paginated fields might override this to provide a higher or lower maximum.
- (TBD) Rich queries: A request can contain only a bounded number (default: 5) of queries that require dedicated access to the database (that is, they cannot be grouped with other requests).
Working with General-purpose Indexer
The General-purpose Indexer fetches checkpoint data from a remote object store, local files, or a full node RPC, and indexes it into multiple database tables through a set of specialized pipelines. Each pipeline is responsible for extracting specific data and writing to its target tables.
Full list of tables and their schemas
// @generated automatically by Diesel CLI.
diesel::table! {
coin_balance_buckets (object_id, cp_sequence_number) {
object_id -> Bytea,
cp_sequence_number -> Int8,
owner_kind -> Nullable<Int2>,
owner_id -> Nullable<Bytea>,
coin_type -> Nullable<Bytea>,
coin_balance_bucket -> Nullable<Int2>,
}
}
diesel::table! {
coin_balance_buckets_deletion_reference (cp_sequence_number, object_id) {
object_id -> Bytea,
cp_sequence_number -> Int8,
}
}
diesel::table! {
cp_bloom_blocks (cp_block_index, bloom_block_index) {
cp_block_index -> Int8,
bloom_block_index -> Int2,
bloom_filter -> Bytea,
}
}
diesel::table! {
cp_blooms (cp_sequence_number) {
cp_sequence_number -> Int8,
bloom_filter -> Bytea,
}
}
diesel::table! {
cp_sequence_numbers (cp_sequence_number) {
cp_sequence_number -> Int8,
tx_lo -> Int8,
epoch -> Int8,
}
}
diesel::table! {
ev_emit_mod (package, module, tx_sequence_number) {
package -> Bytea,
module -> Text,
tx_sequence_number -> Int8,
sender -> Bytea,
}
}
diesel::table! {
ev_struct_inst (package, module, name, instantiation, tx_sequence_number) {
package -> Bytea,
module -> Text,
name -> Text,
instantiation -> Bytea,
tx_sequence_number -> Int8,
sender -> Bytea,
}
}
diesel::table! {
kv_checkpoints (sequence_number) {
sequence_number -> Int8,
checkpoint_contents -> Bytea,
checkpoint_summary -> Bytea,
validator_signatures -> Bytea,
}
}
diesel::table! {
kv_epoch_ends (epoch) {
epoch -> Int8,
cp_hi -> Int8,
tx_hi -> Int8,
end_timestamp_ms -> Int8,
safe_mode -> Bool,
total_stake -> Nullable<Int8>,
storage_fund_balance -> Nullable<Int8>,
storage_fund_reinvestment -> Nullable<Int8>,
storage_charge -> Nullable<Int8>,
storage_rebate -> Nullable<Int8>,
stake_subsidy_amount -> Nullable<Int8>,
total_gas_fees -> Nullable<Int8>,
total_stake_rewards_distributed -> Nullable<Int8>,
leftover_storage_fund_inflow -> Nullable<Int8>,
epoch_commitments -> Bytea,
}
}
diesel::table! {
kv_epoch_starts (epoch) {
epoch -> Int8,
protocol_version -> Int8,
cp_lo -> Int8,
start_timestamp_ms -> Int8,
reference_gas_price -> Int8,
system_state -> Bytea,
}
}
diesel::table! {
kv_feature_flags (protocol_version, flag_name) {
protocol_version -> Int8,
flag_name -> Text,
flag_value -> Bool,
}
}
diesel::table! {
kv_genesis (genesis_digest) {
genesis_digest -> Bytea,
initial_protocol_version -> Int8,
}
}
diesel::table! {
kv_objects (object_id, object_version) {
object_id -> Bytea,
object_version -> Int8,
serialized_object -> Nullable<Bytea>,
}
}
diesel::table! {
kv_packages (package_id, package_version) {
package_id -> Bytea,
package_version -> Int8,
original_id -> Bytea,
is_system_package -> Bool,
serialized_object -> Bytea,
cp_sequence_number -> Int8,
}
}
diesel::table! {
kv_protocol_configs (protocol_version, config_name) {
protocol_version -> Int8,
config_name -> Text,
config_value -> Nullable<Text>,
}
}
diesel::table! {
kv_transactions (tx_digest) {
tx_digest -> Bytea,
cp_sequence_number -> Int8,
timestamp_ms -> Int8,
raw_transaction -> Bytea,
raw_effects -> Bytea,
events -> Bytea,
user_signatures -> Bytea,
}
}
diesel::table! {
obj_info (object_id, cp_sequence_number) {
object_id -> Bytea,
cp_sequence_number -> Int8,
owner_kind -> Nullable<Int2>,
owner_id -> Nullable<Bytea>,
package -> Nullable<Bytea>,
module -> Nullable<Text>,
name -> Nullable<Text>,
instantiation -> Nullable<Bytea>,
}
}
diesel::table! {
obj_info_deletion_reference (cp_sequence_number, object_id) {
object_id -> Bytea,
cp_sequence_number -> Int8,
}
}
diesel::table! {
obj_versions (object_id, object_version) {
object_id -> Bytea,
object_version -> Int8,
object_digest -> Nullable<Bytea>,
cp_sequence_number -> Int8,
}
}
diesel::table! {
sum_displays (object_type) {
object_type -> Bytea,
display_id -> Bytea,
display_version -> Int2,
display -> Bytea,
}
}
diesel::table! {
tx_affected_addresses (affected, tx_sequence_number) {
affected -> Bytea,
tx_sequence_number -> Int8,
sender -> Bytea,
}
}
diesel::table! {
tx_affected_objects (affected, tx_sequence_number) {
tx_sequence_number -> Int8,
affected -> Bytea,
sender -> Bytea,
}
}
diesel::table! {
tx_balance_changes (tx_sequence_number) {
tx_sequence_number -> Int8,
balance_changes -> Bytea,
}
}
diesel::table! {
tx_calls (package, module, function, tx_sequence_number) {
package -> Bytea,
module -> Text,
function -> Text,
tx_sequence_number -> Int8,
sender -> Bytea,
}
}
diesel::table! {
tx_digests (tx_sequence_number) {
tx_sequence_number -> Int8,
tx_digest -> Bytea,
}
}
diesel::table! {
tx_kinds (tx_kind, tx_sequence_number) {
tx_kind -> Int2,
tx_sequence_number -> Int8,
}
}
diesel::table! {
watermarks (pipeline) {
pipeline -> Text,
epoch_hi_inclusive -> Int8,
checkpoint_hi_inclusive -> Int8,
tx_hi -> Int8,
timestamp_ms_hi_inclusive -> Int8,
reader_lo -> Int8,
pruner_timestamp -> Timestamp,
pruner_hi -> Int8,
}
}
diesel::allow_tables_to_appear_in_same_query!(
coin_balance_buckets,
coin_balance_buckets_deletion_reference,
cp_bloom_blocks,
cp_blooms,
cp_sequence_numbers,
ev_emit_mod,
ev_struct_inst,
kv_checkpoints,
kv_epoch_ends,
kv_epoch_starts,
kv_feature_flags,
kv_genesis,
kv_objects,
kv_packages,
kv_protocol_configs,
kv_transactions,
obj_info,
obj_info_deletion_reference,
obj_versions,
sum_displays,
tx_affected_addresses,
tx_affected_objects,
tx_balance_changes,
tx_calls,
tx_digests,
tx_kinds,
watermarks,
);
Below are brief descriptions of the various categories of pipelines based on the type of data they handle:
Blockchain raw content pipelines
These pipelines capture the core blockchain data in its raw form, preserving complete checkpoint information, full transaction and objects contents, and Move package bytecode and metadata. They ensure the complete blockchain state is available for direct lookup by key (such as object ID and version, transaction digest, checkpoint sequence number). Some production deployments use the Archival Store for looking up checkpoints, transactions, and objects contents instead of the corresponding kv_ tables.
Tables: kv_checkpoints, kv_transactions, kv_objects, kv_packages
The following pipelines create indexed views that allow efficient filtering and querying based on different attributes (such as object owner, transaction type, affected addresses, event type). These indexes help identify the keys of interest, which can then be used to fetch detailed content from the raw content kv_ tables:
Transaction pipelines
These pipelines extract and index key transaction attributes to support efficient filtering and querying. tx_kinds, tx_calls, tx_affected_addresses, and tx_affected_objects enable fast lookups of transactions based on type, function calls, sender and receiver addresses, and changed objects. tx_digests enables conversion between transaction sequence numbers and transaction digests, needed for looking up transactions in kv_ tables by digest, and tx_balance_changes stores the balance change information of each transaction.
Tables: tx_digests, tx_kinds, tx_calls, tx_affected_addresses, tx_affected_objects, tx_balance_changes
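For example, a query that filters transactions by the function they call is resolved through these tables: the tx_calls index narrows down transaction sequence numbers, tx_digests maps them to digests, and the raw content comes from kv_transactions. A sketch of such a query, with illustrative field and filter names that may differ from your deployment's schema:

```graphql
query CallsIntoFunction {
  # Resolved via tx_calls -> tx_digests -> kv_transactions.
  transactionBlocks(first: 10, filter: { function: "0x2::coin::transfer" }) {
    pageInfo {
      hasNextPage
      endCursor
    }
    nodes {
      digest
    }
  }
}
```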
Object pipelines
These pipelines manage current and historical object information. They store active object metadata, maintain version histories for each object, and categorize coin balances into buckets for efficient coin queries sorted by balance. The obj_versions table is particularly important for the GraphQL service. It tracks the version history of all blockchain objects, storing object ID, version number, digest, and checkpoint sequence number. The GraphQL service uses this table as an efficient index to resolve object queries by version bounds, checkpoint bounds, or exact versions without loading full object data, enabling features like version pagination and temporal consistency.
Pruning policies can be configured for obj_info and coin_balance_buckets to retain historical data within a specified time range, balancing query needs with storage management. This allows supporting use cases that require querying recent object history without retaining all historical data indefinitely.
Tables: obj_info, obj_versions, coin_balance_buckets
Epoch information pipelines
These pipelines capture protocol upgrades and epoch transition points. They track the system state, reward distribution, validator committee, and protocol configurations of each epoch, providing a historical record of network evolution.
Tables: kv_epoch_starts, kv_epoch_ends, kv_feature_flags, kv_protocol_configs
Event processing pipelines
These pipelines index blockchain events for efficient querying by sender, emitting module, or event type.
Tables: ev_emit_mod, ev_struct_inst
Utility and support pipelines
These pipelines provide support infrastructure, such as checkpoint sequence number tracking for pruning and watermark tracking for ensuring consistent reads across different tables in a GraphQL query.
Tables: cp_sequence_numbers, watermarks
Other pipelines
The sum_displays table stores the latest version of the Display object for each object type, used for rendering the off-chain representation (display) of a type.
Working with Consistent Store
The Consistent Store is a combined indexer and RPC service responsible for indexing live on-chain data and serving queries about it for recent checkpoints. Retention is configurable and is typically measured in minutes or hours. Its indexer fetches checkpoints from the same sources as the General-purpose Indexer and writes data to an embedded RocksDB store, while requests are served through gRPC, answering the following queries:
- An owner's live objects at a recent checkpoint, optionally filtered by type
- Live objects of a given type at a recent checkpoint
- An address's balance at a recent checkpoint
This service is not stateless, as it maintains its own database. A new instance can be spun up, like the indexer, by syncing it from genesis, or possibly by restoring it from a formal snapshot.
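Through GraphQL, these lookups surface as queries over live state. The following sketch shows an owned-objects query of the kind the Consistent Store answers; the field and filter names are assumptions about the schema's shape, and the address and type are placeholders:

```graphql
query OwnedCoinsNow {
  address(address: "0x<owner>") {
    # Live objects owned by this address, filtered by type, answered
    # consistently by pinning to a recent checkpoint.
    objects(first: 20, filter: { type: "0x2::coin::Coin" }) {
      pageInfo {
        hasNextPage
        endCursor
      }
      nodes {
        address
        version
      }
    }
  }
}
```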
For RPC providers and data operators
If you're running the GraphQL RPC and General-purpose Indexer stack as a service, here are a few key considerations for configuring your setup to offer builders a performant and cost-effective experience. For step-by-step setup and operations instructions, see the GraphQL and General-Purpose Indexer guide.
How much data to index and retain
You should retain 30 to 90 days of recent checkpoint data in your Postgres-compatible database. This provides a strong default for most apps without incurring the high storage costs of full historical indexing.
- 30 days serves as a baseline for dashboards and explorers that need recent activity and assets.
- 90 days improves support for longer-range pagination, historical lookups, or apps with slower engagement cycles.
You can configure your indexing pipelines to scope which data you include, such as events, objects, and transactions, and disable any components that are not needed.
Retaining long-term historical data in Postgres is not recommended unless required for specific apps.
Use the Archival Service and Store for historical lookups
For all production deployments, pair Postgres with the Archival Service to support point lookups of transactions, objects, and checkpoints when relevant data does not exist in Postgres. The Archival Service serves as the backend for historical versions and checkpoint data, reducing pressure on your Postgres instance. While not strictly required, use the Archival Service in any production setup that aims to support high-retention GraphQL or gRPC workloads.
The current implementation supports GCP Bigtable, which is a highly scalable and performant data store. If you plan to operate your own archival store, refer to sui-kvstore and sui-kv-rpc for the indexer setup and RPC service implementation, respectively. For the indexer setup, make sure to use the custom indexing framework. If you're interested in contributing support for other scalable data stores, reach out on GitHub by creating a new issue.
lib.rs in sui-kvstore
mod bigtable;
pub mod config;
mod handlers;
mod rate_limiter;
pub mod tables;
use std::sync::Arc;
use std::sync::OnceLock;
use anyhow::Result;
use async_trait::async_trait;
use prometheus::Registry;
use serde::Deserialize;
use serde::Serialize;
use sui_indexer_alt_framework::Indexer;
use sui_indexer_alt_framework::IndexerArgs;
use sui_indexer_alt_framework::ingestion::ClientArgs;
use sui_indexer_alt_framework::pipeline::CommitterConfig;
use sui_indexer_alt_framework::pipeline::concurrent::ConcurrentConfig;
use crate::rate_limiter::CompositeRateLimiter;
use crate::rate_limiter::RateLimiter;
use sui_protocol_config::Chain;
use sui_types::balance_change::BalanceChange;
use sui_types::base_types::ObjectID;
use sui_types::committee::EpochId;
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
use sui_types::digests::CheckpointDigest;
use sui_types::digests::TransactionDigest;
use sui_types::effects::TransactionEffects;
use sui_types::effects::TransactionEvents;
use sui_types::event::Event;
use sui_types::messages_checkpoint::CheckpointContents;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use sui_types::messages_checkpoint::CheckpointSummary;
use sui_types::object::Object;
use sui_types::storage::ObjectKey;
use sui_types::transaction::Transaction;
pub use crate::bigtable::client::BigTableClient;
pub use crate::bigtable::store::BigTableConnection;
pub use crate::bigtable::store::BigTableStore;
pub use crate::handlers::BigTableHandler;
pub use crate::handlers::CheckpointsByDigestPipeline;
pub use crate::handlers::CheckpointsPipeline;
pub use crate::handlers::EpochEndPipeline;
pub use crate::handlers::EpochLegacyBatch;
pub use crate::handlers::EpochLegacyPipeline;
pub use crate::handlers::EpochStartPipeline;
pub use crate::handlers::ObjectsPipeline;
pub use crate::handlers::PackagesByCheckpointPipeline;
pub use crate::handlers::PackagesByIdPipeline;
pub use crate::handlers::PackagesPipeline;
pub use crate::handlers::PrevEpochUpdate;
pub use crate::handlers::ProtocolConfigsPipeline;
pub use crate::handlers::SystemPackagesPipeline;
pub use crate::handlers::TransactionsPipeline;
pub use config::CommitterLayer;
pub use config::ConcurrentLayer;
pub use config::IndexerConfig;
pub use config::IngestionConfig;
pub use config::PipelineLayer;
pub const CHECKPOINTS_PIPELINE: &str =
<BigTableHandler<CheckpointsPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const CHECKPOINTS_BY_DIGEST_PIPELINE: &str =
<BigTableHandler<CheckpointsByDigestPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const TRANSACTIONS_PIPELINE: &str =
<BigTableHandler<TransactionsPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const OBJECTS_PIPELINE: &str =
<BigTableHandler<ObjectsPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const EPOCH_START_PIPELINE: &str =
<BigTableHandler<EpochStartPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const EPOCH_END_PIPELINE: &str =
<BigTableHandler<EpochEndPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const PROTOCOL_CONFIGS_PIPELINE: &str =
<BigTableHandler<ProtocolConfigsPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const EPOCH_LEGACY_PIPELINE: &str =
<EpochLegacyPipeline as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const PACKAGES_PIPELINE: &str =
<BigTableHandler<PackagesPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const PACKAGES_BY_ID_PIPELINE: &str =
<BigTableHandler<PackagesByIdPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const PACKAGES_BY_CHECKPOINT_PIPELINE: &str =
<BigTableHandler<PackagesByCheckpointPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
pub const SYSTEM_PACKAGES_PIPELINE: &str =
<BigTableHandler<SystemPackagesPipeline> as sui_indexer_alt_framework::pipeline::Processor>::NAME;
/// All pipeline names registered by the indexer. Used by `LegacyWatermarkTracker`
/// to know when all pipelines have reported.
pub const ALL_PIPELINE_NAMES: [&str; 12] = [
CHECKPOINTS_PIPELINE,
CHECKPOINTS_BY_DIGEST_PIPELINE,
TRANSACTIONS_PIPELINE,
OBJECTS_PIPELINE,
EPOCH_START_PIPELINE,
EPOCH_END_PIPELINE,
PROTOCOL_CONFIGS_PIPELINE,
EPOCH_LEGACY_PIPELINE,
PACKAGES_PIPELINE,
PACKAGES_BY_ID_PIPELINE,
PACKAGES_BY_CHECKPOINT_PIPELINE,
SYSTEM_PACKAGES_PIPELINE,
];
/// Non-legacy pipeline names used for the default `get_watermark` implementation.
const WATERMARK_PIPELINES: [&str; 11] = [
CHECKPOINTS_PIPELINE,
CHECKPOINTS_BY_DIGEST_PIPELINE,
TRANSACTIONS_PIPELINE,
OBJECTS_PIPELINE,
EPOCH_START_PIPELINE,
EPOCH_END_PIPELINE,
PROTOCOL_CONFIGS_PIPELINE,
PACKAGES_PIPELINE,
PACKAGES_BY_ID_PIPELINE,
PACKAGES_BY_CHECKPOINT_PIPELINE,
SYSTEM_PACKAGES_PIPELINE,
];
static WRITE_LEGACY_DATA: OnceLock<bool> = OnceLock::new();
/// Set whether to write legacy data (legacy watermark row, epoch DEFAULT_COLUMN, tx column).
/// Must be called before creating any pipelines. Panics if called more than once.
pub fn set_write_legacy_data(value: bool) {
WRITE_LEGACY_DATA
.set(value)
.expect("write_legacy_data already set");
}
pub fn write_legacy_data() -> bool {
*WRITE_LEGACY_DATA.get_or_init(|| false)
}
pub struct BigTableIndexer {
pub indexer: Indexer<BigTableStore>,
}
#[derive(Clone, Debug)]
pub struct CheckpointData {
pub summary: CheckpointSummary,
pub contents: CheckpointContents,
pub signatures: AuthorityStrongQuorumSignInfo,
}
#[derive(Clone, Debug)]
pub struct TransactionData {
pub transaction: Transaction,
pub effects: TransactionEffects,
pub events: Option<TransactionEvents>,
pub checkpoint_number: CheckpointSequenceNumber,
pub timestamp: u64,
pub balance_changes: Vec<BalanceChange>,
pub unchanged_loaded_runtime_objects: Vec<ObjectKey>,
}
/// Partial transaction and events for when we only need transaction content for events
#[derive(Clone, Debug)]
pub struct TransactionEventsData {
pub events: Vec<Event>,
pub timestamp_ms: u64,
}
/// Epoch data returned by reader methods.
/// All fields are optional to support partial column queries.
#[derive(Clone, Debug, Default)]
pub struct EpochData {
pub epoch: Option<u64>,
pub protocol_version: Option<u64>,
pub start_timestamp_ms: Option<u64>,
pub start_checkpoint: Option<u64>,
pub reference_gas_price: Option<u64>,
pub system_state: Option<sui_types::sui_system_state::SuiSystemState>,
pub end_timestamp_ms: Option<u64>,
pub end_checkpoint: Option<u64>,
pub cp_hi: Option<u64>,
pub tx_hi: Option<u64>,
pub safe_mode: Option<bool>,
pub total_stake: Option<u64>,
pub storage_fund_balance: Option<u64>,
pub storage_fund_reinvestment: Option<u64>,
pub storage_charge: Option<u64>,
pub storage_rebate: Option<u64>,
pub stake_subsidy_amount: Option<u64>,
pub total_gas_fees: Option<u64>,
pub total_stake_rewards_distributed: Option<u64>,
pub leftover_storage_fund_inflow: Option<u64>,
pub epoch_commitments: Option<Vec<u8>>,
}
/// Package metadata returned by reader methods.
/// The actual serialized object is stored in the `objects` table.
#[derive(Clone, Debug)]
pub struct PackageData {
pub package_id: Vec<u8>,
pub package_version: u64,
pub original_id: Vec<u8>,
pub is_system_package: bool,
pub cp_sequence_number: u64,
}
/// Protocol config data returned by reader methods.
#[derive(Clone, Debug, Default)]
pub struct ProtocolConfigData {
pub configs: std::collections::BTreeMap<String, Option<String>>,
pub flags: std::collections::BTreeMap<String, bool>,
}
/// Serializable watermark for per-pipeline tracking in BigTable.
/// Mirrors the framework's CommitterWatermark type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Watermark {
pub epoch_hi_inclusive: u64,
pub checkpoint_hi_inclusive: u64,
pub tx_hi: u64,
pub timestamp_ms_hi_inclusive: u64,
}
#[async_trait]
pub trait KeyValueStoreReader {
async fn get_objects(&mut self, objects: &[ObjectKey]) -> Result<Vec<Object>>;
async fn get_transactions(
&mut self,
transactions: &[TransactionDigest],
) -> Result<Vec<TransactionData>>;
async fn get_checkpoints(
&mut self,
sequence_numbers: &[CheckpointSequenceNumber],
) -> Result<Vec<CheckpointData>>;
async fn get_checkpoint_by_digest(
&mut self,
digest: CheckpointDigest,
) -> Result<Option<CheckpointData>>;
/// Return the minimum watermark across the given pipelines, selecting the whole
/// watermark with the lowest `checkpoint_hi_inclusive`. Returns `None` if any
/// pipeline is missing a watermark.
async fn get_watermark_for_pipelines(
&mut self,
pipelines: &[&str],
) -> Result<Option<Watermark>>;
/// Return the minimum watermark across all non-legacy pipelines.
async fn get_watermark(&mut self) -> Result<Option<Watermark>> {
self.get_watermark_for_pipelines(&WATERMARK_PIPELINES).await
}
async fn get_latest_object(&mut self, object_id: &ObjectID) -> Result<Option<Object>>;
async fn get_epoch(&mut self, epoch_id: EpochId) -> Result<Option<EpochData>>;
async fn get_latest_epoch(&mut self) -> Result<Option<EpochData>>;
async fn get_protocol_configs(
&mut self,
protocol_version: u64,
) -> Result<Option<ProtocolConfigData>>;
async fn get_events_for_transactions(
&mut self,
keys: &[TransactionDigest],
) -> Result<Vec<(TransactionDigest, TransactionEventsData)>>;
/// Resolve package_ids to their original_ids.
async fn get_package_original_ids(
&mut self,
package_ids: &[ObjectID],
) -> Result<Vec<(ObjectID, ObjectID)>>;
/// Get packages by (original_id, version) pairs.
async fn get_packages_by_version(
&mut self,
keys: &[(ObjectID, u64)],
) -> Result<Vec<PackageData>>;
/// Get the latest version of a package at or before `cp_bound`.
async fn get_package_latest(
&mut self,
original_id: ObjectID,
cp_bound: u64,
) -> Result<Option<PackageData>>;
/// Paginate package versions for an original_id, filtered by cp_bound.
async fn get_package_versions(
&mut self,
original_id: ObjectID,
cp_bound: u64,
after_version: Option<u64>,
before_version: Option<u64>,
limit: usize,
descending: bool,
) -> Result<Vec<PackageData>>;
/// Get packages created in a checkpoint range, ordered by checkpoint.
async fn get_packages_by_checkpoint_range(
&mut self,
cp_after: Option<u64>,
cp_before: Option<u64>,
limit: usize,
descending: bool,
) -> Result<Vec<PackageData>>;
/// Get all system packages with their latest version at or before `cp_bound`.
async fn get_system_packages(
&mut self,
cp_bound: u64,
after_original_id: Option<ObjectID>,
limit: usize,
) -> Result<Vec<PackageData>>;
}
impl BigTableIndexer {
pub async fn new(
store: BigTableStore,
indexer_args: IndexerArgs,
client_args: ClientArgs,
ingestion_config: IngestionConfig,
committer: CommitterConfig,
config: IndexerConfig,
pipeline: PipelineLayer,
chain: Chain,
registry: &Registry,
) -> Result<Self> {
let mut indexer = Indexer::new(
store,
indexer_args,
client_args,
ingestion_config.into(),
None,
registry,
)
.await?;
let global = config.total_max_rows_per_second.map(RateLimiter::new);
let base_rps = config.max_rows_per_second;
fn build_rate_limiter(
layer: &ConcurrentLayer,
base_rps: Option<u64>,
global: &Option<Arc<RateLimiter>>,
) -> Arc<CompositeRateLimiter> {
let mut limiters = Vec::new();
if let Some(rps) = layer.max_rows_per_second.or(base_rps) {
limiters.push(RateLimiter::new(rps));
}
if let Some(g) = global {
limiters.push(g.clone());
}
Arc::new(CompositeRateLimiter::new(limiters))
}
let base = ConcurrentConfig {
committer,
pruner: None,
..Default::default()
};
indexer
.concurrent_pipeline(
BigTableHandler::new(
CheckpointsPipeline,
&pipeline.checkpoints,
build_rate_limiter(&pipeline.checkpoints, base_rps, &global),
),
pipeline.checkpoints.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
CheckpointsByDigestPipeline,
&pipeline.checkpoints_by_digest,
build_rate_limiter(&pipeline.checkpoints_by_digest, base_rps, &global),
),
pipeline.checkpoints_by_digest.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
TransactionsPipeline,
&pipeline.transactions,
build_rate_limiter(&pipeline.transactions, base_rps, &global),
),
pipeline.transactions.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
ObjectsPipeline,
&pipeline.objects,
build_rate_limiter(&pipeline.objects, base_rps, &global),
),
pipeline.objects.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
EpochStartPipeline,
&pipeline.epoch_start,
build_rate_limiter(&pipeline.epoch_start, base_rps, &global),
),
pipeline.epoch_start.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
EpochEndPipeline,
&pipeline.epoch_end,
build_rate_limiter(&pipeline.epoch_end, base_rps, &global),
),
pipeline.epoch_end.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
ProtocolConfigsPipeline(chain),
&pipeline.protocol_configs,
build_rate_limiter(&pipeline.protocol_configs, base_rps, &global),
),
pipeline.protocol_configs.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
PackagesPipeline,
&pipeline.packages,
build_rate_limiter(&pipeline.packages, base_rps, &global),
),
pipeline.packages.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
PackagesByIdPipeline,
&pipeline.packages_by_id,
build_rate_limiter(&pipeline.packages_by_id, base_rps, &global),
),
pipeline.packages_by_id.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
PackagesByCheckpointPipeline,
&pipeline.packages_by_checkpoint,
build_rate_limiter(&pipeline.packages_by_checkpoint, base_rps, &global),
),
pipeline.packages_by_checkpoint.finish(base.clone()),
)
.await?;
indexer
.concurrent_pipeline(
BigTableHandler::new(
SystemPackagesPipeline,
&pipeline.system_packages,
build_rate_limiter(&pipeline.system_packages, base_rps, &global),
),
pipeline.system_packages.finish(base.clone()),
)
.await?;
if write_legacy_data() {
indexer
.concurrent_pipeline(EpochLegacyPipeline, pipeline.epoch_legacy.finish(base))
.await?;
}
Ok(Self { indexer })
}
pub fn pipeline_names(&self) -> Vec<&'static str> {
self.indexer.pipelines().collect()
}
}
impl From<sui_indexer_alt_framework_store_traits::CommitterWatermark> for Watermark {
fn from(w: sui_indexer_alt_framework_store_traits::CommitterWatermark) -> Self {
Self {
epoch_hi_inclusive: w.epoch_hi_inclusive,
checkpoint_hi_inclusive: w.checkpoint_hi_inclusive,
tx_hi: w.tx_hi,
timestamp_ms_hi_inclusive: w.timestamp_ms_hi_inclusive,
}
}
}
impl From<Watermark> for sui_indexer_alt_framework_store_traits::CommitterWatermark {
fn from(w: Watermark) -> Self {
Self {
epoch_hi_inclusive: w.epoch_hi_inclusive,
checkpoint_hi_inclusive: w.checkpoint_hi_inclusive,
tx_hi: w.tx_hi,
timestamp_ms_hi_inclusive: w.timestamp_ms_hi_inclusive,
}
}
}
main.rs in sui-kv-rpc
use anyhow::Result;
use axum::Router;
use axum::routing::get;
use clap::Parser;
use mysten_network::callback::CallbackLayer;
use prometheus::Registry;
use std::sync::Arc;
use std::time::Duration;
use sui_kv_rpc::KvRpcServer;
use sui_rpc_api::{RpcMetrics, RpcMetricsMakeCallbackHandler, ServerVersion};
use telemetry_subscribers::TelemetryConfig;
use tonic::transport::{Identity, Server, ServerTlsConfig};
bin_version::bin_version!();
#[derive(Parser)]
struct App {
credentials: String,
instance_id: String,
#[clap(default_value = "[::1]:8000")]
address: String,
#[clap(default_value = "127.0.0.1")]
metrics_host: String,
#[clap(default_value_t = 9184)]
metrics_port: usize,
#[clap(long = "tls-cert", default_value = "")]
tls_cert: String,
#[clap(long = "tls-key", default_value = "")]
tls_key: String,
/// GCP project ID for the BigTable instance (defaults to the token provider's project)
#[clap(long = "bigtable-project")]
bigtable_project: Option<String>,
#[clap(long = "app-profile-id")]
app_profile_id: Option<String>,
#[clap(long = "checkpoint-bucket")]
checkpoint_bucket: Option<String>,
/// Channel-level timeout in milliseconds for BigTable gRPC calls (default: 60000)
#[clap(long = "bigtable-channel-timeout-ms")]
bigtable_channel_timeout_ms: Option<u64>,
}
async fn health_check() -> &'static str {
"OK"
}
#[tokio::main]
async fn main() -> Result<()> {
let _guard = TelemetryConfig::new().with_env().init();
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install CryptoProvider");
let app = App::parse();
unsafe {
std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", app.credentials.clone());
};
let server_version = Some(ServerVersion::new("sui-kv-rpc", VERSION));
let registry_service = mysten_metrics::start_prometheus_server(
format!("{}:{}", app.metrics_host, app.metrics_port).parse()?,
);
let registry: Registry = registry_service.default_registry();
mysten_metrics::init_metrics(®istry);
let channel_timeout = app.bigtable_channel_timeout_ms.map(Duration::from_millis);
let server = KvRpcServer::new(
app.instance_id,
app.bigtable_project,
app.app_profile_id,
app.checkpoint_bucket,
channel_timeout,
server_version,
®istry,
)
.await?;
let addr = app.address.parse()?;
let mut builder = Server::builder();
if !app.tls_cert.is_empty() && !app.tls_key.is_empty() {
let identity =
Identity::from_pem(std::fs::read(app.tls_cert)?, std::fs::read(app.tls_key)?);
let tls_config = ServerTlsConfig::new().identity(identity);
builder = builder.tls_config(tls_config)?;
}
let reflection_v1 = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(
sui_rpc_api::proto::google::protobuf::FILE_DESCRIPTOR_SET,
)
.register_encoded_file_descriptor_set(sui_rpc_api::proto::google::rpc::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET)
.build_v1()?;
let reflection_v1alpha = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(
sui_rpc_api::proto::google::protobuf::FILE_DESCRIPTOR_SET,
)
.register_encoded_file_descriptor_set(sui_rpc_api::proto::google::rpc::FILE_DESCRIPTOR_SET)
.register_encoded_file_descriptor_set(sui_rpc::proto::sui::rpc::v2::FILE_DESCRIPTOR_SET)
.build_v1alpha()?;
tokio::spawn(async {
let web_server = Router::new().route("/health", get(health_check));
let listener = tokio::net::TcpListener::bind("0.0.0.0:8081")
.await
.expect("can't bind to the healthcheck port");
axum::serve(listener, web_server.into_make_service())
.await
.expect("health check service failed");
});
builder
.layer(CallbackLayer::new(RpcMetricsMakeCallbackHandler::new(
Arc::new(RpcMetrics::new(®istry)),
)))
.add_service(
sui_rpc::proto::sui::rpc::v2::ledger_service_server::LedgerServiceServer::new(server),
)
.add_service(reflection_v1)
.add_service(reflection_v1alpha)
.serve(addr)
.await?;
Ok(())
}
Deployment strategies and trade-offs
You don't need to index everything to provide a reliable and performant GraphQL RPC service. In fact, many developers might need only the latest object and transaction data plus a few weeks to months of history. You can reduce operational overhead and improve query performance by:
- Configuring a clear retention window, such as 30 to 90 days, in Postgres.
- Using the Archival Service to handle deep historical queries, rather than retaining all versions in Postgres.

When designing your deployment, consider the trade-offs between cost, reliability, and feature completeness:

- Postgres-only with short retention: lower storage cost and faster queries, but limited historical coverage.
- Postgres-only with long retention: broader data coverage, but higher storage cost and slower performance at scale.
- Postgres with short retention plus the Archival Service: optimizes for both cost and completeness; ideal for production deployments.
Best practices
To improve performance and reliability, also consider these operational best practices:

- Co-locate your database, indexing pipelines, GraphQL RPC service, and Archival Service in the same region as your users to minimize latency.
- Use replication and staged deployments to maintain your SLAs during upgrades or failures.
- Consider offering different tiers of service to meet different developer needs. For example:
  - A basic tier that serves recent data (30 days, for example) through GraphQL RPC or gRPC.
  - A premium tier with full GraphQL or gRPC and Archival Service access, suited to apps that need historical lookups.
  - Optionally, region-specific instances or throughput-based pricing to support diverse client footprints.
Related links
A practical guide to querying Sui RPC with the GraphQL service, with examples for common tasks.
GraphQL is a public service for the Sui RPC that enables you to efficiently interact with the Sui network.
The sui-indexer-alt-framework is a powerful Rust framework for building high-performance, custom blockchain indexers on Sui. It provides customizable, production-ready components for data ingestion, processing, and storage.
Setup instructions for running the indexer, consistent store, and GraphQL in production.
Sui GraphiQL IDE for Testnet (Public good).
Sui GraphiQL IDE for Mainnet (Public good).