Skip to content

Commit 139875e

Browse files
committed
ddl: support region split policy
Signed-off-by: xhe <[email protected]>
1 parent 4b2223c commit 139875e

File tree

26 files changed

+10723
-9289
lines changed

26 files changed

+10723
-9289
lines changed

errors.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,11 @@ error = '''
801801
Incorrect table name '%-.100s'
802802
'''
803803

804+
["ddl:1105"]
805+
error = '''
806+
It is recommended to add a region split strategy to the new index to avoid write hotspots
807+
'''
808+
804809
["ddl:1111"]
805810
error = '''
806811
Invalid use of group function

pkg/ddl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ go_library(
182182
"//pkg/util/memory",
183183
"//pkg/util/ppcpuusage",
184184
"//pkg/util/ranger",
185+
"//pkg/util/regionsplit",
185186
"//pkg/util/rowDecoder",
186187
"//pkg/util/rowcodec",
187188
"//pkg/util/set",

pkg/ddl/executor.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,6 +1041,24 @@ func (e *executor) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (
10411041
if err = checkTableInfoValidWithStmt(metaBuildCtx, tbInfo, s); err != nil {
10421042
return err
10431043
}
1044+
1045+
// Process region split policies from CREATE TABLE
1046+
if len(s.SplitIndex) > 0 {
1047+
for _, splitOpt := range s.SplitIndex {
1048+
policy, indexName, isPK, err := normalizeSplitPolicy(splitOpt, tbInfo)
1049+
if err != nil {
1050+
return errors.Trace(err)
1051+
}
1052+
if indexName == "" || (isPK && tbInfo.PKIsHandle) {
1053+
tbInfo.TableSplitPolicy = policy
1054+
} else {
1055+
indexInfo := tbInfo.FindIndexByName(indexName)
1056+
indexInfo.RegionSplitPolicy = policy
1057+
}
1058+
}
1059+
1060+
}
1061+
10441062
if err = checkTableForeignKeysValid(ctx, is, schema.Name.L, tbInfo); err != nil {
10451063
return err
10461064
}
@@ -1932,6 +1950,8 @@ func (e *executor) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt
19321950
case ast.AlterTableDisableKeys, ast.AlterTableEnableKeys:
19331951
// Nothing to do now, see https://github.com/pingcap/tidb/issues/1051
19341952
// MyISAM specific
1953+
case ast.AlterTableSplitIndex:
1954+
err = e.AlterTableSetRegionSplitPolicy(sctx, ident, spec.SplitIndex)
19351955
case ast.AlterTableRemoveTTL:
19361956
// the parser makes sure we have only one `ast.AlterTableRemoveTTL` in an alter statement
19371957
err = e.AlterTableRemoveTTL(sctx, ident)
@@ -5017,6 +5037,10 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
50175037
OpType: model.OpAddIndex,
50185038
}
50195039

5040+
// Check if we should warn about missing region split policy
5041+
// If other indexes already have region split policy, warn user to set policy for new index
5042+
checkAndWarnMissingRegionSplitPolicy(ctx, tblInfo, indexName)
5043+
50205044
err = e.doDDLJob2(ctx, job, args)
50215045
// key exists, but if_not_exists flags is true, so we ignore this error.
50225046
if dbterror.ErrDupKeyName.Equal(err) && ifNotExists {
@@ -7060,3 +7084,56 @@ func checkColumnReferencedByPartialCondition(t *model.TableInfo, colName ast.CIS
70607084

70617085
return nil
70627086
}
7087+
7088+
// AlterTableSetRegionSplitPolicy sets persistent region split policy for table
7089+
func (e *executor) AlterTableSetRegionSplitPolicy(ctx sessionctx.Context, ident ast.Ident, splitOpt *ast.SplitIndexOption) error {
7090+
schema, tb, err := e.getSchemaAndTableByIdent(ident)
7091+
if err != nil {
7092+
return errors.Trace(err)
7093+
}
7094+
7095+
meta := tb.Meta()
7096+
7097+
policy, indexName, isPK, err := normalizeSplitPolicy(splitOpt, tb.Meta())
7098+
if err != nil {
7099+
return errors.Trace(err)
7100+
}
7101+
7102+
job := &model.Job{
7103+
Version: model.GetJobVerInUse(),
7104+
SchemaID: schema.ID,
7105+
TableID: meta.ID,
7106+
SchemaName: schema.Name.L,
7107+
TableName: meta.Name.L,
7108+
Type: model.ActionAlterTableSetRegionSplitPolicy,
7109+
BinlogInfo: &model.HistoryInfo{},
7110+
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
7111+
SQLMode: ctx.GetSessionVars().SQLMode,
7112+
}
7113+
7114+
args := &model.AlterTableSetRegionSplitPolicyArgs{
7115+
IndexName: indexName,
7116+
PrimaryKey: isPK,
7117+
Policy: policy,
7118+
}
7119+
return e.doDDLJob2(ctx, job, args)
7120+
}
7121+
7122+
// checkAndWarnMissingRegionSplitPolicy checks if table has other indexes with region split policy,
7123+
// and warns user to set policy for newly created index.
7124+
func checkAndWarnMissingRegionSplitPolicy(ctx sessionctx.Context, tblInfo *model.TableInfo, newIndexName ast.CIStr) {
7125+
// Check if any existing index has RegionSplitPolicy
7126+
hasExistingPolicy := false
7127+
for _, idx := range tblInfo.Indices {
7128+
if idx.RegionSplitPolicy != nil {
7129+
hasExistingPolicy = true
7130+
break
7131+
}
7132+
}
7133+
7134+
// If existing indexes have region split policy, warn about new index
7135+
if hasExistingPolicy {
7136+
ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError(
7137+
"It is recommended to add a region split strategy to the new index '" + newIndexName.O + "' to avoid write hotspots"))
7138+
}
7139+
}

pkg/ddl/job_worker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,6 +1067,8 @@ func (w *worker) runOneJobStep(
10671067
ver, err = w.onAlterCheckConstraint(jobCtx, job)
10681068
case model.ActionRefreshMeta:
10691069
ver, err = onRefreshMeta(jobCtx, job)
1070+
case model.ActionAlterTableSetRegionSplitPolicy:
1071+
ver, err = w.onAlterTableSetRegionSplitPolicy(jobCtx, job)
10701072
default:
10711073
// Invalid job, cancel it.
10721074
job.State = model.JobStateCancelled

pkg/ddl/split_region.go

Lines changed: 223 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,24 @@ package ddl
1616

1717
import (
1818
"context"
19+
"strconv"
20+
"strings"
1921

2022
"github.com/pingcap/errors"
2123
"github.com/pingcap/tidb/pkg/ddl/logutil"
2224
"github.com/pingcap/tidb/pkg/kv"
2325
"github.com/pingcap/tidb/pkg/meta/autoid"
2426
"github.com/pingcap/tidb/pkg/meta/model"
27+
"github.com/pingcap/tidb/pkg/parser/ast"
28+
"github.com/pingcap/tidb/pkg/parser/format"
2529
"github.com/pingcap/tidb/pkg/parser/mysql"
2630
"github.com/pingcap/tidb/pkg/sessionctx"
2731
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
32+
"github.com/pingcap/tidb/pkg/table/tables"
2833
"github.com/pingcap/tidb/pkg/tablecodec"
2934
"github.com/pingcap/tidb/pkg/types"
35+
"github.com/pingcap/tidb/pkg/util/dbterror"
36+
"github.com/pingcap/tidb/pkg/util/regionsplit"
3037
tikverr "github.com/tikv/client-go/v2/error"
3138
"go.uber.org/zap"
3239
)
@@ -36,11 +43,19 @@ const GlobalScatterGroupID int64 = -1
3643

3744
func splitPartitionTableRegion(ctx sessionctx.Context, store kv.SplittableStore, tbInfo *model.TableInfo, parts []model.PartitionDefinition, scatterScope string) {
3845
// Max partition count is 8192, should we sample and just choose some partitions to split?
39-
var regionIDs []uint64
4046
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), ctx.GetSessionVars().GetSplitRegionTimeout())
4147
defer cancel()
4248
ctxWithTimeout = kv.WithInternalSourceType(ctxWithTimeout, kv.InternalTxnDDL)
43-
if shardingBits(tbInfo) > 0 && tbInfo.PreSplitRegions > 0 {
49+
50+
var regionIDs []uint64
51+
if hasSplitPolicies(tbInfo) {
52+
regionIDs = append(regionIDs,
53+
applySplitPoliciesForTable(ctxWithTimeout, ctx, store, tbInfo, tbInfo.ID)...)
54+
for _, def := range parts {
55+
regionIDs = append(regionIDs,
56+
applySplitPoliciesForTable(ctxWithTimeout, ctx, store, tbInfo, def.ID)...)
57+
}
58+
} else if shardingBits(tbInfo) > 0 && tbInfo.PreSplitRegions > 0 {
4459
regionIDs = make([]uint64, 0, len(parts)*(len(tbInfo.Indices)+1))
4560
scatter, tableID := getScatterConfig(scatterScope, tbInfo.ID)
4661
// Try to split global index region here.
@@ -63,8 +78,11 @@ func splitTableRegion(ctx sessionctx.Context, store kv.SplittableStore, tbInfo *
6378
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), ctx.GetSessionVars().GetSplitRegionTimeout())
6479
defer cancel()
6580
ctxWithTimeout = kv.WithInternalSourceType(ctxWithTimeout, kv.InternalTxnDDL)
81+
6682
var regionIDs []uint64
67-
if shardingBits(tbInfo) > 0 && tbInfo.PreSplitRegions > 0 {
83+
if hasSplitPolicies(tbInfo) {
84+
regionIDs = applySplitPoliciesForTable(ctxWithTimeout, ctx, store, tbInfo, tbInfo.ID)
85+
} else if shardingBits(tbInfo) > 0 && tbInfo.PreSplitRegions > 0 {
6886
regionIDs = preSplitPhysicalTableByShardRowID(ctxWithTimeout, store, tbInfo, tbInfo.ID, scatterScope)
6987
} else {
7088
regionIDs = append(regionIDs, SplitRecordRegion(ctxWithTimeout, store, tbInfo.ID, tbInfo.ID, scatterScope))
@@ -200,3 +218,205 @@ func WaitScatterRegionFinish(ctx context.Context, store kv.SplittableStore, regi
200218
}
201219
}
202220
}
221+
222+
// waitScatterIfNeeded waits for scatter to finish based on scatterScope
223+
// Extracted from multiple places to avoid code duplication
224+
func waitScatterIfNeeded(ctx context.Context, store kv.SplittableStore,
225+
scatterScope string, regionIDs []uint64) {
226+
if scatterScope == vardef.ScatterOff {
227+
return // No need to wait
228+
}
229+
230+
if len(regionIDs) == 0 {
231+
return
232+
}
233+
234+
WaitScatterRegionFinish(ctx, store, regionIDs...)
235+
}
236+
237+
func hasSplitPolicies(tbInfo *model.TableInfo) bool {
238+
if tbInfo.TableSplitPolicy != nil {
239+
return true
240+
}
241+
for _, idx := range tbInfo.Indices {
242+
if idx.RegionSplitPolicy != nil {
243+
return true
244+
}
245+
}
246+
return false
247+
}
248+
249+
func applySplitPoliciesForTable(ctx context.Context, sctx sessionctx.Context, store kv.SplittableStore,
250+
tbInfo *model.TableInfo, physicalTableID int64) []uint64 {
251+
var keys [][]byte
252+
253+
sc := sctx.GetSessionVars().StmtCtx
254+
scatterScope := sctx.GetSessionVars().ScatterRegion
255+
scatter, tableID := getScatterConfig(scatterScope, tbInfo.ID)
256+
257+
// apply table policy
258+
for p := tbInfo.TableSplitPolicy; p != nil; p = nil {
259+
policy := tbInfo.TableSplitPolicy
260+
lower, err := parseValuesToDatums(policy.Lower)
261+
if err != nil {
262+
logutil.DDLLogger().Warn("failed to parse lower bound for table policy",
263+
zap.String("table", tbInfo.Name.O), zap.Error(err))
264+
break
265+
}
266+
upper, err := parseValuesToDatums(policy.Upper)
267+
if err != nil {
268+
logutil.DDLLogger().Warn("failed to parse upper bound for table policy",
269+
zap.String("table", tbInfo.Name.O), zap.Error(err))
270+
break
271+
}
272+
273+
handleCols := regionsplit.BuildHandleColsForSplit(tbInfo)
274+
keys, err = regionsplit.GetSplitTableKeys(sc, tbInfo, handleCols, physicalTableID, lower, upper, int(policy.Regions), keys, dbterror.ErrInvalidSplitRegionRanges)
275+
if err != nil {
276+
logutil.DDLLogger().Warn("failed to generate split keys for table policy",
277+
zap.String("table", tbInfo.Name.O), zap.Error(err))
278+
}
279+
}
280+
281+
// 2. Apply index policies (including PRIMARY)
282+
for _, idx := range tbInfo.Indices {
283+
if tbInfo.GetPartitionInfo() != nil &&
284+
((idx.Global && tbInfo.ID != physicalTableID) || (!idx.Global && tbInfo.ID == physicalTableID)) {
285+
continue
286+
}
287+
288+
if idx.RegionSplitPolicy == nil {
289+
continue
290+
}
291+
292+
policy := idx.RegionSplitPolicy
293+
lower, err := parseValuesToDatums(policy.Lower)
294+
if err != nil {
295+
logutil.DDLLogger().Warn("failed to parse lower bound for index policy",
296+
zap.String("table", tbInfo.Name.O),
297+
zap.String("index", idx.Name.O),
298+
zap.Error(err))
299+
continue
300+
}
301+
upper, err := parseValuesToDatums(policy.Upper)
302+
if err != nil {
303+
logutil.DDLLogger().Warn("failed to parse upper bound for index policy",
304+
zap.String("table", tbInfo.Name.O),
305+
zap.String("index", idx.Name.O),
306+
zap.Error(err))
307+
continue
308+
}
309+
310+
keys, err = regionsplit.GetSplitIndexKeys(sc, tbInfo, idx, physicalTableID, lower, upper, int(policy.Regions), keys, dbterror.ErrInvalidSplitRegionRanges)
311+
if err != nil {
312+
logutil.DDLLogger().Warn("failed to generate split keys for index policy",
313+
zap.String("table", tbInfo.Name.O),
314+
zap.String("index", idx.Name.O),
315+
zap.Error(err))
316+
}
317+
}
318+
319+
regionIDs := []uint64{}
320+
if len(keys) > 0 {
321+
ids, err := store.SplitRegions(ctx, keys, scatter, &tableID)
322+
if err != nil {
323+
logutil.DDLLogger().Warn("split regions failed", zap.Error(err))
324+
} else {
325+
regionIDs = ids
326+
}
327+
}
328+
return regionIDs
329+
}
330+
331+
func parseValuesToDatums(values []string) ([]types.Datum, error) {
332+
datums := make([]types.Datum, len(values))
333+
for i, val := range values {
334+
// Try to parse as different types
335+
// First try integer
336+
if intVal, err := strconv.ParseInt(val, 10, 64); err == nil {
337+
datums[i] = types.NewIntDatum(intVal)
338+
continue
339+
}
340+
// Then try float
341+
if floatVal, err := strconv.ParseFloat(val, 64); err == nil {
342+
datums[i] = types.NewFloat64Datum(floatVal)
343+
continue
344+
}
345+
// Default to string (remove quotes if present)
346+
strVal := val
347+
if len(strVal) >= 2 && strVal[0] == '\'' && strVal[len(strVal)-1] == '\'' {
348+
strVal = strVal[1 : len(strVal)-1]
349+
}
350+
datums[i] = types.NewStringDatum(strVal)
351+
}
352+
return datums, nil
353+
}
354+
355+
func normalizeSplitPolicy(splitOpt *ast.SplitIndexOption, tbInfo *model.TableInfo) (*model.RegionSplitPolicy, string, bool, error) {
356+
if tbInfo.HasClusteredIndex() && splitOpt.TableLevel {
357+
// cannot specify both SPLIT BETWEEN for CLUSTERED table
358+
// it is for _tidb_rowid
359+
return nil, "", false, dbterror.ErrInvalidRegionSplitPolicy
360+
}
361+
362+
indexName := ""
363+
if !splitOpt.TableLevel {
364+
pkName := strings.ToLower(mysql.PrimaryKeyName)
365+
indexName = splitOpt.IndexName.L
366+
367+
// fill primary key name
368+
isPK := splitOpt.PrimaryKey
369+
if isPK && indexName == "" {
370+
indexName = pkName
371+
}
372+
373+
if isPK != (pkName == indexName) {
374+
// specified pk, but incorrect name, or reverse
375+
return nil, "", false, dbterror.ErrWrongNameForIndex.GenWithStackByArgs(indexName)
376+
}
377+
}
378+
379+
if indexName != "" {
380+
var colen int
381+
if tbInfo.PKIsHandle && splitOpt.PrimaryKey {
382+
colen = 1
383+
} else if tbInfo.IsCommonHandle && splitOpt.PrimaryKey {
384+
pk := tables.FindPrimaryIndex(tbInfo)
385+
colen = len(pk.Columns)
386+
indexName = pk.Name.L
387+
} else {
388+
var idx *model.IndexInfo
389+
idx = tbInfo.FindIndexByName(indexName)
390+
if idx == nil {
391+
return nil, "", false, dbterror.ErrWrongNameForIndex.GenWithStackByArgs(indexName)
392+
}
393+
colen = len(idx.Columns)
394+
}
395+
if colen != len(splitOpt.SplitOpt.Upper) || colen != len(splitOpt.SplitOpt.Lower) {
396+
return nil, "", false, dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs("length of index columns and split values differ")
397+
}
398+
}
399+
400+
var buf strings.Builder
401+
restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, &buf)
402+
403+
policy := &model.RegionSplitPolicy{
404+
Regions: splitOpt.SplitOpt.Num,
405+
}
406+
407+
policy.Lower = make([]string, len(splitOpt.SplitOpt.Lower))
408+
for i, expr := range splitOpt.SplitOpt.Lower {
409+
buf.Reset()
410+
_ = expr.Restore(restoreCtx)
411+
policy.Lower[i] = buf.String()
412+
}
413+
414+
policy.Upper = make([]string, len(splitOpt.SplitOpt.Upper))
415+
for i, expr := range splitOpt.SplitOpt.Upper {
416+
buf.Reset()
417+
_ = expr.Restore(restoreCtx)
418+
policy.Upper[i] = buf.String()
419+
}
420+
421+
return policy, indexName, splitOpt.PrimaryKey, nil
422+
}

0 commit comments

Comments
 (0)