📸 Comprehensive NestJS module for generating and managing token holder snapshots across multiple blockchain networks
A snapshot management module with multi-ledger support, real-time progress tracking over WebSockets, IPFS integration, and a plugin-based architecture. It targets the Hedera Hashgraph and Ripple/XRP Ledger networks and processes snapshot jobs asynchronously.
Quick Start
Installation
npm install @hsuite/snapshots
Basic Setup
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { SnapshotsModule } from '@hsuite/snapshots';

@Module({
  imports: [
    SnapshotsModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: (config: ConfigService) => ({
        bull: {
          redis: {
            socket: {
              host: config.get('REDIS_HOST'),
              port: config.get('REDIS_PORT'),
            },
            password: config.get('REDIS_PASSWORD')
          }
        },
        jwt: {
          secret: config.get('JWT_SECRET'),
          signOptions: { expiresIn: '1h' }
        }
      }),
      inject: [ConfigService]
    })
  ]
})
export class AppModule {}
Simple Snapshot Generation
import { Injectable } from '@nestjs/common';
import { SnapshotsService } from '@hsuite/snapshots';
import { IAuth } from '@hsuite/auth-types';

@Injectable()
export class YourService {
  constructor(private snapshotsService: SnapshotsService) {}

  async generateSnapshot(tokenId: string, session: IAuth.ICredentials.IWeb3.IEntity) {
    const result = await this.snapshotsService.generateSnapshot(tokenId, session);
    return result; // Returns { jobId: string }
  }
}
Architecture
Core Component Areas
📸 Snapshot Generation Engine
Multi-Ledger Support - Hedera Hashgraph and Ripple/XRP Ledger compatibility
Asynchronous Processing - Bull queues for reliable background processing
Real-time Progress - WebSocket events for live snapshot generation updates
Rate Limiting Compliance - Network-specific API rate limiting and batching
🔌 Plugin-Based Architecture
Chain-Specific Handlers - Extensible plugin system for different blockchain networks
Dynamic Configuration - Smart chain selection and configuration management
Modular Design - Easy addition of new blockchain network support
Standardized Interface - Consistent API across all supported networks
🌐 Real-time Communication
WebSocket Gateway - Live progress tracking and event broadcasting
JWT Authentication - Secure WebSocket connections and API access
Event-Driven Updates - Comprehensive event system for all snapshot phases
Client Reconnection - Automatic handling of connection recovery
💾 Storage & Distribution
IPFS Integration - Decentralized storage for generated snapshots
Data Persistence - Reliable snapshot data management
Metadata Handling - Complete token metadata and holder information
Export Formats - Multiple output formats for snapshot data
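As an illustration of the export step, the sketch below turns holder rows into CSV text. The row shape (`accountId`, `balance`) mirrors the plugin example later in this document; the column layout and the helper names `escapeCsv` and `toCsv` are assumptions, not the module's actual exporter.

```typescript
// Hypothetical row shape, modelled on the balances built in the plugin example.
interface HolderRow {
  accountId: string;
  balance: string;
}

// Quote a field only when it contains a comma, quote, or line break,
// doubling any embedded quotes (the usual CSV convention).
function escapeCsv(value: string): string {
  return /[",\r\n]/.test(value) ? `"${value.replace(/"/g, '""')}"` : value;
}

// Render a header line followed by one line per holder.
function toCsv(rows: HolderRow[]): string {
  const lines = ['accountId,balance'];
  for (const row of rows) {
    lines.push(`${escapeCsv(row.accountId)},${escapeCsv(row.balance)}`);
  }
  return lines.join('\n');
}
```

Balances stay as strings throughout, so large token amounts are not rounded by floating-point conversion.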
Module Structure
src/
├── index.ts                  # Main exports and public API
├── snapshots.module.ts       # NestJS module configuration
├── snapshots.service.ts      # Core snapshot generation service
├── snapshots.controller.ts   # REST API endpoints
├── snapshots.consumer.ts     # Bull queue processing
├── snapshots.gateway.ts      # WebSocket real-time updates
├── types/                    # TypeScript definitions
│   ├── interfaces/           # Interface contracts
│   └── models/               # Concrete implementations
└── ledgers/                  # Multi-chain support
    └── plugins/              # Blockchain-specific handlers
API Reference
Core Module
SnapshotsModule
Purpose : Main module providing snapshot generation functionality
Features : Redis queue configuration, JWT authentication, plugin registration
Usage : Application-wide snapshot management integration
Core Services
SnapshotsService
Purpose : Core service for initiating snapshot generation
Methods : generateSnapshot(), getSnapshotStatus()
Features : Token validation, job queue management, progress tracking
SnapshotsController
Purpose : REST API controller for HTTP endpoints
Endpoints : POST /snapshots/generate/:tokenId
Features : Request validation, authentication, response formatting
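The endpoint above can be called from any HTTP client. The sketch below only builds the request for `POST /snapshots/generate/:tokenId`. The helper name `buildGenerateRequest`, the base URL, and sending the JWT as a Bearer token are assumptions for illustration; check the controller for the authentication scheme it actually expects.

```typescript
// Hypothetical helper: URL and fetch options for the generate endpoint.
interface GenerateRequest {
  url: string;
  init: { method: 'POST'; headers: Record<string, string> };
}

function buildGenerateRequest(
  baseUrl: string,     // assumption: where your NestJS app is served
  tokenId: string,
  accessToken: string  // assumption: JWT passed as a Bearer token
): GenerateRequest {
  // Strip trailing slashes so the path is joined with exactly one "/".
  const root = baseUrl.replace(/\/+$/, '');
  return {
    url: `${root}/snapshots/generate/${encodeURIComponent(tokenId)}`,
    init: {
      method: 'POST',
      headers: { Authorization: `Bearer ${accessToken}` },
    },
  };
}

// Usage (needs a runtime with fetch, such as Node 18+ or a browser):
// const { url, init } = buildGenerateRequest('http://localhost:3000', '0.0.123456', token);
// const { jobId } = await (await fetch(url, init)).json();
```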
WebSocket Events
| Event Type | Description | Payload |
| --- | --- | --- |
| snapshots_waiting | Job entered the waiting queue | { jobId, jobName, snapshotId, status: 'activated', progress: 0 } |
| snapshots_progress | Progress update during generation | { jobId, jobName, snapshotId, status: 'running', progress } |
| snapshots_completed | Job completed successfully | { jobId, jobName, snapshotId, status: 'completed', progress: 100 } |
| snapshots_failed | Job failed with an error | { jobId, jobName, snapshotId, status: 'error', error } |
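One way to consume these payloads on the client is a small pure function that folds each payload into per-job state. The payload fields come from the table above; their TypeScript types, and the names `SnapshotEventPayload`, `JobView`, and `applyEvent`, are assumptions made for this sketch.

```typescript
// Fields shared by every payload listed in the table (types are assumed).
interface BasePayload {
  jobId: string;
  jobName: string;
  snapshotId: string;
}

// Discriminated union on `status`, so the compiler narrows each branch.
type SnapshotEventPayload =
  | (BasePayload & { status: 'activated' | 'running' | 'completed'; progress: number })
  | (BasePayload & { status: 'error'; error: string });

interface JobView {
  status: string;
  progress: number;
  error?: string;
}

// Returns a new Map rather than mutating, which suits UI state containers.
function applyEvent(
  state: Map<string, JobView>,
  event: SnapshotEventPayload
): Map<string, JobView> {
  const next = new Map(state);
  if (event.status === 'error') {
    // Error payloads carry no progress, so keep the last known value.
    const previous = next.get(event.jobId);
    next.set(event.jobId, {
      status: 'error',
      progress: previous ? previous.progress : 0,
      error: event.error,
    });
  } else {
    next.set(event.jobId, { status: event.status, progress: event.progress });
  }
  return next;
}
```

Keeping the reducer free of socket code makes it easy to unit test and to reuse regardless of how the events are delivered.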
Supported Networks
| Network | Token Types | Features | Rate Limits |
| --- | --- | --- | --- |
|  |  | Balance snapshots, metadata | 25 accounts/batch, 1s delay |
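The rate limit listed above (25 accounts per batch, a 1 second pause between batches) can be sketched as follows. The helper names `chunk` and `processInBatches` are hypothetical, and this is a generic batching pattern rather than the module's own scheduler.

```typescript
// Split a list into consecutive batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Run `handler` over every item, one batch at a time, pausing between batches.
// Items within a batch run concurrently; results keep the input order.
async function processInBatches<T, R>(
  items: T[],
  handler: (item: T) => Promise<R>,
  batchSize = 25,   // from the table: 25 accounts per batch
  delayMs = 1000    // from the table: 1s delay
): Promise<R[]> {
  const results: R[] = [];
  const batches = chunk(items, batchSize);
  for (let i = 0; i < batches.length; i++) {
    const batchResults = await Promise.all(batches[i].map(handler));
    results.push(...batchResults);
    // No need to wait after the final batch.
    if (i < batches.length - 1) await sleep(delayMs);
  }
  return results;
}
```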
Plugin Interface
interface SnapshotLedgerPlugin {
  fetchAllTokenBalances(tokenId: string, job: Job): Promise<Array<any>>;
  validateToken(jobRequest: Snapshots.Queue.Upload.Request): void;
}
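To show how chain-specific handlers might be selected at runtime, here is a minimal registry keyed by chain name. It is self-contained, so `JobLike` and `UploadRequestLike` are simplified stand-ins for the Bull `Job` and the upload request type used by the interface above. `PluginRegistry` and `InMemoryPlugin` are hypothetical names, and the module's real registration mechanism may differ.

```typescript
// Simplified stand-ins (assumptions) for the types the real interface uses.
interface JobLike {
  progress(value: number): Promise<void>;
}
interface UploadRequestLike {
  tokenId: string;
}

// Same two methods as the plugin interface above.
interface LedgerPlugin {
  fetchAllTokenBalances(tokenId: string, job: JobLike): Promise<Array<any>>;
  validateToken(jobRequest: UploadRequestLike): void;
}

// Maps a chain name to the plugin that handles it (case-insensitive).
class PluginRegistry {
  private readonly plugins = new Map<string, LedgerPlugin>();

  register(chain: string, plugin: LedgerPlugin): void {
    this.plugins.set(chain.toLowerCase(), plugin);
  }

  resolve(chain: string): LedgerPlugin {
    const plugin = this.plugins.get(chain.toLowerCase());
    if (!plugin) {
      throw new Error(`No snapshot plugin registered for chain "${chain}"`);
    }
    return plugin;
  }
}

// Toy plugin backed by a fixed table, handy for tests.
class InMemoryPlugin implements LedgerPlugin {
  private readonly balances: Record<string, Array<{ accountId: string; balance: string }>>;

  constructor(balances: Record<string, Array<{ accountId: string; balance: string }>>) {
    this.balances = balances;
  }

  async fetchAllTokenBalances(tokenId: string, job: JobLike): Promise<Array<any>> {
    await job.progress(100); // nothing to page through, so report completion
    return this.balances[tokenId] || [];
  }

  validateToken(jobRequest: UploadRequestLike): void {
    if (!(jobRequest.tokenId in this.balances)) {
      throw new Error(`Unknown token: ${jobRequest.tokenId}`);
    }
  }
}
```

Because callers depend only on the two-method interface, adding a network means registering one more plugin without touching the code that drives the snapshot.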
Guides
Types System Documentation
Complete TypeScript definitions and data models for all snapshot operations. Understand the type system, interfaces, and models used throughout the snapshot generation process.
Multi-Chain Ledger Support
Plugin architecture guide for implementing new blockchain network support. Learn how to extend the system to support additional blockchains beyond Hedera and XRPL.
WebSocket Integration Guide
Learn how to implement real-time snapshot progress tracking in your applications. Set up WebSocket connections, handle progress events, and manage client-side state.
IPFS Storage Configuration
Set up decentralized storage for snapshot data with IPFS integration. Configure IPFS nodes, implement pinning strategies, and manage distributed storage for large datasets.
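Once a snapshot has been uploaded, its CID can be turned into a retrievable URL through an IPFS HTTP gateway. The sketch below uses the common path-style gateway layout (`<gateway>/ipfs/<cid>`); the helper name `ipfsGatewayUrl` is hypothetical, and the gateway address is whatever you set in `IPFS_GATEWAY_URL`.

```typescript
// Build a path-style gateway URL for a CID.
function ipfsGatewayUrl(gatewayUrl: string, cid: string): string {
  if (!cid) {
    throw new Error('cid must not be empty');
  }
  // Strip trailing slashes so the result has exactly one "/" before "ipfs".
  const root = gatewayUrl.replace(/\/+$/, '');
  return `${root}/ipfs/${cid}`;
}

// Usage: the CID below is a placeholder, not a real snapshot.
// ipfsGatewayUrl('http://localhost:8080', 'bafyExampleCid')
```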
Examples
Advanced Snapshot Service Implementation
import { SnapshotsService, ISnapshots } from '@hsuite/snapshots';
import { Injectable, Logger } from '@nestjs/common';
import { IAuth } from '@hsuite/auth-types';

@Injectable()
export class AdvancedSnapshotService {
  private readonly logger = new Logger(AdvancedSnapshotService.name);

  constructor(private readonly snapshotsService: SnapshotsService) {}

  async generateTokenSnapshot(
    tokenId: string,
    session: IAuth.ICredentials.IWeb3.IEntity,
    options?: SnapshotGenerationOptions
  ) {
    try {
      // Validate token format based on chain
      this.validateTokenFormat(tokenId);

      // Log snapshot request
      this.logger.log(`Generating snapshot for token ${tokenId} by user ${session.walletId}`);

      // Generate snapshot with monitoring
      const result = await this.snapshotsService.generateSnapshot(tokenId, session);

      // Set up progress monitoring
      if (options?.monitorProgress) {
        await this.setupProgressMonitoring(result.jobId, options.progressCallback);
      }

      return {
        success: true,
        jobId: result.jobId,
        estimatedTime: this.estimateCompletionTime(tokenId),
        supportedFormats: ['json', 'csv'],
        websocketChannel: 'snapshots_events'
      };
    } catch (error) {
      this.logger.error(`Snapshot generation failed: ${error.message}`);
      throw new Error(`Failed to generate snapshot: ${error.message}`);
    }
  }

  async getSnapshotHistory(
    session: IAuth.ICredentials.IWeb3.IEntity,
    limit: number = 10
  ) {
    try {
      const history = await this.getSnapshotsByUser(session.walletId, limit);

      return {
        success: true,
        snapshots: history.map(snapshot => ({
          jobId: snapshot.jobId,
          tokenId: snapshot.tokenId,
          status: snapshot.status,
          progress: snapshot.progress,
          createdAt: snapshot.createdAt,
          completedAt: snapshot.completedAt,
          holderCount: snapshot.holderCount,
          ipfsCid: snapshot.ipfsCid
        })),
        totalCount: history.length
      };
    } catch (error) {
      throw new Error(`Failed to retrieve snapshot history: ${error.message}`);
    }
  }

  async cancelSnapshot(jobId: string, session: IAuth.ICredentials.IWeb3.IEntity) {
    try {
      // Verify ownership
      const snapshot = await this.getSnapshotByJobId(jobId);
      if (snapshot.userId !== session.walletId) {
        throw new Error('Unauthorized: Cannot cancel snapshot from another user');
      }

      // Cancel the job
      await this.cancelSnapshotJob(jobId);

      return {
        success: true,
        jobId: jobId,
        status: 'cancelled',
        cancelledAt: new Date().toISOString()
      };
    } catch (error) {
      throw new Error(`Failed to cancel snapshot: ${error.message}`);
    }
  }

  private validateTokenFormat(tokenId: string) {
    // Hedera format: 0.0.123456
    const hederaPattern = /^0\.0\.\d+$/;
    // XRP format validation (if needed)
    const xrpPattern = /^[A-Z0-9]{40}$/;

    if (!hederaPattern.test(tokenId) && !xrpPattern.test(tokenId)) {
      throw new Error('Invalid token ID format. Expected Hedera (0.0.123456) or XRP format.');
    }
  }

  private estimateCompletionTime(tokenId: string): string {
    // Mock estimation based on network and historical data
    const baseTime = this.isHederaToken(tokenId) ? 300 : 180; // seconds
    return `${baseTime} seconds (estimated)`;
  }

  private isHederaToken(tokenId: string): boolean {
    return /^0\.0\.\d+$/.test(tokenId);
  }

  private async setupProgressMonitoring(
    jobId: string,
    callback?: (progress: number) => void
  ): Promise<void> {
    // Implementation for progress monitoring
    // This would typically involve WebSocket subscriptions
  }

  private async getSnapshotsByUser(userId: string, limit: number): Promise<any[]> {
    // Mock implementation - replace with actual database queries
    return [];
  }

  private async getSnapshotByJobId(jobId: string): Promise<any> {
    // Mock implementation - replace with actual database queries
    return { jobId, userId: 'user-id' };
  }

  private async cancelSnapshotJob(jobId: string): Promise<void> {
    // Implementation for cancelling Bull queue jobs
  }
}

interface SnapshotGenerationOptions {
  monitorProgress?: boolean;
  progressCallback?: (progress: number) => void;
  outputFormat?: 'json' | 'csv';
  includeMetadata?: boolean;
}
WebSocket Client Integration
import { Injectable } from '@nestjs/common';
import { io, Socket } from 'socket.io-client';

@Injectable()
export class SnapshotWebSocketClient {
  private socket: Socket;
  private activeSnapshots = new Map<string, SnapshotProgress>();

  connect(accessToken: string, serverUrl: string = 'ws://localhost:3000') {
    this.socket = io(`${serverUrl}/snapshots`, {
      auth: { accessToken },
      autoConnect: true,
      reconnection: true,
      reconnectionAttempts: 5,
      reconnectionDelay: 1000
    });

    this.setupEventHandlers();
  }

  private setupEventHandlers() {
    this.socket.on('connect', () => {
      console.log('Connected to snapshot WebSocket server');
    });

    this.socket.on('disconnect', (reason) => {
      console.log('Disconnected from snapshot server:', reason);
    });

    this.socket.on('snapshots_events', (event) => {
      this.handleSnapshotEvent(event);
    });

    this.socket.on('connect_error', (error) => {
      console.error('WebSocket connection error:', error);
    });
  }

  private handleSnapshotEvent(event: any) {
    const { type, payload } = event;

    switch (type) {
      case 'snapshots_waiting':
        this.handleSnapshotWaiting(payload);
        break;
      case 'snapshots_active':
        this.handleSnapshotActive(payload);
        break;
      case 'snapshots_progress':
        this.handleSnapshotProgress(payload);
        break;
      case 'snapshots_completed':
        this.handleSnapshotCompleted(payload);
        break;
      case 'snapshots_failed':
        this.handleSnapshotFailed(payload);
        break;
      default:
        console.warn('Unknown snapshot event type:', type);
    }
  }

  private handleSnapshotWaiting(payload: any) {
    console.log(`Snapshot ${payload.jobId} is waiting in queue`);

    this.activeSnapshots.set(payload.jobId, {
      jobId: payload.jobId,
      status: 'waiting',
      progress: 0,
      startedAt: new Date()
    });
  }

  private handleSnapshotActive(payload: any) {
    console.log(`Snapshot ${payload.jobId} started processing`);

    const snapshot = this.activeSnapshots.get(payload.jobId);
    if (snapshot) {
      snapshot.status = 'active';
      snapshot.progress = payload.progress || 0;
      snapshot.snapshotId = payload.snapshotId;
    }

    // Notify UI components
    this.notifyProgressUpdate(payload.jobId, 0, 'started');
  }

  private handleSnapshotProgress(payload: any) {
    console.log(`Snapshot ${payload.jobId} progress: ${payload.progress}%`);

    const snapshot = this.activeSnapshots.get(payload.jobId);
    if (snapshot) {
      snapshot.status = 'running';
      snapshot.progress = payload.progress;
      snapshot.lastUpdate = new Date();
    }

    // Notify UI components
    this.notifyProgressUpdate(payload.jobId, payload.progress, 'progress');
  }

  private handleSnapshotCompleted(payload: any) {
    console.log(`Snapshot ${payload.jobId} completed successfully`);

    const snapshot = this.activeSnapshots.get(payload.jobId);
    if (snapshot) {
      snapshot.status = 'completed';
      snapshot.progress = 100;
      snapshot.completedAt = new Date();
      snapshot.ipfsCid = payload.ipfsCid;
    }

    // Notify UI components
    this.notifyProgressUpdate(payload.jobId, 100, 'completed');

    // Clean up after delay
    setTimeout(() => {
      this.activeSnapshots.delete(payload.jobId);
    }, 60000); // Keep for 1 minute
  }

  private handleSnapshotFailed(payload: any) {
    console.error(`Snapshot ${payload.jobId} failed:`, payload.error);

    const snapshot = this.activeSnapshots.get(payload.jobId);
    if (snapshot) {
      snapshot.status = 'failed';
      snapshot.error = payload.error;
      snapshot.failedAt = new Date();
    }

    // Notify UI components
    this.notifyProgressUpdate(payload.jobId, snapshot?.progress || 0, 'failed', payload.error);
  }

  private notifyProgressUpdate(
    jobId: string,
    progress: number,
    status: string,
    error?: string
  ) {
    // Emit a custom event for UI components to listen to.
    // The event is built inside the guard because CustomEvent may not exist outside a browser.
    if (typeof window !== 'undefined') {
      const customEvent = new CustomEvent('snapshotProgressUpdate', {
        detail: { jobId, progress, status, error, timestamp: new Date() }
      });
      window.dispatchEvent(customEvent);
    }
  }

  getActiveSnapshots(): Map<string, SnapshotProgress> {
    return new Map(this.activeSnapshots);
  }

  getSnapshotStatus(jobId: string): SnapshotProgress | undefined {
    return this.activeSnapshots.get(jobId);
  }

  disconnect() {
    if (this.socket) {
      this.socket.disconnect();
    }
  }
}

interface SnapshotProgress {
  jobId: string;
  status: 'waiting' | 'active' | 'running' | 'completed' | 'failed';
  progress: number;
  snapshotId?: string;
  startedAt: Date;
  lastUpdate?: Date;
  completedAt?: Date;
  failedAt?: Date;
  ipfsCid?: string;
  error?: string;
}
Multi-Chain Plugin Implementation
import { Injectable } from '@nestjs/common';
import { Job } from 'bull';
import { ISnapshots } from '@hsuite/snapshots';

@Injectable()
export class CustomChainSnapshotHandler {
  async fetchAllTokenBalances(tokenId: string, job: Job): Promise<Array<any>> {
    try {
      const balances: any[] = [];
      let page = 0;
      const batchSize = 100;
      let hasMore = true;

      // Update job progress
      await job.progress(0);

      while (hasMore) {
        const batch = await this.fetchTokenHoldersBatch(tokenId, page, batchSize);

        if (batch.length === 0) {
          hasMore = false;
          break;
        }

        // Process batch
        for (const holder of batch) {
          const balance = await this.getAccountBalance(holder.accountId, tokenId);

          if (balance > 0) {
            balances.push({
              accountId: holder.accountId,
              balance: balance.toString(),
              lastTransactionTime: holder.lastTransaction,
              metadata: holder.metadata || {}
            });
          }
        }

        // Update progress (capped at 95% until final processing is done)
        page++;
        const progress = Math.min(95, (balances.length / this.estimateTotalHolders(tokenId)) * 100);
        await job.progress(progress);

        // Rate limiting
        await this.rateLimitDelay();
      }

      // Final processing
      await job.progress(100);

      return this.sortAndFormatBalances(balances);
    } catch (error) {
      throw new Error(`Failed to fetch token balances: ${error.message}`);
    }
  }

  validateToken(jobRequest: ISnapshots.IQueue.IUpload.IRequest): void {
    const tokenId = jobRequest.tokenId;

    // Custom token validation logic
    if (!this.isValidTokenFormat(tokenId)) {
      throw new Error(`Invalid token format for custom chain: ${tokenId}`);
    }

    // Additional validation
    if (!this.tokenExists(tokenId)) {
      throw new Error(`Token does not exist: ${tokenId}`);
    }
  }

  private async fetchTokenHoldersBatch(
    tokenId: string,
    page: number,
    batchSize: number
  ): Promise<any[]> {
    // Implementation specific to your blockchain
    // This would call the appropriate API for your network
    try {
      const response = await this.callBlockchainAPI(`/tokens/${tokenId}/holders`, {
        page,
        limit: batchSize
      });

      return response.data || [];
    } catch (error) {
      throw new Error(`API call failed: ${error.message}`);
    }
  }

  private async getAccountBalance(accountId: string, tokenId: string): Promise<number> {
    // Get specific account balance for the token
    try {
      const response = await this.callBlockchainAPI(`/accounts/${accountId}/tokens/${tokenId}`);
      return response.balance || 0;
    } catch (error) {
      console.warn(`Failed to get balance for ${accountId}: ${error.message}`);
      return 0;
    }
  }

  private async callBlockchainAPI(endpoint: string, params?: any): Promise<any> {
    // Mock API call - replace with actual blockchain API calls
    await new Promise(resolve => setTimeout(resolve, 100)); // Simulate API delay

    return {
      data: [],
      balance: 0
    };
  }

  private async rateLimitDelay(): Promise<void> {
    // Implement appropriate rate limiting for your blockchain
    await new Promise(resolve => setTimeout(resolve, 1000)); // 1 second delay
  }

  private estimateTotalHolders(tokenId: string): number {
    // Estimate total holders for progress calculation
    // This could be cached or retrieved from token metadata
    return 10000; // Mock estimate
  }

  private isValidTokenFormat(tokenId: string): boolean {
    // Implement token format validation for your chain
    return tokenId.length > 0 && /^[a-zA-Z0-9]+$/.test(tokenId);
  }

  private tokenExists(tokenId: string): boolean {
    // Check if token exists on the blockchain
    // This would typically involve an API call
    return true; // Mock validation
  }

  private sortAndFormatBalances(balances: any[]): any[] {
    // Sort by balance (descending), then format and rank.
    // The rank comes from the map index, which avoids an indexOf scan per entry.
    return balances
      .sort((a, b) => parseFloat(b.balance) - parseFloat(a.balance))
      .map((balance, index) => ({
        ...balance,
        balance: this.formatBalance(balance.balance),
        rank: index + 1
      }));
  }

  private formatBalance(balance: string): string {
    // Format balance according to token decimals
    return parseFloat(balance).toFixed(8);
  }
}
Integration
Required Dependencies
{
  "@nestjs/common": "^10.4.2",
  "@nestjs/core": "^10.4.2",
  "@hsuite/hashgraph": "^2.0.2",
  "@hsuite/auth-types": "^2.1.2",
  "@hsuite/ipfs": "^2.0.5",
  "@hsuite/nestjs-swagger": "^1.0.3",
  "@hashgraph/sdk": "^2.62.0",
  "@compodoc/compodoc": "^1.1.23"
}
Module Integration
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { SnapshotsModule } from '@hsuite/snapshots';

@Module({
  imports: [
    SnapshotsModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        bull: {
          redis: {
            socket: {
              host: configService.get('REDIS_HOST', 'localhost'),
              port: configService.get('REDIS_PORT', 6379)
            },
            password: configService.get('REDIS_PASSWORD'),
            username: configService.get('REDIS_USERNAME'),
            database: configService.get('REDIS_DATABASE', 0)
          },
          defaultJobOptions: {
            attempts: 3,
            backoff: { type: 'exponential', delay: 1000 },
            removeOnComplete: 10,
            removeOnFail: 5
          }
        },
        jwt: {
          secret: configService.get('JWT_SECRET'),
          signOptions: {
            expiresIn: '1h',
            issuer: 'hsuite-snapshots'
          }
        }
      }),
      inject: [ConfigService]
    })
  ]
})
export class AppModule {}
Environment Configuration
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=your-redis-password
REDIS_USERNAME=default
REDIS_DATABASE=0
# JWT Configuration
JWT_SECRET=your-jwt-secret
# Snapshot Settings
SNAPSHOT_BATCH_SIZE=25
SNAPSHOT_RATE_LIMIT_DELAY=1000
SNAPSHOT_MAX_RETRIES=3
# IPFS Configuration
IPFS_API_URL=http://localhost:5001
IPFS_GATEWAY_URL=http://localhost:8080
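Environment values arrive as strings, so it helps to parse and validate the numeric snapshot settings once at startup. The sketch below reads the three `SNAPSHOT_*` variables listed above and falls back to the values shown there. The names `SnapshotSettings`, `readInt`, and `loadSnapshotSettings` are hypothetical, and whether the module itself reads these variables this way is an assumption.

```typescript
interface SnapshotSettings {
  batchSize: number;
  rateLimitDelayMs: number;
  maxRetries: number;
}

type Env = Record<string, string | undefined>;

// Parse a non-negative integer, using `fallback` when the variable is unset or blank.
function readInt(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === '') {
    return fallback;
  }
  const value = Number(raw);
  if (!Number.isInteger(value) || value < 0) {
    throw new Error(`${key} must be a non-negative integer, got "${raw}"`);
  }
  return value;
}

function loadSnapshotSettings(env: Env): SnapshotSettings {
  return {
    batchSize: readInt(env, 'SNAPSHOT_BATCH_SIZE', 25),
    rateLimitDelayMs: readInt(env, 'SNAPSHOT_RATE_LIMIT_DELAY', 1000),
    maxRetries: readInt(env, 'SNAPSHOT_MAX_RETRIES', 3),
  };
}

// Usage in Node: const settings = loadSnapshotSettings(process.env);
```

Failing fast on a malformed value surfaces a typo in the environment at boot instead of in the middle of a snapshot job.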
Processing Flow
📊 Snapshot Generation Pipeline
Request Validation - Token format and authentication verification
Queue Management - Bull queue job creation with Redis persistence
Multi-Chain Processing - Plugin-based blockchain data fetching
Progress Tracking - Real-time WebSocket updates during generation
IPFS Upload - Decentralized storage of final snapshot data
Notification - Complete event broadcasting to clients
Rate Limiting Compliance - Network-specific API batching and delays
Asynchronous Processing - Non-blocking queue-based generation
Real-time Updates - Efficient WebSocket event broadcasting
Error Recovery - Automatic retry with exponential backoff
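The retry behaviour can be pictured with a generic helper that doubles the wait after each failed attempt. This illustrates the pattern only: in the module the retries are handled by the Bull queue through `attempts` and `backoff`, and Bull's exact delay formula may differ from the doubling schedule assumed here. The names `backoffDelays` and `withRetry` are hypothetical.

```typescript
// Waits between attempts: base, 2*base, 4*base, ... (one fewer than `attempts`).
function backoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < attempts - 1; retry++) {
    delays.push(baseDelayMs * Math.pow(2, retry));
  }
  return delays;
}

// Run `task` up to `attempts` times, sleeping between failures.
// Rethrows the last error once every attempt has failed.
async function withRetry<T>(
  task: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 1000
): Promise<T> {
  const delays = backoffDelays(attempts, baseDelayMs);
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      // Sleep only if another attempt follows.
      if (attempt < delays.length) {
        await new Promise<void>(resolve => setTimeout(resolve, delays[attempt]));
      }
    }
  }
  throw lastError;
}
```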
📸 Multi-Ledger Snapshots : Comprehensive token holder snapshot generation with support for Hedera Hashgraph and Ripple/XRP Ledger networks.
🌐 Real-time Tracking : WebSocket-based progress monitoring with JWT authentication and automatic reconnection handling.
🔌 Plugin Architecture : Extensible multi-chain support with standardized interfaces for easy blockchain network integration.
Built with ❤️ by the HbarSuite Team
Copyright © 2024 HbarSuite. All rights reserved.