
Java Client: Database Sharding

He, Jiehui edited this page Apr 2, 2018 · 17 revisions

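Introduction

This page explains how to use sharding in DAL. It covers:

  1. How to configure a shard strategy
  2. How to select a shard at runtime
  3. SQL-based cross-shard operations
  4. Single-table cross-shard operations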

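Notes

Unless stated otherwise, all string-based identifiers such as column names and table names are case sensitive.

To demonstrate the features, this page borrows heavily from DAL's unit tests. In real projects, use the DAOs produced by the code generator; do not call DalTableDao, DalQueryDao, or the DalClient interface and its implementations directly.

Since 1.15.0, DalShardingStrategy#locateTableShard takes an additional tableName parameter:

String locateTableShard(DalConfigure configure, String logicDbName, String tableName, DalHints hints);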

Configuration Guide

Sample configuration

<databaseSet name="HA_Test" provider="mySqlProvider" shardingStrategy="">
    <add  name="dao_test_M" databaseType="Master" sharding="" connectionString="dao_test"/>
    <add  name="dao_test_S1" databaseType="Slave" sharding="" connectionString="dao_test"/>
    <add  name="dao_test_S2" databaseType="Slave" sharding="" connectionString="dao_test_1"/>
    <add  name="dao_test_S3" databaseType="Slave" sharding="" connectionString="dao_test_2"/>
</databaseSet>
<databaseSet name="dao_test_mod" provider="sqlProvider" shardingStrategy="class=com.ctrip.platform.dal.dao.strategy.ShardColModShardStrategy;columns=id;mod=2" >
  <add  name="dao_test_sqlsvr_M" databaseType="Master" sharding="0" connectionString="HotelPubDB"/>
  <add  name="dao_test_sqlsvr_S" databaseType="Slave" sharding="0" connectionString="HotelPubDB"/>
  <add  name="dao_test_mysql_M" databaseType="Master" sharding="1" connectionString="dao_test"/>
  <add  name="dao_test_mysql_S" databaseType="Slave" sharding="1" connectionString="dao_test"/>
</databaseSet>
<databaseSet name="dao_test_simple" provider="sqlProvider" shardingStrategy="class=com.ctrip.platform.dal.dao.strategy.SimpleShardHintStrategy;shardByDb=true" >
  <add  name="dao_test_sqlsvr_M" databaseType="Master" sharding="0" connectionString="HotelPubDB"/>
  <add  name="dao_test_mysql_M" databaseType="Master" sharding="1" connectionString="dao_test"/>
</databaseSet> 
<databaseSet name="dao_test_sqlsvr_tableShard" provider="sqlProvider" shardingStrategy="class=com.ctrip.platform.dal.dao.strategy.ShardColModShardStrategy;tableColumns=index,tableIndex;tableMod=4;separator=_;shardedTables=dal_client_test" >
  <add  name="dao_test_sqlsvr_M" databaseType="Master" sharding="" connectionString="HotelPubDB"/>
</databaseSet>
<databaseSet name="dao_test_sqlsvr_dbShard" provider="sqlProvider" shardingStrategy="class=com.ctrip.platform.dal.dao.strategy.ShardColModShardStrategy;columns=index,tableIndex;mod=2;">
  <add  name="dao_test_sqlsvr_dbShard_0" databaseType="Master" sharding="0" connectionString="SimpleShard_0"/>
  <add  name="dao_test_sqlsvr_dbShard_1" databaseType="Master" sharding="1" connectionString="SimpleShard_1"/>
</databaseSet>
<databaseSet name="dao_test_sqlsvr_dbTableShard" provider="sqlProvider" shardingStrategy="class=com.ctrip.platform.dal.dao.strategy.ShardColModShardStrategy;columns=index,dbIndex;mod=2;tableColumns=table,tableIndex;tableMod=4;separator=_;shardedTables=dal_client_test">
  <add  name="dao_test_sqlsvr_dbShard_0" databaseType="Master" sharding="0" connectionString="SimpleShard_0"/>
  <add  name="dao_test_sqlsvr_dbShard_1" databaseType="Master" sharding="1" connectionString="SimpleShard_1"/>
</databaseSet>

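Predefined Sharding Strategies

SimpleShardHintStrategy

The caller tells DAL which shard to use by specifying the shard id directly. The strategy can be configured to shard by database, by table, or both.

Parameters

  1. shardByDb. Whether to shard by database. Example: shardByDb=true
  2. shardByTable. Whether to shard by table. Example: shardByTable=true
  3. shardedTables. The list of table names that are sharded by table. Example: shardedTables=dal_client_test
  4. separator. The separator placed between the table name and the shard id. Example: separator=_

To shard by database, set shardByDb=true.

To shard by table, set shardByTable=true and list the tables that are allowed to be sharded, for example shardedTables=dal_client_test.

shardByDb=true and shardByTable=true can be specified together.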
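As a rough illustration of how shardedTables and separator work together, the sketch below derives a physical table name from a logical name and a table shard id. ShardTableNames and physicalName are hypothetical names made up for this example; they are not part of the DAL API, and the real strategy may resolve names differently.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of DAL: shows how a logical table name,
// the separator and a table shard id could combine into a physical name.
final class ShardTableNames {
    private final Set<String> shardedTables;
    private final String separator;

    ShardTableNames(String shardedTablesCsv, String separator) {
        // Names are compared as-is because identifiers are case sensitive.
        this.shardedTables = new HashSet<>(Arrays.asList(shardedTablesCsv.split(",")));
        this.separator = separator;
    }

    String physicalName(String logicalTable, String tableShardId) {
        if (!shardedTables.contains(logicalTable)) {
            return logicalTable; // not listed in shardedTables: leave untouched
        }
        return logicalTable + separator + tableShardId;
    }
}
```

With shardedTables=dal_client_test and separator=_, table shard 0 of dal_client_test would map to dal_client_test_0 under this assumption, while a table that is not listed keeps its name.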

Sample code

    @Test
    public void testSimple() {
        try {
            DalClient client = DalClientFactory.getClient(DATABASE_NAME_SIMPLE);
             
            String sql = "select id from " + TABLE_NAME;
             
            StatementParameters parameters = new StatementParameters();
            DalHints hints = new DalHints().inShard("0").masterOnly();
             
            Integer o = (Integer)client.query(sql, parameters, hints, new DalScalarExtractor());
            assertNotNull(o);
            assertEquals(4, o.longValue());
             
            hints = new DalHints().inShard("1").masterOnly();
            Long l = (Long)client.query(sql, parameters, hints, new DalScalarExtractor());
            assertNotNull(l);
            assertEquals(1, l.longValue());
        } catch (Exception e) {
            e.printStackTrace();
            fail();
        }
    }

ShardColModShardStrategy

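Determines the shard from the shardValue, parameters, and shardColValues carried in DalHints.

Parameters

  1. columns. Column names used for DB sharding. They may be virtual columns, that is, names that do not exist in the table. Example: columns=index,tableIndex
  2. mod. The modulus for DB sharding. Example: mod=2
  3. shardedTables. The tables to which table sharding applies. Example: shardedTables=dal_client_test
  4. tableColumns. Column names used for table sharding. They may also be virtual columns. Example: tableColumns=index,tableIndex
  5. tableMod. The modulus for table sharding. Example: tableMod=4
  6. separator. The separator placed between the table name and the shard id. Example: separator=_

Note: settings are separated by ';', and multiple values of one setting are separated by ','.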
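To make the two separator rules concrete, here is a minimal sketch that splits a shardingStrategy attribute into its settings and values. StrategySettings is a hypothetical class written for this page; it does not reproduce how DAL itself parses the attribute.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parser, not DAL's own: splits a shardingStrategy attribute
// into settings (separated by ';') and their values (separated by ',').
final class StrategySettings {
    static Map<String, List<String>> parse(String shardingStrategy) {
        Map<String, List<String>> settings = new LinkedHashMap<>();
        for (String entry : shardingStrategy.split(";")) {
            if (entry.trim().isEmpty()) {
                continue; // tolerate an empty attribute or a stray ';'
            }
            String[] pair = entry.split("=", 2);
            if (pair.length != 2) {
                throw new IllegalArgumentException("Expected key=value but got: " + entry);
            }
            settings.put(pair[0].trim(), Arrays.asList(pair[1].trim().split(",")));
        }
        return settings;
    }
}
```

Applied to the dao_test_sqlsvr_dbShard attribute from the sample configuration, this yields three settings: class, columns (with the two values index and tableIndex), and mod.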

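Priority when computing the DB and table shard id

A shard id can be derived from several factors, so the priority must be known in order to set the right parameters. The factors are considered in the following order, highest priority first:

shardId > shardValue > shardColValue > parameters > fields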
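The priority chain can be pictured with the sketch below. ShardIdResolver is a hypothetical class, and two things in it are assumptions rather than documented behavior: an explicit shard id is used as-is, and every other factor is reduced with "value % mod". It only illustrates the ordering, not DAL's actual code.

```java
import java.util.Map;

// Hypothetical resolver, not DAL's implementation: walks the factors in the
// documented order. Assumption: an explicit shard id is returned unchanged,
// every other factor is reduced with "value % mod".
final class ShardIdResolver {
    private final String column;
    private final int mod;

    ShardIdResolver(String column, int mod) {
        this.column = column;
        this.mod = mod;
    }

    String resolve(String shardId, Object shardValue,
                   Map<String, Object> shardColValues,
                   Map<String, Object> parameters,
                   Map<String, Object> fields) {
        if (shardId != null) {
            return shardId;              // 1. explicit shard id
        }
        if (shardValue != null) {
            return byMod(shardValue);    // 2. shardValue
        }
        Object value = lookup(shardColValues);            // 3. shardColValue
        if (value == null) value = lookup(parameters);    // 4. parameters
        if (value == null) value = lookup(fields);        // 5. entity fields
        if (value == null) {
            throw new IllegalStateException("Cannot locate shard: no factor supplied");
        }
        return byMod(value);
    }

    private Object lookup(Map<String, Object> source) {
        return source == null ? null : source.get(column);
    }

    private String byMod(Object value) {
        // Accepts both numbers and numeric strings, like the tests on this page.
        long number = Long.parseLong(value.toString());
        return String.valueOf(Math.floorMod(number, (long) mod));
    }
}
```

The practical consequence is that a lower-priority factor is ignored whenever a higher-priority one is present, so a stale shardValue can silently override the entity fields.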

Configuration

Code

    @Test
    public void testQueryByPk() throws SQLException {
        ClientTestModel model = null;
         
        for(int i = 0; i < mod; i++) {
            // By shard
            if(i%2 == 0)
                model = dao.queryByPk(1, new DalHints().inShard(String.valueOf(i)));
            else
                model = dao.queryByPk(1, new DalHints().inShard(i));
            Assert.assertEquals(1, model.getId().intValue());
            Assert.assertEquals(i, model.getTableIndex().intValue());
            // By shardValue
            if(i%2 == 0)
                model = dao.queryByPk(1, new DalHints().setShardValue(String.valueOf(i)));
            else
                model = dao.queryByPk(1, new DalHints().setShardValue(i));
            Assert.assertEquals(1, model.getId().intValue());
            Assert.assertEquals(i, model.getTableIndex().intValue());
            // By shardColValue
            if(i%2 == 0)
                model = dao.queryByPk(1, new DalHints().setShardColValue("index", String.valueOf(i)));
            else
                model = dao.queryByPk(1, new DalHints().setShardColValue("index", i));
            Assert.assertEquals(1, model.getId().intValue());
            Assert.assertEquals(i, model.getTableIndex().intValue());
            // By shardColValue
            if(i%2 == 0)
                model = dao.queryByPk(1, new DalHints().setShardColValue("tableIndex", String.valueOf(i)));
            else
                model = dao.queryByPk(1, new DalHints().setShardColValue("tableIndex", i));
            Assert.assertEquals(1, model.getId().intValue());
            Assert.assertEquals(i, model.getTableIndex().intValue());
        }
    }

Sharding by table

Code

    @Test
    public void testQueryByPkWithEntity() throws SQLException{
        ClientTestModel pk = null;
        ClientTestModel model = null;
         
        for(int i = 0; i < mod; i++) {
            pk = new ClientTestModel();
            pk.setId(1);
            // By tableShard
            model = dao.queryByPk(pk, new DalHints().inTableShard(i));
            Assert.assertEquals(1, model.getId().intValue());
            Assert.assertEquals(i, model.getTableIndex().intValue());
            // By tableShardValue
            model = dao.queryByPk(pk, new DalHints().setTableShardValue(i));
            Assert.assertEquals(1, model.getId().intValue());
            Assert.assertEquals(i, model.getTableIndex().intValue());
            // By shardColValue
            model = dao.queryByPk(pk, new DalHints().setShardColValue("index", i));
            Assert.assertEquals(1, model.getId().intValue());
            Assert.assertEquals(i, model.getTableIndex().intValue());
             
            // By shardColValue
            model = dao.queryByPk(pk, new DalHints().setShardColValue("tableIndex", i));
            Assert.assertEquals(1, model.getId().intValue());
            Assert.assertEquals(i, model.getTableIndex().intValue());
            // By fields
            pk.setTableIndex(i);
            model = dao.queryByPk(pk, new DalHints());
            Assert.assertEquals(1, model.getId().intValue());
            Assert.assertEquals(i, model.getTableIndex().intValue());
        }
    }

Sharding by both DB and table

Code

    @Test
    public void testQueryByPk() throws SQLException {
        ClientTestModel model = null;
         
        for(int i = 0; i < mod; i++) {
            // By shard
            if(i%2 == 0)
                testQueryByPk(i, new DalHints().inShard(String.valueOf(i)));
            else
                testQueryByPk(i, new DalHints().inShard(i));
            // By shardValue
            if(i%2 == 0)
                testQueryByPk(i, new DalHints().setShardValue(String.valueOf(i)));
            else
                testQueryByPk(i, new DalHints().setShardValue(i));
            // By shardColValue
            if(i%2 == 0)
                testQueryByPk(i, new DalHints().setShardColValue("index", String.valueOf(i)));
            else
                testQueryByPk(i, new DalHints().setShardColValue("index", i));
            // By shardColValue
            if(i%2 == 0)
                testQueryByPk(i, new DalHints().setShardColValue("dbIndex", String.valueOf(i)));
            else
                testQueryByPk(i, new DalHints().setShardColValue("dbIndex", i));
        }
    }
     
    private void testQueryByPk(int shardId, DalHints hints) throws SQLException {
        ClientTestModel model = null;
         
        for(int i = 0; i < tableMod; i++) {
            int id = 1;
            // By tableShard
            if(i%2 == 0)
                model = dao.queryByPk(1, hints.clone().inTableShard(String.valueOf(i)));
            else
                model = dao.queryByPk(1, hints.clone().inTableShard(i));
            assertQueryByPk(shardId, model, i, id);
            // By tableShardValue
            if(i%2 == 0)
                model = dao.queryByPk(1, hints.clone().setTableShardValue(String.valueOf(i)));
            else
                model = dao.queryByPk(1, hints.clone().setTableShardValue(i));
            assertQueryByPk(shardId, model, i, id);
            // By shardColValue
            if(i%2 == 0)
                model = dao.queryByPk(1, hints.clone().setShardColValue("table", String.valueOf(i)));
            else
                model = dao.queryByPk(1, hints.clone().setShardColValue("table", i));
            assertQueryByPk(shardId, model, i, id);
             
            // By shardColValue
            if(i%2 == 0)
                model = dao.queryByPk(1, hints.clone().setShardColValue("tableIndex", String.valueOf(i)));
            else
                model = dao.queryByPk(1, hints.clone().setShardColValue("tableIndex", i));
            assertQueryByPk(shardId, model, i, id);
        }
    }
    private void assertQueryByPk(int shardId, ClientTestModel model, int i,
            int id) {
        assertQueryFirstWithWhereClause(shardId, model, i);
        Assert.assertEquals(id * (shardId + 1) * (i+1), model.getQuantity().intValue());
    }

Example of sharding by entity fields

    /**
     * Test Insert multiple entities with key-holder
     * @throws SQLException
     */
    @Test
    public void testInsertMultipleAsListWithKeyHolderFail() throws SQLException{
        List<ClientTestModel> entities = createListNoId(3);
        KeyHolder holder = createKeyHolder();
        int res;
        try {
            res = dao.insert(new DalHints(), holder, entities);
            Assert.fail();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
     
    @Test
    public void testInsertMultipleAsListWithKeyHolderByShard() throws SQLException{
        for(int i = 0; i < mod; i++) {
            // By shard
            testInsertMultipleAsListWithKeyHolder(i, new DalHints().inShard(i));
        }
    }
    @Test
    public void testInsertMultipleAsListWithKeyHolderByShardValue() throws SQLException{
        for(int i = 0; i < mod; i++) {
            // By shardValue
            testInsertMultipleAsListWithKeyHolder(i, new DalHints().setShardValue(i));
        }
    }
     
    @Test
    public void testInsertMultipleAsListWithKeyHolderByShardCol() throws SQLException{
        for(int i = 0; i < mod; i++) {
            // By shardColValue
            testInsertMultipleAsListWithKeyHolder(i, new DalHints().setShardColValue("index", i));
        }
    }
     
    @Test
    public void testInsertMultipleAsListWithKeyHolderByShardCol2() throws SQLException{
        for(int i = 0; i < mod; i++) {
            // By shardColValue
            testInsertMultipleAsListWithKeyHolder(i, new DalHints().setShardColValue("dbIndex", i));
        }
    }
    @Test
    public void testInsertMultipleAsListWithKeyHolderByFields() throws SQLException{
        List<ClientTestModel> entities = createListNoId(3);
        int res;
        KeyHolder holder = createKeyHolder();
        deleteAllShardsByDbTable(dao, mod, tableMod);
         
        // By fields not same shard
        holder = createKeyHolder();
        entities.get(0).setTableIndex(0);
        entities.get(0).setDbIndex(0);
        entities.get(1).setTableIndex(1);
        entities.get(1).setDbIndex(1);
         
        entities.get(2).setTableIndex(2);
        entities.get(2).setDbIndex(2);
         
        res = dao.insert(new DalHints(), holder, entities);
        assertResEquals(3, res);
        Assert.assertEquals(1, getCount(0, 0));
        Assert.assertEquals(1, getCount(1, 1));
        Assert.assertEquals(1, getCount(0, 2));
        assertKeyHolder(holder);
    }
    /**
     * Test Insert multiple entities with key-holder
     * @throws SQLException
     */
    public void testInsertMultipleAsListWithKeyHolder(int shardId, DalHints hints) throws SQLException{
        List<ClientTestModel> entities = createListNoId(3);
        KeyHolder holder = new KeyHolder();
        int res;
        try {
            res = dao.insert(hints.clone(), holder, entities);
            Assert.fail();
        } catch (Exception e) {
            e.printStackTrace();
        }
        for(int i = 0; i < tableMod; i++) {
            int j = 1;
            // By tableShard
            holder = createKeyHolder();
            res = dao.insert(hints.clone().inTableShard(i), holder, entities);
            assertResEquals(3, res);
            Assert.assertEquals((i + 1) + j++ * 3, getCount(shardId, i));
            assertKeyHolder(holder);
            // By tableShardValue
            holder = createKeyHolder();
            res = dao.insert(hints.clone().setTableShardValue(i), holder, entities);
            assertResEquals(3, res);
            Assert.assertEquals((i + 1) + j++ * 3, getCount(shardId, i));
            assertKeyHolder(holder);
             
            // By shardColValue
            holder = createKeyHolder();
            res = dao.insert(hints.clone().setShardColValue("table", i), holder, entities);
            assertResEquals(3, res);
            Assert.assertEquals((i + 1) + j++ * 3, getCount(shardId, i));
            assertKeyHolder(holder);
             
            // By shardColValue
            holder = createKeyHolder();
            res = dao.insert(hints.clone().setShardColValue("tableIndex", i), holder, entities);
            assertResEquals(3, res);
            Assert.assertEquals((i + 1) + j++ * 3, getCount(shardId, i));
            assertKeyHolder(holder);
             
            // By fields same shard
            holder = createKeyHolder();
            entities.get(0).setTableIndex(i);
            entities.get(1).setTableIndex(i);
            entities.get(2).setTableIndex(i);
            res = dao.insert(hints.clone(), holder, entities);
            assertResEquals(3, res);
            Assert.assertEquals((i + 1) + j++ * 3, getCount(shardId, i));
            assertKeyHolder(holder);
        }
         
        deleteAllShards(shardId);
         
        // By fields not same shard
        holder = createKeyHolder();
        entities.get(0).setTableIndex(0);
        entities.get(1).setTableIndex(1);
        entities.get(2).setTableIndex(2);
        res = dao.insert(hints.clone(), holder, entities);
        Assert.assertEquals(1, getCount(shardId, 0));
        Assert.assertEquals(1, getCount(shardId, 1));
        Assert.assertEquals(1, getCount(shardId, 2));
        assertResEquals(3, res);
        assertKeyHolder(holder);
    }

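Writing a custom strategy similar to ShardColModShardStrategy

Since 1.15.0

Many users need logic similar to ShardColModShardStrategy but with their own calculation, so newer versions of DAL provide AbstractColumnShardStrategy. Subclasses of this abstract strategy must implement two methods: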

/**
 * Locate DB shard value
 * @param value column or parameter value
 * @return DB shard id
 */
abstract public String calculateDbShard(Object value);
 
/**
 * Locate table shard value
 * @param rawTableName the table name template without any sharding separator or shard id suffix
 * @param value column or parameter value
 * @return table shard id
 */
abstract public String calculateTableShard(String rawTableName, Object value);

ShardColModShardStrategy has been reworked to extend this abstract class and implement these two methods, so it can serve as a reference implementation.
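A custom rule might look like the sketch below, which hashes arbitrary keys instead of taking a numeric modulus. To keep the example compilable without DAL on the classpath, ColumnShardStrategySketch is a stand-in that copies only the two abstract method signatures quoted above; HashShardStrategy and its constructor arguments are hypothetical. In a real project the class would extend DAL's AbstractColumnShardStrategy, whose other requirements are not shown here.

```java
// Stand-in that mirrors only the two abstract methods quoted above, so the
// sketch compiles without DAL. A real strategy would extend DAL's
// AbstractColumnShardStrategy instead.
abstract class ColumnShardStrategySketch {
    abstract public String calculateDbShard(Object value);
    abstract public String calculateTableShard(String rawTableName, Object value);
}

// Hypothetical custom rule: hash the key's string form instead of taking a
// numeric modulus, so non-numeric keys can be sharded too.
final class HashShardStrategy extends ColumnShardStrategySketch {
    private final int dbShards;
    private final int tableShards;

    HashShardStrategy(int dbShards, int tableShards) {
        this.dbShards = dbShards;
        this.tableShards = tableShards;
    }

    @Override
    public String calculateDbShard(Object value) {
        return bucket(value, dbShards);
    }

    @Override
    public String calculateTableShard(String rawTableName, Object value) {
        // rawTableName is ignored here; a per-table rule could branch on it.
        return bucket(value, tableShards);
    }

    private static String bucket(Object value, int buckets) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return String.valueOf(Math.floorMod(String.valueOf(value).hashCode(), buckets));
    }
}
```

Whatever rule is chosen, it has to be deterministic: the same column value must always map to the same shard id, otherwise rows written through one call cannot be found by the next.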

Cross-shard batch operations

The following DalTableDao operations support cross-shard batch execution:

  1. combinedInsert
  2. batchInsert
  3. batchDelete

For example:

    @Test
    public void testCrossShardBatchInsert() {
        try {
            deleteAllShardsByDbTable(dao, mod, tableMod);
             
            ClientTestModel[] pList = new ClientTestModel[mod * (1 + tableMod)*tableMod/2];
            int x = 0;
            for(int i = 0; i < mod; i++) {
                for(int j = 0; j < tableMod; j++) {
                    for(int k = 0; k < j + 1; k ++) {
                        ClientTestModel p = new ClientTestModel();
                        p.setId(1 + k);
                        p.setAddress("aaa");
                        p.setDbIndex(i);
                        p.setTableIndex(j);
                         
                        pList[x++] = p;
                    }
                }
            }
             
            dao.batchInsert(new DalHints(), pList);
            for(int i = 0; i < mod; i++) {
                for(int j = 0; j < tableMod; j++) {
                    Assert.assertEquals(j + 1, getCount(i, j));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail();
        }
    }

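SQL-based cross-shard operations

A SQL-based cross-shard operation executes the same statement on multiple shards with a single API call. The feature is driven entirely by DalHints; no new API was added. It is implemented at the DAO layer through DalRequestExecutor and requires no change to DalClient. It applies only to APIs that take a SQL statement, such as delete and update (an insert can be issued through the update API), and has no effect on APIs that take a POJO or a list of POJOs.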

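Note

SQL-based cross-shard operations currently support database-level sharding only. Table-level sharding is not supported yet, because the table names inside a SQL statement cannot be parsed and replaced safely and efficiently.

Two kinds of cross-shard operations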

All shards

To execute an operation on every shard, set the following on DalHints:

    hints.inAllShards();

A given set of shards

To execute an operation on a given set of shards, set the following on DalHints:

    // By shards
    Set<String> shards = new HashSet<>();
    shards.add("0");
    shards.add("1");
    hints.inShards(shards);

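Queries

The five examples below return, respectively, a list, a single object, the first row, the first few rows, and a range of rows in the middle.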

private List<Short> queryListInAllShard(DalHints hints) throws SQLException {
    return new DalQueryDao(DATABASE_NAME).query(
            sqlList, parameters, 
            hints.inAllShards(), 
            new ShortRowMapper());
}

private ClientTestModel queryForObjectInAllShard(DalHints hints) throws SQLException {
    StatementParameters parameters = new StatementParameters();
    parameters.set(1, 1);
    return new DalQueryDao(DATABASE_NAME).queryForObject(
            sqlObject, parameters, 
            hints.inAllShards(), 
            new ClientTestDalRowMapper());
}

private ClientTestModel queryFirstInAllShard(DalHints hints) throws SQLException {
    StatementParameters parameters = new StatementParameters();
    parameters.set(1, 1);
    return new DalQueryDao(DATABASE_NAME).queryFirst(
            sqlFirst, parameters, 
            hints.inAllShards(), 
            new ClientTestDalRowMapper());
}

private List<Short> queryTopInAllShard(DalHints hints) throws SQLException {
    return new DalQueryDao(DATABASE_NAME).queryTop(
            sqlList, parameters, 
            hints.inAllShards(), 
            new ShortRowMapper(), 4);
}

private List<Short> queryFromInAllShard(DalHints hints) throws SQLException {
    return new DalQueryDao(DATABASE_NAME).queryFrom(
            sqlList, parameters, 
            hints.inAllShards(), 
            new ShortRowMapper(), 2, 4);
}

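Result merging

Because a query may run on multiple shards, DAL lets the caller control the order of the combined result, either with a custom implementation of the ResultMerger interface or simply by passing a Comparator as the sorter. DAL also ships default implementations of ResultMerger for common cases.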

public interface ResultMerger<T> {
    void addPartial(String shard, T partial) throws SQLException;
    T merge() throws SQLException;
     
    static class IntSummary implements ResultMerger<Integer>{
        private int total;
        @Override
        public void addPartial(String shard, Integer partial) {
            total += partial.intValue();
        }
        @Override
        public Integer merge() {
            return total;
        }
    }
}
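A custom merger for list results could be sketched as follows. ResultMergerSketch is a stand-in that copies the two method signatures shown above so the example compiles without DAL; SortedListMerger is a hypothetical implementation, and the synchronized methods are a precaution based on the assumption that shards may report their partial results concurrently.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Stand-in copy of the two methods quoted above, so the sketch compiles
// without DAL on the classpath.
interface ResultMergerSketch<T> {
    void addPartial(String shard, T partial) throws SQLException;
    T merge() throws SQLException;
}

// Hypothetical merger: concatenates per-shard lists and sorts once at the end.
final class SortedListMerger<E> implements ResultMergerSketch<List<E>> {
    private final Comparator<? super E> order;
    private final List<E> collected = new ArrayList<>();

    SortedListMerger(Comparator<? super E> order) {
        this.order = order;
    }

    @Override
    public synchronized void addPartial(String shard, List<E> partial) {
        // The shard id is not needed for a plain sort, only the rows.
        collected.addAll(partial);
    }

    @Override
    public synchronized List<E> merge() {
        List<E> result = new ArrayList<>(collected);
        result.sort(order);
        return result;
    }
}
```

Sorting once in merge() keeps addPartial cheap; the order in which shards deliver their partial results then has no influence on the final list.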

Code sample 1: using a ResultMerger

@Test
public void testQueryListAllShardsWithMerger() {
    try {
        DalHints hints = new DalHints();
        List<Short> result = queryListInAllShard(hints.mergeBy(new TestResultMerger()));
        Short t = result.get(0);
        assertEquals(new Short((short)3), t);
    } catch (Exception e) {
        fail();
    }
}

Code sample 2: using a Comparator

@Test
public void testQueryListAllShardsWithSorter() {
    try {
        DalHints hints = new DalHints();
        List<Short> result = queryListInAllShard(hints.sortBy(new TestComparator()));
        assertEquals(6, result.size());
    } catch (Exception e) {
        e.printStackTrace();
        fail();
    }
}

private class TestComparator implements Comparator<Short>{
    @Override
    public int compare(Short o1, Short o2) {
        return o1.compareTo(o2);
    }
}

Combined example

Asynchronous execution and cross-shard execution can be combined, as follows:

    @Test
    public void testQueryListAllShardsWithRowCallbackSequentialAsync() {
        try {
            DalHints hints = new DalHints();
            TestDalRowCallback callback = new TestDalRowCallback();
            new DalQueryDao(DATABASE_NAME).query(
                    sqlListQuantity, parameters, 
                    hints.inAllShards().sequentialExecute().asyncExecution(), 
                    callback);
             
            // Make sure the execution is completed
            hints.getAsyncResult().get();
            // 66 = (10 + 11 + 12)*2
            assertEquals(66, callback.result.get());
        } catch (Exception e) {
            fail();
        }        
    }
     
    private static class TestDalRowCallback implements DalRowCallback {
        AtomicInteger result = new AtomicInteger();
        public void process(ResultSet rs) throws SQLException {
            result.addAndGet(rs.getShort("quantity"));
        }
    }

Update

For a cross-shard update, just specify inAllShards or inShards before the call.

Code sample

@Test
public void testUpdatePlainAllShardsCallback() throws SQLException{
    String sql = "UPDATE " + TABLE_NAME + " SET address = 'CTRIP' WHERE id = 1";
    StatementParameters parameters = new StatementParameters();
    IntCallback callback = new IntCallback();
    DalHints hints = new DalHints().callbackWith(callback);

    int res;
     
    // By allShards
    sql = "UPDATE " + TABLE_NAME
            + " SET address = 'CTRIP' WHERE id = 1";
    res = dao.update(sql, parameters, hints.inAllShards());
    assertEquals(0, res);
    res = callback.getInt();
    assertResEquals(2, res);
    assertEquals("CTRIP", dao.queryByPk(1, new DalHints().inShard(0)).getAddress());
    assertEquals("CTRIP", dao.queryByPk(1, new DalHints().inShard(1)).getAddress());
}

@Test
public void testUpdatePlainShards() throws SQLException{
    String sql = "UPDATE " + TABLE_NAME
            + " SET address = 'CTRIP' WHERE id = 1";
    StatementParameters parameters = new StatementParameters();
    DalHints hints = new DalHints();
    int res;
     
    // By shards
    Set<String> shards = new HashSet<>();
    shards.add("0");
    shards.add("1");
    sql = "UPDATE " + TABLE_NAME
            + " SET address = 'CTRIP' WHERE id = 1";
    res = dao.update(sql, parameters, new DalHints().inShards(shards));
    assertResEquals(2, res);
    assertEquals("CTRIP", dao.queryByPk(1, new DalHints().inShard(0)).getAddress());
    assertEquals("CTRIP", dao.queryByPk(1, new DalHints().inShard(1)).getAddress());
}

Delete

For a cross-shard delete, just specify inAllShards or inShards before the call.

@Test
public void testDeleteWithWhereClauseShards() throws SQLException{
    String whereClause = "type=?";
    StatementParameters parameters = new StatementParameters();
    parameters.set(1, Types.SMALLINT, 1);
    DalHints hints = new DalHints();
    int res;
     
    // By shards
    Set<String> shards = new HashSet<>();
    shards.add("0");
    shards.add("1");
    assertEquals(3, getCountByDb(dao, 0));
    assertEquals(3, getCountByDb(dao, 1));
    res = dao.delete(whereClause, parameters, new DalHints().inShards(shards));
    assertResEquals(6, res);
    assertEquals(0, dao.query(whereClause, parameters, new DalHints().inShards(shards)).size());
}

Shard by support

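DAL can optimize statements that contain IN when a shardBy parameter is specified. DAL splits the values of the IN parameter into groups according to the shard each value belongs to, then runs the SQL only on the shards that are needed, passing each shard just its own group of values.

For more details, refer to DalTableDao and DalQueryDao enhancement.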

/*
 * Indicate name of the parameter that will partition shards for the request. 
 */
shardBy,

Sample code

private List<Short> queryListForInParamBuilder(DalHints hints) throws SQLException {
    StatementParameters parameters = new StatementParameters();
     
    List<Integer> inParam = new ArrayList<>();
    inParam.add(0);
    inParam.add(1);
    inParam.add(2);
    inParam.add(3);
    inParam.add(4);
     
    parameters.setInParameter(1, "type", Types.INTEGER, inParam);
    FreeSelectSqlBuilder<List<Short>> builder = new FreeSelectSqlBuilder<>(dbCategory);
    builder.setTemplate(sqlInParam);
    builder.mapWith(new ShortRowMapper());
    return new DalQueryDao(DATABASE_NAME).query(
            builder, parameters, 
            hints.shardBy("type"));
}
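The splitting step that shardBy relies on can be pictured with the sketch below. InParameterPartitioner is a hypothetical helper, and "value % mod" is assumed as the shard rule; DAL applies whatever strategy the databaseSet configures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not DAL's code: groups IN-clause values by the shard
// that "value % mod" selects (an assumed rule for this illustration).
final class InParameterPartitioner {
    static Map<String, List<Integer>> partition(List<Integer> inValues, int mod) {
        Map<String, List<Integer>> byShard = new TreeMap<>();
        for (Integer value : inValues) {
            String shardId = String.valueOf(Math.floorMod(value, mod));
            byShard.computeIfAbsent(shardId, id -> new ArrayList<>()).add(value);
        }
        return byShard;
    }
}
```

Under that assumption and mod=2, the five values 0 to 4 from the sample above would be split into 0, 2, 4 for shard 0 and 1, 3 for shard 1, so each shard receives a shorter IN list and shards with no matching values are skipped.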

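Concurrency control for cross-shard operations

Concurrency is controlled through DalHints. Shards are processed in parallel by default; call sequentialExecute to process them one after another.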

hints.inAllShards().sequentialExecute().asyncExecution();
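The difference between the two modes can be sketched outside of DAL as follows. ShardRunner is a hypothetical class built on the standard java.util.concurrent API; it is not how DalRequestExecutor is implemented, only an illustration of running one task per shard either in parallel or one at a time.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical runner, not DAL's executor: applies one task per shard,
// either in parallel or one shard at a time.
final class ShardRunner {
    static <T> Map<String, T> run(Set<String> shards, Function<String, T> task,
                                  boolean sequential) {
        Map<String, T> results = new TreeMap<>();
        if (sequential) {
            for (String shard : shards) {
                results.put(shard, task.apply(shard));
            }
            return results;
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, shards.size()));
        try {
            Map<String, Future<T>> pending = new TreeMap<>();
            for (String shard : shards) {
                Callable<T> call = () -> task.apply(shard);
                pending.put(shard, pool.submit(call));
            }
            for (Map.Entry<String, Future<T>> entry : pending.entrySet()) {
                try {
                    results.put(entry.getKey(), entry.getValue().get()); // wait for this shard
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted on shard " + entry.getKey(), e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Shard " + entry.getKey() + " failed", e.getCause());
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

Both modes return the same results; sequential execution trades latency for a lower load on the connection pools, which can matter when many shards live on the same database server.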