From d8bbc67e24700948bc11bcd2c1ad20b27d42bc70 Mon Sep 17 00:00:00 2001 From: Bob Ong Date: Wed, 28 Aug 2024 18:44:51 +0800 Subject: [PATCH] [test] 1. add index test; 2. change index job into Handler class --- .../starcoin/indexer/config/QuartzConfig.java | 3 +- .../indexer/handler/IndexerHandleJob.java | 181 ++++++++++++++++++ ...andle.java => LegacyMainIndexHandler.java} | 55 +++--- .../indexer/handler/MarketCapIndexer.java | 2 + .../indexer/test/IndexHandlerJobTest.java | 31 +++ 5 files changed, 246 insertions(+), 26 deletions(-) create mode 100644 starcoin-indexer/src/main/java/org/starcoin/indexer/handler/IndexerHandleJob.java rename starcoin-indexer/src/main/java/org/starcoin/indexer/handler/{IndexerHandle.java => LegacyMainIndexHandler.java} (88%) create mode 100644 starcoin-indexer/src/test/java/org/starcoin/indexer/test/IndexHandlerJobTest.java diff --git a/starcoin-indexer/src/main/java/org/starcoin/indexer/config/QuartzConfig.java b/starcoin-indexer/src/main/java/org/starcoin/indexer/config/QuartzConfig.java index 8ca2b3d..9048169 100644 --- a/starcoin-indexer/src/main/java/org/starcoin/indexer/config/QuartzConfig.java +++ b/starcoin-indexer/src/main/java/org/starcoin/indexer/config/QuartzConfig.java @@ -5,7 +5,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.starcoin.config.SearchJobFactory; import org.starcoin.indexer.handler.*; @@ -25,7 +24,7 @@ public class QuartzConfig { @Bean public JobDetail indexerJob() { - return JobBuilder.newJob(IndexerHandle.class).withIdentity("indexer").storeDurably().build(); + return JobBuilder.newJob(IndexerHandleJob.class).withIdentity("indexer").storeDurably().build(); } @Bean diff --git a/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/IndexerHandleJob.java b/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/IndexerHandleJob.java new file mode 100644 index 0000000..dfb33cd --- /dev/null +++ b/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/IndexerHandleJob.java @@ -0,0 +1,181 @@ +package org.starcoin.indexer.handler; +import org.quartz.JobExecutionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.quartz.QuartzJobBean; +import org.starcoin.api.BlockRPCClient; +import org.starcoin.api.TransactionRPCClient; + +import javax.annotation.PostConstruct; + + +public class IndexerHandleJob extends QuartzJobBean { + // private static final Logger logger = LoggerFactory.getLogger(IndexerHandleJob.class); + + @Value("${starcoin.indexer.bulk_size}") + private long bulkSize; + + @Autowired + private ElasticSearchHandler elasticSearchHandler; + + @Autowired + private TransactionRPCClient transactionRPCClient; + + @Autowired + private BlockRPCClient blockRPCClient; + + private LegacyMainIndexHandler legacyIndexHandler; + +// @PostConstruct +// public void initOffset() { +// localBlockOffset = elasticSearchHandler.getRemoteOffset(); +// //update current handle header +// try { +// if (localBlockOffset != null) { +// Block block = blockRPCClient.getBlockByHeight(localBlockOffset.getBlockHeight()); +// if (block != null) { +// currentHandleHeader = block.getHeader(); +// } else { +// logger.error("init offset block not exist on chain: {}", localBlockOffset); +// } +// +// } else { +// logger.warn("offset is null,init reset to genesis"); +// currentHandleHeader = blockRPCClient.getBlockByHeight(0).getHeader(); +// localBlockOffset = new BlockOffset(0, currentHandleHeader.getBlockHash()); +// elasticSearchHandler.setRemoteOffset(localBlockOffset); +// logger.info("init offset ok: {}", localBlockOffset); +// } +// } catch (JSONRPC2SessionException e) { +// logger.error("set current header error:", e); +// } +// } +// +// @Override +// protected void executeInternal(JobExecutionContext jobExecutionContext) { +// //read current offset +// if (localBlockOffset == null || currentHandleHeader == null) { +//// logger.warn("local offset error, reset it: {}, {}", localOffset, currentHandleHeader); +// initOffset(); +// } +// BlockOffset remoteBlockOffset = elasticSearchHandler.getRemoteOffset(); +// logger.info("current remote offset: {}", remoteBlockOffset); +// if (remoteBlockOffset == null) { +// logger.warn("offset must not null, please check blocks.mapping!!"); +// return; +// } +// if (remoteBlockOffset.getBlockHeight() > localBlockOffset.getBlockHeight()) { +// logger.info("indexer equalize chain blocks."); +// return; +// } +// //read head +// try { +// BlockHeader chainHeader = blockRPCClient.getChainHeader(); +// //calculate bulk size +// long headHeight = chainHeader.getHeight(); +// long bulkNumber = Math.min(headHeight - localBlockOffset.getBlockHeight(), bulkSize); +// int index = 1; +// List blockList = new ArrayList<>(); +// while (index <= bulkNumber) { +// long readNumber = localBlockOffset.getBlockHeight() + index; +// Block block = blockRPCClient.getBlockByHeight(readNumber); +// if (!block.getHeader().getParentHash().equals(currentHandleHeader.getBlockHash())) { +// //fork handle until reach forked point block +// logger.warn("Fork detected, roll back: {}, {}, {}", readNumber, block.getHeader().getParentHash(), currentHandleHeader.getBlockHash()); +// Block lastForkBlock, lastMasterBlock; +// BlockHeader forkHeader = currentHandleHeader; +// long lastMasterNumber = readNumber - 1; +// String forkHeaderParentHash = null; +// do { +// //获取分叉的block +// if (forkHeaderParentHash == null) { +// //第一次先回滚当前最高的分叉块 +// forkHeaderParentHash = forkHeader.getBlockHash(); +// } else { +// forkHeaderParentHash = forkHeader.getParentHash(); +// } +// lastForkBlock = elasticSearchHandler.getBlockContent(forkHeaderParentHash); +// if (lastForkBlock == null) { +// logger.warn("get fork block null: {}", forkHeaderParentHash); +// //read from node +// lastForkBlock = blockRPCClient.getBlockByHash(forkHeaderParentHash); +// } +// if (lastForkBlock != null) { +// elasticSearchHandler.bulkForkedUpdate(lastForkBlock); +// logger.info("rollback forked block ok: {}, {}", lastForkBlock.getHeader().getHeight(), forkHeaderParentHash); +// } else { +// //如果块取不到,先退出当前任务,下一个轮询周期再执行 +// logger.warn("get forked block is null: {}", forkHeaderParentHash); +// return; +// } +// +// //获取上一个高度主块 +// lastMasterBlock = blockRPCClient.getBlockByHeight(lastMasterNumber); +// if (lastMasterBlock != null) { +// long forkNumber = forkHeader.getHeight(); +// logger.info("fork number: {}", forkNumber); +// forkHeader = lastForkBlock.getHeader(); +// //reset offset to handled fork block +// currentHandleHeader = forkHeader; +// localBlockOffset.setBlockHeight(currentHandleHeader.getHeight()); +// localBlockOffset.setBlockHash(currentHandleHeader.getBlockHash()); +// elasticSearchHandler.setRemoteOffset(localBlockOffset); +// if (lastMasterNumber == forkNumber && lastMasterBlock.getHeader().getBlockHash().equals(forkHeaderParentHash)) { +// //find fork point +// logger.info("find fork height: {}", lastMasterNumber); +// break; +// } +// //继续找下一个分叉 +// lastMasterNumber--; +// logger.info("continue last forked block: {}", lastMasterNumber); +// } else { +// logger.warn("get last master Block null: {}", lastMasterNumber); +// } +// } while (true); +// +// logger.info("rollback handle ok: {}", localBlockOffset); +// return; //退出当前任务,重新添加从分叉点之后的block +// } +// +// //set event +// ServiceUtils.fetchTransactionsForBlock(transactionRPCClient, block); +// blockList.add(block); +// +// //update current header +// currentHandleHeader = block.getHeader(); +// index++; +// logger.debug("add block: {}", block.getHeader()); +// } +// //bulk execute +// elasticSearchHandler.bulk(blockList); +// +// //update offset +// localBlockOffset.setBlockHeight(currentHandleHeader.getHeight()); +// localBlockOffset.setBlockHash(currentHandleHeader.getBlockHash()); +// elasticSearchHandler.setRemoteOffset(localBlockOffset); +// logger.info("indexer update success: {}", localBlockOffset); +// } catch (JSONRPC2SessionException e) { +// logger.error("chain header error:", e); +// } +// } + + @PostConstruct + public void initOffset() { + if (legacyIndexHandler == null) { + legacyIndexHandler = new LegacyMainIndexHandler( + elasticSearchHandler, + transactionRPCClient, + blockRPCClient, + bulkSize + ); + legacyIndexHandler.initOffset(); + } + } + + @Override + protected void executeInternal(JobExecutionContext jobExecutionContext) { + legacyIndexHandler.execute(); + } +} \ No newline at end of file diff --git a/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/IndexerHandle.java b/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/LegacyMainIndexHandler.java similarity index 88% rename from starcoin-indexer/src/main/java/org/starcoin/indexer/handler/IndexerHandle.java rename to starcoin-indexer/src/main/java/org/starcoin/indexer/handler/LegacyMainIndexHandler.java index 98451c2..f70b8e5 100644 --- a/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/IndexerHandle.java +++ b/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/LegacyMainIndexHandler.java @@ -1,47 +1,54 @@ package org.starcoin.indexer.handler; -import com.fasterxml.jackson.core.JsonProcessingException; -import org.quartz.JobExecutionContext; + +import com.thetransactioncompany.jsonrpc2.client.JSONRPC2SessionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.quartz.QuartzJobBean; import org.starcoin.api.BlockRPCClient; import org.starcoin.api.TransactionRPCClient; import org.starcoin.bean.Block; import org.starcoin.bean.BlockHeader; import org.starcoin.bean.BlockOffset; -import com.thetransactioncompany.jsonrpc2.client.JSONRPC2SessionException; -import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.List; - -public class IndexerHandle extends QuartzJobBean { - private static final Logger logger = LoggerFactory.getLogger(IndexerHandle.class); +public class LegacyMainIndexHandler { + private static final Logger logger = LoggerFactory.getLogger(LegacyMainIndexHandler.class); private BlockOffset localBlockOffset; - private BlockHeader currentHandleHeader; - - @Value("${starcoin.network}") - private String network; - @Value("${starcoin.indexer.bulk_size}") - private long bulkSize; + private BlockHeader currentHandleHeader; - @Autowired private ElasticSearchHandler elasticSearchHandler; - - @Autowired private TransactionRPCClient transactionRPCClient; - @Autowired private BlockRPCClient blockRPCClient; + private Long bulkSize; + + public LegacyMainIndexHandler( + ElasticSearchHandler elasticSearchHandler, + TransactionRPCClient transactionRPCClient, + BlockRPCClient blockRPCClient, + Long bulkSize + ) { + this.elasticSearchHandler = elasticSearchHandler; + this.transactionRPCClient = transactionRPCClient; + this.blockRPCClient = blockRPCClient; + this.bulkSize = bulkSize; + } + + public void initOffsetWith(Long height, String blockHash) throws JSONRPC2SessionException { + localBlockOffset = new BlockOffset(height, blockHash); + Block block = blockRPCClient.getBlockByHeight(height); + if (block != null) { + currentHandleHeader = block.getHeader(); + } else { + logger.error("init offset block not exist on chain: {}", localBlockOffset); + } + } - @PostConstruct public void initOffset() { localBlockOffset = elasticSearchHandler.getRemoteOffset(); //update current handle header @@ -66,8 +73,8 @@ public void initOffset() { } } - @Override - protected void executeInternal(JobExecutionContext jobExecutionContext) { + + public void execute() { //read current offset if (localBlockOffset == null || currentHandleHeader == null) { // logger.warn("local offset error, reset it: {}, {}", localOffset, currentHandleHeader); @@ -173,4 +180,4 @@ protected void executeInternal(JobExecutionContext jobExecutionContext) { logger.error("chain header error:", e); } } -} \ No newline at end of file +} diff --git a/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/MarketCapIndexer.java b/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/MarketCapIndexer.java index ab5a4cd..a706927 100644 --- a/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/MarketCapIndexer.java +++ b/starcoin-indexer/src/main/java/org/starcoin/indexer/handler/MarketCapIndexer.java @@ -13,8 +13,10 @@ public class MarketCapIndexer extends QuartzJobBean { private static final Logger logger = LoggerFactory.getLogger(MarketCapIndexer.class); + @Autowired private MarketCapHandle handle; + @Autowired private AddressHolderService addressHolderService; diff --git a/starcoin-indexer/src/test/java/org/starcoin/indexer/test/IndexHandlerJobTest.java b/starcoin-indexer/src/test/java/org/starcoin/indexer/test/IndexHandlerJobTest.java new file mode 100644 index 0000000..228d85b --- /dev/null +++ b/starcoin-indexer/src/test/java/org/starcoin/indexer/test/IndexHandlerJobTest.java @@ -0,0 +1,31 @@ +package org.starcoin.indexer.test; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.starcoin.api.BlockRPCClient; +import org.starcoin.api.TransactionRPCClient; +import org.starcoin.indexer.handler.ElasticSearchHandler; +import org.starcoin.indexer.handler.LegacyMainIndexHandler; + +public class IndexHandlerJobTest extends IndexerLogicBaseTest { + + @Value("${starcoin.indexer.bulk_size}") + private long bulkSize; + + @Autowired + private ElasticSearchHandler elasticSearchHandler; + + @Autowired + private TransactionRPCClient transactionRPCClient; + + @Autowired + private BlockRPCClient blockRPCClient; + + @Test + public void testIndexerHandle() throws Exception { + LegacyMainIndexHandler legacyMainIndexHandler = new LegacyMainIndexHandler(elasticSearchHandler, transactionRPCClient, blockRPCClient, bulkSize); + legacyMainIndexHandler.initOffsetWith(2713240L, "0x4d58d276809bd061ba422a4699c90c790efc5dd1b6d40e8c2adb0b1cb98dfafd"); + legacyMainIndexHandler.execute(); + } +}