From bcd6e35ed9fa2c587fa2339741c1ea723fec5611 Mon Sep 17 00:00:00 2001 From: umangbhola Date: Thu, 1 Jul 2021 23:18:59 +0530 Subject: [PATCH] Parititon --- src/main/java/com/uci/dao/Application.java | 21 ++++++++----- .../com/uci/dao/config/CassandraConfig.java | 31 +++++++++---------- .../java/com/uci/dao/models/XMessageDAO.java | 13 ++++++-- .../dao/repository/XMessageRepository.java | 4 +++ ...tion.properties => application.properties} | 0 5 files changed, 42 insertions(+), 27 deletions(-) rename src/main/resources/{dao-application.properties => application.properties} (100%) diff --git a/src/main/java/com/uci/dao/Application.java b/src/main/java/com/uci/dao/Application.java index eec4f73..d13b24d 100644 --- a/src/main/java/com/uci/dao/Application.java +++ b/src/main/java/com/uci/dao/Application.java @@ -8,15 +8,18 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationProperties; +import reactor.core.publisher.SignalType; import java.time.LocalDateTime; +import java.util.List; import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Function; /** * @author chakshu */ @Slf4j -@ConfigurationProperties("dao-application.properties") @SpringBootApplication(scanBasePackages = "com.uci.dao") public class Application implements CommandLineRunner { @@ -31,15 +34,17 @@ public static void main(String[] args) { @Override public void run(String... args) throws Exception { - - UUID id = UUID.randomUUID(); xMessageRepository.insert(new XMessageDAO(new Long(121313), "HHBJ", "hkkh", "efef", "grdgrdg", LocalDateTime.now(), "HHBJ", "hkkh", "efef", "grdgrdg", "HHBJ", "hkkh", "efef")).log().subscribe(); - xMessageRepository.insert(new XMessageDAO(new Long(1213134), "HHBJ", "hkkh", "efef", "grdgrdg", LocalDateTime.now(), + xMessageRepository.insert(new XMessageDAO(new Long(1213134), "HHdeBJ", "hkkh", "efef", "grdgrdg", LocalDateTime.now(), "HHBJ", "hkkh", "efef", "grdgrdg", "HHBJ", "hkkh", "efef")).log().subscribe(); - xMessageRepository.findAll().subscribe(xMessageDAO -> { - log.info("XMessage List Item :>> " + counter + " " + xMessageDAO); - counter += 1; - }); + xMessageRepository.findAllByUserIdOrderByTimestamp("HHBJ").subscribe(new + Consumer>() { + @Override + public void accept(List xMessageDAOS) { + log.debug("Vevetf"+xMessageDAOS.size()); + + } + }); } } \ No newline at end of file diff --git a/src/main/java/com/uci/dao/config/CassandraConfig.java b/src/main/java/com/uci/dao/config/CassandraConfig.java index 5029eef..aca9fff 100644 --- a/src/main/java/com/uci/dao/config/CassandraConfig.java +++ b/src/main/java/com/uci/dao/config/CassandraConfig.java @@ -72,21 +72,20 @@ protected List getKeyspaceDrops() { protected List getStartupScripts() { - return Collections.singletonList("CREATE TABLE IF NOT EXISTS " + - keyspace + ".XMessage(id bigint," + - "userId text, " + - "fromId text, " + - "channel text, " + - "provider text, " + - "timestamp timestamp, " + - "messageState text, " + - "xMessage text, " + - "app text, " + - "auxData text, " + - "messageId text, " + - "replyId text, " + - "causeId text, " + - "PRIMARY KEY (id) " + - ") WITH default_time_to_live = 600;"); + return Collections.singletonList("CREATE TABLE IF NOT EXISTS "+keyspace+".XMessage(id bigint, \n" + + " userId text, \n" + + " fromId text, \n" + + " channel text, \n" + + " provider text, \n" + + " timestamp timestamp, \n" + + " messageState text, \n" + + " xMessage text, \n" + + " app text, \n" + + " auxData text, \n" + + " messageId text, \n" + + " replyId text, \n" + + " causeId text, \n" + + " PRIMARY KEY (userId,timestamp))\n" + + " WITH CLUSTERING ORDER BY(timestamp desc);"); } } diff --git a/src/main/java/com/uci/dao/models/XMessageDAO.java b/src/main/java/com/uci/dao/models/XMessageDAO.java index ec2a366..a3dc9d9 100644 --- a/src/main/java/com/uci/dao/models/XMessageDAO.java +++ b/src/main/java/com/uci/dao/models/XMessageDAO.java @@ -1,12 +1,18 @@ package com.uci.dao.models; import lombok.*; +import org.springframework.data.cassandra.core.cql.Ordering; +import org.springframework.data.cassandra.core.cql.PrimaryKeyType; import org.springframework.data.cassandra.core.mapping.Column; import org.springframework.data.cassandra.core.mapping.PrimaryKey; +import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn; import org.springframework.data.cassandra.core.mapping.Table; import java.time.LocalDateTime; +import static org.springframework.data.cassandra.core.cql.PrimaryKeyType.CLUSTERED; +import static org.springframework.data.cassandra.core.cql.PrimaryKeyType.PARTITIONED; + @Getter @Setter @NoArgsConstructor @@ -14,9 +20,10 @@ @Builder @Table("XMessage") public class XMessageDAO { - @PrimaryKey - private Long id; @Column + private Long id; + + @PrimaryKeyColumn(type = PrimaryKeyType.PARTITIONED) private String userId; @Column private String fromId; @@ -24,7 +31,7 @@ public class XMessageDAO { private String channel; @Column private String provider; - @Column + @PrimaryKeyColumn(type = CLUSTERED, ordering = Ordering.DESCENDING) private LocalDateTime timestamp; @Column private String messageState; diff --git a/src/main/java/com/uci/dao/repository/XMessageRepository.java b/src/main/java/com/uci/dao/repository/XMessageRepository.java index aa8aa07..323cc76 100644 --- a/src/main/java/com/uci/dao/repository/XMessageRepository.java +++ b/src/main/java/com/uci/dao/repository/XMessageRepository.java @@ -1,6 +1,7 @@ package com.uci.dao.repository; import com.uci.dao.models.XMessageDAO; +import org.springframework.data.cassandra.repository.AllowFiltering; import org.springframework.data.cassandra.repository.ReactiveCassandraRepository; import org.springframework.stereotype.Repository; import reactor.core.publisher.Flux; @@ -16,6 +17,7 @@ public interface XMessageRepository extends ReactiveCassandraRepository findFirstByAppOrderByTimestampDesc(String appName); + @AllowFiltering Flux> findAllByUserId(String userID); Flux findByMessageId(String messageID); @@ -24,8 +26,10 @@ public interface XMessageRepository extends ReactiveCassandraRepository findFirstByCauseIdAndMessageStateOrderByTimestampDesc(String causeId, String messageState); + @AllowFiltering Flux> findAllByUserIdOrderByTimestampDesc(String userID); + @AllowFiltering Flux> findAllByUserIdOrderByTimestamp(String userID); Flux findTopByUserIdAndMessageStateOrderByTimestampDesc(String userId, String messageState); diff --git a/src/main/resources/dao-application.properties b/src/main/resources/application.properties similarity index 100% rename from src/main/resources/dao-application.properties rename to src/main/resources/application.properties