Skip to content

Commit 8ed840e

Browse files
committed
ddl: support region split policy
Signed-off-by: xhe <[email protected]>
1 parent 865730d commit 8ed840e

File tree

26 files changed

+10770
-9258
lines changed

26 files changed

+10770
-9258
lines changed

errors.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,11 @@ error = '''
806806
Incorrect table name '%-.100s'
807807
'''
808808

809+
["ddl:1105"]
810+
error = '''
811+
It is recommended to add a region split strategy to the new index to avoid write hotspots
812+
'''
813+
809814
["ddl:1111"]
810815
error = '''
811816
Invalid use of group function

pkg/ddl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ go_library(
184184
"//pkg/util/memory",
185185
"//pkg/util/ppcpuusage",
186186
"//pkg/util/ranger",
187+
"//pkg/util/regionsplit",
187188
"//pkg/util/rowDecoder",
188189
"//pkg/util/rowcodec",
189190
"//pkg/util/set",

pkg/ddl/executor.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,24 @@ func (e *executor) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (
10401040
if err = checkTableInfoValidWithStmt(metaBuildCtx, tbInfo, s); err != nil {
10411041
return err
10421042
}
1043+
1044+
// Process region split policies from CREATE TABLE
1045+
if len(s.SplitIndex) > 0 {
1046+
for _, splitOpt := range s.SplitIndex {
1047+
policy, indexName, isPK, err := normalizeSplitPolicy(splitOpt, tbInfo)
1048+
if err != nil {
1049+
return errors.Trace(err)
1050+
}
1051+
if indexName == "" || (isPK && tbInfo.PKIsHandle) {
1052+
tbInfo.TableSplitPolicy = policy
1053+
} else {
1054+
indexInfo := tbInfo.FindIndexByName(indexName)
1055+
indexInfo.RegionSplitPolicy = policy
1056+
}
1057+
}
1058+
1059+
}
1060+
10431061
if err = checkTableForeignKeysValid(ctx, is, schema.Name.L, tbInfo); err != nil {
10441062
return err
10451063
}
@@ -1933,6 +1951,8 @@ func (e *executor) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt
19331951
case ast.AlterTableDisableKeys, ast.AlterTableEnableKeys:
19341952
// Nothing to do now, see https://github.com/pingcap/tidb/issues/1051
19351953
// MyISAM specific
1954+
case ast.AlterTableSplitIndex:
1955+
err = e.AlterTableSetRegionSplitPolicy(sctx, ident, spec.SplitIndex)
19361956
case ast.AlterTableRemoveTTL:
19371957
// the parser makes sure we have only one `ast.AlterTableRemoveTTL` in an alter statement
19381958
err = e.AlterTableRemoveTTL(sctx, ident)
@@ -5082,6 +5102,10 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
50825102
OpType: model.OpAddIndex,
50835103
}
50845104

5105+
// Check if we should warn about missing region split policy
5106+
// If other indexes already have region split policy, warn user to set policy for new index
5107+
checkAndWarnMissingRegionSplitPolicy(ctx, tblInfo, indexName)
5108+
50855109
err = e.doDDLJob2(ctx, job, args)
50865110
// key exists, but if_not_exists flags is true, so we ignore this error.
50875111
if dbterror.ErrDupKeyName.Equal(err) && ifNotExists {
@@ -7175,3 +7199,56 @@ func checkColumnReferencedByPartialCondition(t *model.TableInfo, colName ast.CIS
71757199

71767200
return nil
71777201
}
7202+
7203+
// AlterTableSetRegionSplitPolicy sets persistent region split policy for table
7204+
func (e *executor) AlterTableSetRegionSplitPolicy(ctx sessionctx.Context, ident ast.Ident, splitOpt *ast.SplitIndexOption) error {
7205+
schema, tb, err := e.getSchemaAndTableByIdent(ident)
7206+
if err != nil {
7207+
return errors.Trace(err)
7208+
}
7209+
7210+
meta := tb.Meta()
7211+
7212+
policy, indexName, isPK, err := normalizeSplitPolicy(splitOpt, tb.Meta())
7213+
if err != nil {
7214+
return errors.Trace(err)
7215+
}
7216+
7217+
job := &model.Job{
7218+
Version: model.GetJobVerInUse(),
7219+
SchemaID: schema.ID,
7220+
TableID: meta.ID,
7221+
SchemaName: schema.Name.L,
7222+
TableName: meta.Name.L,
7223+
Type: model.ActionAlterTableSetRegionSplitPolicy,
7224+
BinlogInfo: &model.HistoryInfo{},
7225+
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
7226+
SQLMode: ctx.GetSessionVars().SQLMode,
7227+
}
7228+
7229+
args := &model.AlterTableSetRegionSplitPolicyArgs{
7230+
IndexName: indexName,
7231+
PrimaryKey: isPK,
7232+
Policy: policy,
7233+
}
7234+
return e.doDDLJob2(ctx, job, args)
7235+
}
7236+
7237+
// checkAndWarnMissingRegionSplitPolicy checks if table has other indexes with region split policy,
7238+
// and warns user to set policy for newly created index.
7239+
func checkAndWarnMissingRegionSplitPolicy(ctx sessionctx.Context, tblInfo *model.TableInfo, newIndexName ast.CIStr) {
7240+
// Check if any existing index has RegionSplitPolicy
7241+
hasExistingPolicy := false
7242+
for _, idx := range tblInfo.Indices {
7243+
if idx.RegionSplitPolicy != nil {
7244+
hasExistingPolicy = true
7245+
break
7246+
}
7247+
}
7248+
7249+
// If existing indexes have region split policy, warn about new index
7250+
if hasExistingPolicy {
7251+
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError(
7252+
"It is recommended to add a region split strategy to the new index '" + newIndexName.O + "' to avoid write hotspots"))
7253+
}
7254+
}

pkg/ddl/job_worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,8 @@ func (w *worker) runOneJobStep(
10821082
ver, err = onRefreshMeta(jobCtx, job)
10831083
case model.ActionAlterTableAffinity:
10841084
ver, err = onAlterTableAffinity(jobCtx, job)
1085+
case model.ActionAlterTableSetRegionSplitPolicy:
1086+
ver, err = w.onAlterTableSetRegionSplitPolicy(jobCtx, job)
10851087
default:
10861088
// Invalid job, cancel it.
10871089
job.State = model.JobStateCancelled

pkg/ddl/split_region.go

Lines changed: 223 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,24 @@ package ddl
1616

1717
import (
1818
"context"
19+
"strconv"
20+
"strings"
1921

2022
"github.com/pingcap/errors"
2123
"github.com/pingcap/tidb/pkg/ddl/logutil"
2224
"github.com/pingcap/tidb/pkg/kv"
2325
"github.com/pingcap/tidb/pkg/meta/autoid"
2426
"github.com/pingcap/tidb/pkg/meta/model"
27+
"github.com/pingcap/tidb/pkg/parser/ast"
28+
"github.com/pingcap/tidb/pkg/parser/format"
2529
"github.com/pingcap/tidb/pkg/parser/mysql"
2630
"github.com/pingcap/tidb/pkg/sessionctx"
2731
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
32+
"github.com/pingcap/tidb/pkg/table/tables"
2833
"github.com/pingcap/tidb/pkg/tablecodec"
2934
"github.com/pingcap/tidb/pkg/types"
35+
"github.com/pingcap/tidb/pkg/util/dbterror"
36+
"github.com/pingcap/tidb/pkg/util/regionsplit"
3037
tikverr "github.com/tikv/client-go/v2/error"
3138
"go.uber.org/zap"
3239
)
@@ -36,11 +43,19 @@ const GlobalScatterGroupID int64 = -1
3643

3744
func splitPartitionTableRegion(ctx sessionctx.Context, store kv.SplittableStore, tbInfo *model.TableInfo, parts []model.PartitionDefinition, scatterScope string) {
3845
// Max partition count is 8192, should we sample and just choose some partitions to split?
39-
var regionIDs []uint64
4046
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), ctx.GetSessionVars().GetSplitRegionTimeout())
4147
defer cancel()
4248
ctxWithTimeout = kv.WithInternalSourceType(ctxWithTimeout, kv.InternalTxnDDL)
43-
if shardingBits(tbInfo) > 0 && tbInfo.PreSplitRegions > 0 {
49+
50+
var regionIDs []uint64
51+
if hasSplitPolicies(tbInfo) {
52+
regionIDs = append(regionIDs,
53+
applySplitPoliciesForTable(ctxWithTimeout, ctx, store, tbInfo, tbInfo.ID)...)
54+
for _, def := range parts {
55+
regionIDs = append(regionIDs,
56+
applySplitPoliciesForTable(ctxWithTimeout, ctx, store, tbInfo, def.ID)...)
57+
}
58+
} else if shardingBits(tbInfo) > 0 && tbInfo.PreSplitRegions > 0 {
4459
regionIDs = make([]uint64, 0, len(parts)*(len(tbInfo.Indices)+1))
4560
scatter, tableID := getScatterConfig(scatterScope, tbInfo.ID)
4661
// Try to split global index region here.
@@ -63,8 +78,11 @@ func splitTableRegion(ctx sessionctx.Context, store kv.SplittableStore, tbInfo *
6378
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), ctx.GetSessionVars().GetSplitRegionTimeout())
6479
defer cancel()
6580
ctxWithTimeout = kv.WithInternalSourceType(ctxWithTimeout, kv.InternalTxnDDL)
81+
6682
var regionIDs []uint64
67-
if shardingBits(tbInfo) > 0 && tbInfo.PreSplitRegions > 0 {
83+
if hasSplitPolicies(tbInfo) {
84+
regionIDs = applySplitPoliciesForTable(ctxWithTimeout, ctx, store, tbInfo, tbInfo.ID)
85+
} else if shardingBits(tbInfo) > 0 && tbInfo.PreSplitRegions > 0 {
6886
regionIDs = preSplitPhysicalTableByShardRowID(ctxWithTimeout, store, tbInfo, tbInfo.ID, scatterScope)
6987
} else {
7088
regionIDs = append(regionIDs, SplitRecordRegion(ctxWithTimeout, store, tbInfo.ID, tbInfo.ID, scatterScope))
@@ -200,3 +218,205 @@ func WaitScatterRegionFinish(ctx context.Context, store kv.SplittableStore, regi
200218
}
201219
}
202220
}
221+
222+
// waitScatterIfNeeded waits for scatter to finish based on scatterScope
223+
// Extracted from multiple places to avoid code duplication
224+
func waitScatterIfNeeded(ctx context.Context, store kv.SplittableStore,
225+
scatterScope string, regionIDs []uint64) {
226+
if scatterScope == vardef.ScatterOff {
227+
return // No need to wait
228+
}
229+
230+
if len(regionIDs) == 0 {
231+
return
232+
}
233+
234+
WaitScatterRegionFinish(ctx, store, regionIDs...)
235+
}
236+
237+
func hasSplitPolicies(tbInfo *model.TableInfo) bool {
238+
if tbInfo.TableSplitPolicy != nil {
239+
return true
240+
}
241+
for _, idx := range tbInfo.Indices {
242+
if idx.RegionSplitPolicy != nil {
243+
return true
244+
}
245+
}
246+
return false
247+
}
248+
249+
func applySplitPoliciesForTable(ctx context.Context, sctx sessionctx.Context, store kv.SplittableStore,
250+
tbInfo *model.TableInfo, physicalTableID int64) []uint64 {
251+
var keys [][]byte
252+
253+
sc := sctx.GetSessionVars().StmtCtx
254+
scatterScope := sctx.GetSessionVars().ScatterRegion
255+
scatter, tableID := getScatterConfig(scatterScope, tbInfo.ID)
256+
257+
// apply table policy
258+
for p := tbInfo.TableSplitPolicy; p != nil; p = nil {
259+
policy := tbInfo.TableSplitPolicy
260+
lower, err := parseValuesToDatums(policy.Lower)
261+
if err != nil {
262+
logutil.DDLLogger().Warn("failed to parse lower bound for table policy",
263+
zap.String("table", tbInfo.Name.O), zap.Error(err))
264+
break
265+
}
266+
upper, err := parseValuesToDatums(policy.Upper)
267+
if err != nil {
268+
logutil.DDLLogger().Warn("failed to parse upper bound for table policy",
269+
zap.String("table", tbInfo.Name.O), zap.Error(err))
270+
break
271+
}
272+
273+
handleCols := regionsplit.BuildHandleColsForSplit(tbInfo)
274+
keys, err = regionsplit.GetSplitTableKeys(sc, tbInfo, handleCols, physicalTableID, lower, upper, int(policy.Regions), keys, dbterror.ErrInvalidSplitRegionRanges)
275+
if err != nil {
276+
logutil.DDLLogger().Warn("failed to generate split keys for table policy",
277+
zap.String("table", tbInfo.Name.O), zap.Error(err))
278+
}
279+
}
280+
281+
// 2. Apply index policies (including PRIMARY)
282+
for _, idx := range tbInfo.Indices {
283+
if tbInfo.GetPartitionInfo() != nil &&
284+
((idx.Global && tbInfo.ID != physicalTableID) || (!idx.Global && tbInfo.ID == physicalTableID)) {
285+
continue
286+
}
287+
288+
if idx.RegionSplitPolicy == nil {
289+
continue
290+
}
291+
292+
policy := idx.RegionSplitPolicy
293+
lower, err := parseValuesToDatums(policy.Lower)
294+
if err != nil {
295+
logutil.DDLLogger().Warn("failed to parse lower bound for index policy",
296+
zap.String("table", tbInfo.Name.O),
297+
zap.String("index", idx.Name.O),
298+
zap.Error(err))
299+
continue
300+
}
301+
upper, err := parseValuesToDatums(policy.Upper)
302+
if err != nil {
303+
logutil.DDLLogger().Warn("failed to parse upper bound for index policy",
304+
zap.String("table", tbInfo.Name.O),
305+
zap.String("index", idx.Name.O),
306+
zap.Error(err))
307+
continue
308+
}
309+
310+
keys, err = regionsplit.GetSplitIndexKeys(sc, tbInfo, idx, physicalTableID, lower, upper, int(policy.Regions), keys, dbterror.ErrInvalidSplitRegionRanges)
311+
if err != nil {
312+
logutil.DDLLogger().Warn("failed to generate split keys for index policy",
313+
zap.String("table", tbInfo.Name.O),
314+
zap.String("index", idx.Name.O),
315+
zap.Error(err))
316+
}
317+
}
318+
319+
regionIDs := []uint64{}
320+
if len(keys) > 0 {
321+
ids, err := store.SplitRegions(ctx, keys, scatter, &tableID)
322+
if err != nil {
323+
logutil.DDLLogger().Warn("split regions failed", zap.Error(err))
324+
} else {
325+
regionIDs = ids
326+
}
327+
}
328+
return regionIDs
329+
}
330+
331+
func parseValuesToDatums(values []string) ([]types.Datum, error) {
332+
datums := make([]types.Datum, len(values))
333+
for i, val := range values {
334+
// Try to parse as different types
335+
// First try integer
336+
if intVal, err := strconv.ParseInt(val, 10, 64); err == nil {
337+
datums[i] = types.NewIntDatum(intVal)
338+
continue
339+
}
340+
// Then try float
341+
if floatVal, err := strconv.ParseFloat(val, 64); err == nil {
342+
datums[i] = types.NewFloat64Datum(floatVal)
343+
continue
344+
}
345+
// Default to string (remove quotes if present)
346+
strVal := val
347+
if len(strVal) >= 2 && strVal[0] == '\'' && strVal[len(strVal)-1] == '\'' {
348+
strVal = strVal[1 : len(strVal)-1]
349+
}
350+
datums[i] = types.NewStringDatum(strVal)
351+
}
352+
return datums, nil
353+
}
354+
355+
func normalizeSplitPolicy(splitOpt *ast.SplitIndexOption, tbInfo *model.TableInfo) (*model.RegionSplitPolicy, string, bool, error) {
356+
if tbInfo.HasClusteredIndex() && splitOpt.TableLevel {
357+
// cannot specify both SPLIT BETWEEN for CLUSTERED table
358+
// it is for _tidb_rowid
359+
return nil, "", false, dbterror.ErrInvalidRegionSplitPolicy
360+
}
361+
362+
indexName := ""
363+
if !splitOpt.TableLevel {
364+
pkName := strings.ToLower(mysql.PrimaryKeyName)
365+
indexName = splitOpt.IndexName.L
366+
367+
// fill primary key name
368+
isPK := splitOpt.PrimaryKey
369+
if isPK && indexName == "" {
370+
indexName = pkName
371+
}
372+
373+
if isPK != (pkName == indexName) {
374+
// specified pk, but incorrect name, or reverse
375+
return nil, "", false, dbterror.ErrWrongNameForIndex.GenWithStackByArgs(indexName)
376+
}
377+
}
378+
379+
if indexName != "" {
380+
var colen int
381+
if tbInfo.PKIsHandle && splitOpt.PrimaryKey {
382+
colen = 1
383+
} else if tbInfo.IsCommonHandle && splitOpt.PrimaryKey {
384+
pk := tables.FindPrimaryIndex(tbInfo)
385+
colen = len(pk.Columns)
386+
indexName = pk.Name.L
387+
} else {
388+
var idx *model.IndexInfo
389+
idx = tbInfo.FindIndexByName(indexName)
390+
if idx == nil {
391+
return nil, "", false, dbterror.ErrWrongNameForIndex.GenWithStackByArgs(indexName)
392+
}
393+
colen = len(idx.Columns)
394+
}
395+
if colen != len(splitOpt.SplitOpt.Upper) || colen != len(splitOpt.SplitOpt.Lower) {
396+
return nil, "", false, dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs("length of index columns and split values differ")
397+
}
398+
}
399+
400+
var buf strings.Builder
401+
restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, &buf)
402+
403+
policy := &model.RegionSplitPolicy{
404+
Regions: splitOpt.SplitOpt.Num,
405+
}
406+
407+
policy.Lower = make([]string, len(splitOpt.SplitOpt.Lower))
408+
for i, expr := range splitOpt.SplitOpt.Lower {
409+
buf.Reset()
410+
_ = expr.Restore(restoreCtx)
411+
policy.Lower[i] = buf.String()
412+
}
413+
414+
policy.Upper = make([]string, len(splitOpt.SplitOpt.Upper))
415+
for i, expr := range splitOpt.SplitOpt.Upper {
416+
buf.Reset()
417+
_ = expr.Restore(restoreCtx)
418+
policy.Upper[i] = buf.String()
419+
}
420+
421+
return policy, indexName, splitOpt.PrimaryKey, nil
422+
}

0 commit comments

Comments
 (0)