Skip to content

Commit 4b2223c

Browse files
authored
dxf: separate metered traffic into object storage and cluster and fix missing cluster traffic for import-into (#64523)
close #61702
1 parent 9b24940 commit 4b2223c

34 files changed

+450
-228
lines changed

br/pkg/storage/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ go_test(
111111
shard_count = 50,
112112
deps = [
113113
"//br/pkg/mock",
114+
"//br/pkg/storage/recording",
114115
"//pkg/util/intest",
115116
"@com_github_aws_aws_sdk_go//aws",
116117
"@com_github_aws_aws_sdk_go//aws/awserr",

br/pkg/storage/azblob.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,7 @@ func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOptio
714714
cpkInfo: s.cpkInfo,
715715
}
716716

717-
uploaderWriter := newBufferedWriter(uploader, azblobChunkSize, NoCompression)
717+
uploaderWriter := newBufferedWriter(uploader, azblobChunkSize, NoCompression, nil)
718718
return uploaderWriter, nil
719719
}
720720

br/pkg/storage/compress.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/pingcap/errors"
1111
berrors "github.com/pingcap/tidb/br/pkg/errors"
12+
"github.com/pingcap/tidb/br/pkg/storage/recording"
1213
)
1314

1415
type withCompression struct {
@@ -38,7 +39,8 @@ func (w *withCompression) Create(ctx context.Context, name string, o *WriterOpti
3839
if bw, ok := writer.(*bufferedWriter); ok {
3940
writer = bw.writer
4041
}
41-
compressedWriter := newBufferedWriter(writer, hardcodedS3ChunkSize, w.compressType)
42+
// the external storage will do access recording, so no need to pass it again.
43+
compressedWriter := newBufferedWriter(writer, hardcodedS3ChunkSize, w.compressType, nil)
4244
return compressedWriter, nil
4345
}
4446

@@ -148,13 +150,15 @@ func (c *compressReader) GetFileSize() (int64, error) {
148150
}
149151

150152
type flushStorageWriter struct {
151-
writer io.Writer
152-
flusher flusher
153-
closer io.Closer
153+
writer io.Writer
154+
flusher flusher
155+
closer io.Closer
156+
accessRec *recording.AccessStats
154157
}
155158

156159
func (w *flushStorageWriter) Write(_ context.Context, data []byte) (int, error) {
157160
n, err := w.writer.Write(data)
161+
w.accessRec.RecWrite(n)
158162
return n, errors.Trace(err)
159163
}
160164

@@ -166,10 +170,11 @@ func (w *flushStorageWriter) Close(_ context.Context) error {
166170
return w.closer.Close()
167171
}
168172

169-
func newFlushStorageWriter(writer io.Writer, flusher2 flusher, closer io.Closer) *flushStorageWriter {
173+
func newFlushStorageWriter(writer io.Writer, flusher2 flusher, closer io.Closer, accessRec *recording.AccessStats) *flushStorageWriter {
170174
return &flushStorageWriter{
171-
writer: writer,
172-
flusher: flusher2,
173-
closer: closer,
175+
writer: writer,
176+
flusher: flusher2,
177+
closer: closer,
178+
accessRec: accessRec,
174179
}
175180
}

br/pkg/storage/gcs.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ type GCSStorage struct {
117117
handles []*storage.BucketHandle
118118
clients []*storage.Client
119119
clientCancel context.CancelFunc
120+
accessRec *recording.AccessStats
120121
}
121122

122123
// CopyFrom implements Copier.
@@ -205,6 +206,7 @@ func (s *GCSStorage) WriteFile(ctx context.Context, name string, data []byte) er
205206
if err != nil {
206207
return errors.Trace(err)
207208
}
209+
s.accessRec.RecWrite(len(data))
208210
return wc.Close()
209211
}
210212

@@ -228,6 +230,7 @@ func (s *GCSStorage) ReadFile(ctx context.Context, name string) ([]byte, error)
228230
b = make([]byte, size)
229231
_, err = io.ReadFull(rc, b)
230232
}
233+
s.accessRec.RecRead(len(b))
231234
return b, errors.Trace(err)
232235
}
233236

@@ -345,7 +348,7 @@ func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption)
345348
wc := s.GetBucketHandle().Object(object).NewWriter(ctx)
346349
wc.StorageClass = s.gcs.StorageClass
347350
wc.PredefinedACL = s.gcs.PredefinedAcl
348-
return newFlushStorageWriter(wc, &emptyFlusher{}, wc), nil
351+
return newFlushStorageWriter(wc, &emptyFlusher{}, wc, s.accessRec), nil
349352
}
350353
uri := s.objectName(name)
351354
// 5MB is the minimum part size for GCS.
@@ -354,8 +357,9 @@ func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption)
354357
if err != nil {
355358
return nil, errors.Trace(err)
356359
}
357-
fw := newFlushStorageWriter(w, &emptyFlusher{}, w)
358-
bw := newBufferedWriter(fw, int(partSize), NoCompression)
360+
fw := newFlushStorageWriter(w, &emptyFlusher{}, w, s.accessRec)
361+
// we already pass the accessRec to flushStorageWriter.
362+
bw := newBufferedWriter(fw, int(partSize), NoCompression, nil)
359363
return bw, nil
360364
}
361365

@@ -423,14 +427,14 @@ skipHandleCred:
423427
}
424428

425429
httpClient := opts.HTTPClient
426-
if reqRec := recording.GetRequests(ctx); reqRec != nil {
430+
if opts.AccessRecording != nil {
427431
if httpClient == nil {
428432
transport, _ := http.DefaultTransport.(*http.Transport)
429433
httpClient = &http.Client{Transport: transport.Clone()}
430434
}
431435
httpClient.Transport = &roundTripperWrapper{
432436
RoundTripper: httpClient.Transport,
433-
requestsRec: reqRec,
437+
accessRec: opts.AccessRecording,
434438
}
435439
}
436440
if httpClient != nil {
@@ -460,6 +464,7 @@ skipHandleCred:
460464
idx: atomic.NewInt64(0),
461465
clientCnt: gcsClientCnt,
462466
clientOps: clientOps,
467+
accessRec: opts.AccessRecording,
463468
}
464469
if err := ret.Reset(ctx); err != nil {
465470
return nil, errors.Trace(err)
@@ -632,6 +637,7 @@ func (r *gcsObjectReader) Read(p []byte) (n int, err error) {
632637
}
633638
}
634639
n, err = r.reader.Read(p)
640+
r.storage.accessRec.RecRead(n)
635641
r.pos += int64(n)
636642
return n, err
637643
}
@@ -700,10 +706,10 @@ func (r *gcsObjectReader) GetFileSize() (int64, error) {
700706

701707
type roundTripperWrapper struct {
702708
http.RoundTripper
703-
requestsRec *recording.Requests
709+
accessRec *recording.AccessStats
704710
}
705711

706712
func (rt *roundTripperWrapper) RoundTrip(req *http.Request) (*http.Response, error) {
707-
rt.requestsRec.Rec(req)
713+
rt.accessRec.RecRequest(req)
708714
return rt.RoundTripper.RoundTrip(req)
709715
}

br/pkg/storage/gcs_test.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,22 @@ import (
1919

2020
"github.com/fsouza/fake-gcs-server/fakestorage"
2121
backuppb "github.com/pingcap/kvproto/pkg/brpb"
22+
"github.com/pingcap/tidb/br/pkg/storage/recording"
2223
"github.com/pingcap/tidb/pkg/util/intest"
2324
"github.com/stretchr/testify/require"
2425
"golang.org/x/sync/errgroup"
2526
)
2627

27-
func TestGCS(t *testing.T) {
28+
func CheckAccessStats(t *testing.T, rec *recording.AccessStats, expectedGets, expectedPuts, expectedRead, expectWrite int) {
29+
t.Helper()
30+
require.EqualValues(t, expectedGets, rec.Requests.Get.Load())
31+
require.EqualValues(t, expectedPuts, rec.Requests.Put.Load())
32+
require.EqualValues(t, expectedRead, rec.Traffic.Read.Load())
33+
require.EqualValues(t, expectWrite, rec.Traffic.Write.Load())
34+
}
35+
36+
func prepareGCSStore(t *testing.T, bucketName string, accessRec *recording.AccessStats) (*fakestorage.Server, *GCSStorage) {
37+
t.Helper()
2838
require.True(t, intest.InTest)
2939
ctx := context.Background()
3040

@@ -33,7 +43,6 @@ func TestGCS(t *testing.T) {
3343
}
3444
server, err := fakestorage.NewServerWithOptions(opts)
3545
require.NoError(t, err)
36-
bucketName := "testbucket"
3746
server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: bucketName})
3847

3948
gcs := &backuppb.GCS{
@@ -47,10 +56,19 @@ func TestGCS(t *testing.T) {
4756
SendCredentials: false,
4857
CheckPermissions: []Permission{AccessBuckets},
4958
HTTPClient: server.HTTPClient(),
59+
AccessRecording: accessRec,
5060
})
5161
require.NoError(t, err)
62+
return server, stg
63+
}
64+
65+
func TestGCS(t *testing.T) {
66+
require.True(t, intest.InTest)
67+
ctx := context.Background()
68+
bucketName := "testbucket"
69+
server, stg := prepareGCSStore(t, bucketName, nil)
5270

53-
err = stg.WriteFile(ctx, "key", []byte("data"))
71+
err := stg.WriteFile(ctx, "key", []byte("data"))
5472
require.NoError(t, err)
5573

5674
err = stg.WriteFile(ctx, "key1", []byte("data1"))
@@ -629,3 +647,29 @@ func TestDeleteFiles(t *testing.T) {
629647
require.NoError(t, err)
630648
require.NoError(t, stg.DeleteFiles(ctx, []string{filename, "not-exist-file"}))
631649
}
650+
651+
func TestGCSAccessRecording(t *testing.T) {
652+
ctx := context.Background()
653+
accessRec := &recording.AccessStats{}
654+
_, store := prepareGCSStore(t, "testbucket", accessRec)
655+
require.NoError(t, store.WriteFile(ctx, "a.txt", []byte("hello")))
656+
CheckAccessStats(t, accessRec, 0, 1, 0, 5)
657+
_, err := store.ReadFile(ctx, "a.txt")
658+
require.NoError(t, err)
659+
CheckAccessStats(t, accessRec, 1, 1, 5, 5)
660+
writer, err := store.Create(ctx, "b.txt", nil)
661+
require.NoError(t, err)
662+
_, err = writer.Write(ctx, []byte(" world!"))
663+
require.NoError(t, err)
664+
require.NoError(t, writer.Close(ctx))
665+
CheckAccessStats(t, accessRec, 1, 2, 5, 12)
666+
reader, err := store.Open(ctx, "b.txt", nil)
667+
require.NoError(t, err)
668+
buf := make([]byte, 20)
669+
n, err := reader.Read(buf)
670+
require.NoError(t, err)
671+
require.Equal(t, 7, n)
672+
require.NoError(t, reader.Close())
673+
// Open will use 2 get, one for get file size.
674+
CheckAccessStats(t, accessRec, 3, 2, 12, 12)
675+
}

br/pkg/storage/ks3.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ const (
5252

5353
// KS3Storage acts almost same as S3Storage except it's used for kingsoft s3.
5454
type KS3Storage struct {
55-
svc *s3.S3 // https://github.com/ks3sdklib/aws-sdk-go/issues/28
56-
options *backuppb.S3
55+
svc *s3.S3 // https://github.com/ks3sdklib/aws-sdk-go/issues/28
56+
options *backuppb.S3
57+
accessRec *recording.AccessStats
5758
}
5859

5960
// NewKS3Storage initialize a new s3 storage for metadata.
@@ -106,11 +107,11 @@ func NewKS3Storage(
106107
}
107108
c := s3.New(awsConfig)
108109

109-
if reqRec := recording.GetRequests(ctx); reqRec != nil {
110+
if opts.AccessRecording != nil {
110111
// unlike AWS SDK, ks3 only support change handlers after we initialize
111112
// the client, so no need to call defaults.Handlers().
112113
c.Handlers.Send.PushBack(func(r *aws.Request) {
113-
reqRec.Rec(r.HTTPRequest)
114+
opts.AccessRecording.RecRequest(r.HTTPRequest)
114115
})
115116
}
116117

@@ -126,8 +127,9 @@ func NewKS3Storage(
126127
}
127128

128129
return &KS3Storage{
129-
svc: c,
130-
options: &qs,
130+
svc: c,
131+
options: &qs,
132+
accessRec: opts.AccessRecording,
131133
}, nil
132134
}
133135

@@ -281,6 +283,7 @@ func (rs *KS3Storage) WriteFile(ctx context.Context, file string, data []byte) e
281283
// since aws-go-sdk already did it in #computeBodyHashes
282284
// https://github.com/aws/aws-sdk-go/blob/bcb2cf3fc2263c8c28b3119b07d2dbb44d7c93a0/service/s3/body_hash.go#L30
283285
_, err := rs.svc.PutObjectWithContext(ctx, input)
286+
rs.accessRec.RecWrite(len(data))
284287
return errors.Trace(err)
285288
}
286289

@@ -301,6 +304,7 @@ func (rs *KS3Storage) ReadFile(ctx context.Context, file string) ([]byte, error)
301304
if err != nil {
302305
return nil, errors.Trace(err)
303306
}
307+
rs.accessRec.RecRead(len(data))
304308
return data, nil
305309
}
306310

@@ -590,6 +594,7 @@ func (r *ks3ObjectReader) Read(p []byte) (n int, err error) {
590594
n, err = r.reader.Read(p[:maxCnt])
591595
}
592596

597+
r.storage.accessRec.RecRead(n)
593598
r.pos += int64(n)
594599
return
595600
}
@@ -737,7 +742,7 @@ func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOpt
737742
if option != nil && option.PartSize > 0 {
738743
bufSize = int(option.PartSize)
739744
}
740-
uploaderWriter := newBufferedWriter(uploader, bufSize, NoCompression)
745+
uploaderWriter := newBufferedWriter(uploader, bufSize, NoCompression, rs.accessRec)
741746
return uploaderWriter, nil
742747
}
743748

br/pkg/storage/local.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (
273273
return nil, errors.Trace(err)
274274
}
275275
buf := bufio.NewWriter(file)
276-
return newFlushStorageWriter(buf, buf, file), nil
276+
return newFlushStorageWriter(buf, buf, file, nil), nil
277277
}
278278

279279
// Rename implements ExternalStorage interface.

br/pkg/storage/recording/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,5 @@ go_test(
1313
srcs = ["recording_test.go"],
1414
embed = [":recording"],
1515
flaky = True,
16-
shard_count = 2,
1716
deps = ["@com_github_stretchr_testify//require"],
1817
)

0 commit comments

Comments
 (0)