Skip to content

Commit

Permalink
[test] 1. add indexer test; 2. extract the index job logic into a Handler class
Browse files Browse the repository at this point in the history
  • Loading branch information
welbon committed Aug 28, 2024
1 parent 4ba90d0 commit d8bbc67
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.starcoin.config.SearchJobFactory;
import org.starcoin.indexer.handler.*;
Expand All @@ -25,7 +24,7 @@ public class QuartzConfig {

@Bean
public JobDetail indexerJob() {
    // Durable so the JobDetail survives in the scheduler even without an attached trigger.
    JobBuilder jobBuilder = JobBuilder.newJob(IndexerHandleJob.class);
    return jobBuilder
            .withIdentity("indexer")
            .storeDurably()
            .build();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package org.starcoin.indexer.handler;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.starcoin.api.BlockRPCClient;
import org.starcoin.api.TransactionRPCClient;

import javax.annotation.PostConstruct;


/**
 * Quartz job that drives the legacy main-chain block indexer.
 *
 * <p>The actual indexing logic (offset tracking, fork rollback, bulk writes)
 * lives in {@link LegacyMainIndexHandler}; this class only adapts it to the
 * Quartz {@code QuartzJobBean} lifecycle. The previous in-line implementation
 * that was left here as a large commented-out block has been removed — it
 * duplicated LegacyMainIndexHandler and is preserved in version control.
 */
public class IndexerHandleJob extends QuartzJobBean {

    // Max number of blocks fetched and bulk-indexed per execution round.
    @Value("${starcoin.indexer.bulk_size}")
    private long bulkSize;

    @Autowired
    private ElasticSearchHandler elasticSearchHandler;

    @Autowired
    private TransactionRPCClient transactionRPCClient;

    @Autowired
    private BlockRPCClient blockRPCClient;

    // Delegate holding the indexing logic and the local offset state.
    private LegacyMainIndexHandler legacyIndexHandler;

    /**
     * Builds the delegate handler and initializes its block offset from the
     * remote (Elasticsearch-stored) offset.
     *
     * <p>NOTE(review): {@code @PostConstruct} only fires when the job instance
     * is created through a Spring-aware JobFactory (the file's QuartzConfig
     * wires a SearchJobFactory — confirm it autowires job instances). The lazy
     * guard in {@link #executeInternal} covers the case where it did not run.
     */
    @PostConstruct
    public void initOffset() {
        if (legacyIndexHandler == null) {
            legacyIndexHandler = new LegacyMainIndexHandler(
                    elasticSearchHandler,
                    transactionRPCClient,
                    blockRPCClient,
                    bulkSize
            );
            legacyIndexHandler.initOffset();
        }
    }

    /**
     * Quartz entry point: delegates one indexing round to the legacy handler.
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) {
        // Defensive lazy init: avoids an NPE if @PostConstruct was never invoked
        // on this job instance (original code dereferenced the field unconditionally).
        if (legacyIndexHandler == null) {
            initOffset();
        }
        legacyIndexHandler.execute();
    }
}
Original file line number Diff line number Diff line change
@@ -1,47 +1,54 @@
package org.starcoin.indexer.handler;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.quartz.JobExecutionContext;

import com.thetransactioncompany.jsonrpc2.client.JSONRPC2SessionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.starcoin.api.BlockRPCClient;
import org.starcoin.api.TransactionRPCClient;
import org.starcoin.bean.Block;
import org.starcoin.bean.BlockHeader;
import org.starcoin.bean.BlockOffset;
import com.thetransactioncompany.jsonrpc2.client.JSONRPC2SessionException;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;


public class IndexerHandle extends QuartzJobBean {
private static final Logger logger = LoggerFactory.getLogger(IndexerHandle.class);
public class LegacyMainIndexHandler {
private static final Logger logger = LoggerFactory.getLogger(LegacyMainIndexHandler.class);

private BlockOffset localBlockOffset;
private BlockHeader currentHandleHeader;

@Value("${starcoin.network}")
private String network;

@Value("${starcoin.indexer.bulk_size}")
private long bulkSize;
private BlockHeader currentHandleHeader;

@Autowired
private ElasticSearchHandler elasticSearchHandler;


@Autowired
private TransactionRPCClient transactionRPCClient;

@Autowired
private BlockRPCClient blockRPCClient;

private Long bulkSize;

/**
 * Creates a handler wired with its collaborators.
 *
 * @param elasticSearchHandler  store used to read/write indexed blocks and the remote offset
 * @param transactionRPCClient  RPC client used to fetch transactions for each block
 * @param blockRPCClient        RPC client used to fetch chain headers and blocks
 * @param bulkSize              maximum number of blocks processed per execute() round
 *                              (NOTE(review): boxed Long — a null here would NPE later; confirm callers)
 */
public LegacyMainIndexHandler(
        ElasticSearchHandler elasticSearchHandler,
        TransactionRPCClient transactionRPCClient,
        BlockRPCClient blockRPCClient,
        Long bulkSize
) {
    this.elasticSearchHandler = elasticSearchHandler;
    this.transactionRPCClient = transactionRPCClient;
    this.blockRPCClient = blockRPCClient;
    this.bulkSize = bulkSize;
}

/**
 * Seeds the local offset from an explicit (height, hash) pair instead of the
 * offset stored remotely; used by tests to replay indexing from a known block.
 *
 * @param height    block height to start from
 * @param blockHash block hash recorded in the local offset
 * @throws JSONRPC2SessionException if the chain RPC lookup fails
 */
public void initOffsetWith(Long height, String blockHash) throws JSONRPC2SessionException {
    localBlockOffset = new BlockOffset(height, blockHash);
    Block block = blockRPCClient.getBlockByHeight(height);
    if (block != null) {
        currentHandleHeader = block.getHeader();
    } else {
        // NOTE(review): localBlockOffset is already set at this point while
        // currentHandleHeader stays null/stale — confirm the caller tolerates
        // this partial initialization (execute() re-runs initOffset() on null header).
        logger.error("init offset block not exist on chain: {}", localBlockOffset);
    }
}

@PostConstruct
public void initOffset() {
localBlockOffset = elasticSearchHandler.getRemoteOffset();
//update current handle header
Expand All @@ -66,8 +73,8 @@ public void initOffset() {
}
}

@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) {

public void execute() {
//read current offset
if (localBlockOffset == null || currentHandleHeader == null) {
// logger.warn("local offset error, reset it: {}, {}", localOffset, currentHandleHeader);
Expand Down Expand Up @@ -173,4 +180,4 @@ protected void executeInternal(JobExecutionContext jobExecutionContext) {
logger.error("chain header error:", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

public class MarketCapIndexer extends QuartzJobBean {
private static final Logger logger = LoggerFactory.getLogger(MarketCapIndexer.class);

@Autowired
private MarketCapHandle handle;

@Autowired
private AddressHolderService addressHolderService;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.starcoin.indexer.test;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.starcoin.api.BlockRPCClient;
import org.starcoin.api.TransactionRPCClient;
import org.starcoin.indexer.handler.ElasticSearchHandler;
import org.starcoin.indexer.handler.LegacyMainIndexHandler;

/**
 * Integration test for {@link LegacyMainIndexHandler}: constructs the handler
 * directly (bypassing the Quartz job wrapper), seeds it with a fixed block
 * offset, and runs one indexing round against the wired backends.
 */
public class IndexHandlerJobTest extends IndexerLogicBaseTest {

    // Max blocks per indexing round, taken from the test profile's configuration.
    @Value("${starcoin.indexer.bulk_size}")
    private long bulkSize;

    @Autowired
    private ElasticSearchHandler elasticSearchHandler;

    @Autowired
    private TransactionRPCClient transactionRPCClient;

    @Autowired
    private BlockRPCClient blockRPCClient;

    @Test
    public void testIndexerHandle() throws Exception {
        LegacyMainIndexHandler legacyMainIndexHandler = new LegacyMainIndexHandler(elasticSearchHandler, transactionRPCClient, blockRPCClient, bulkSize);
        // NOTE(review): hard-coded height/hash — presumably a real block on the
        // network the test profile targets; the test will fail if it points
        // elsewhere. Confirm against the configured starcoin.network.
        legacyMainIndexHandler.initOffsetWith(2713240L, "0x4d58d276809bd061ba422a4699c90c790efc5dd1b6d40e8c2adb0b1cb98dfafd");
        legacyMainIndexHandler.execute();
    }
}

0 comments on commit d8bbc67

Please sign in to comment.