Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util;

/**
* Resource lifecycle interface for end-to-end tests.
*
* <p>This interface provides hooks for resource setup and cleanup, allowing resources to
* differentiate between successful and failed tests in their cleanup methods.
*
* <p>This is a JUnit5-compatible version that does not extend TestRule.
*/
public interface ExternalResource {

/** Called before the test execution. */
void before() throws Exception;

/** Called after successful test execution. */
void afterTestSuccess();

/** Called after failed test execution. Defaults to calling {@link #afterTestSuccess()}. */
default void afterTestFailure() {
afterTestSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.flink.util.OperatingSystem;

import org.junit.Assume;
import org.junit.AssumptionViolatedException;
import org.junit.jupiter.api.Assumptions;
import org.opentest4j.TestAbortedException;

import java.util.Arrays;
import java.util.EnumSet;
Expand All @@ -39,26 +39,27 @@ public enum OperatingSystemRestriction {
*
* @param reason reason for the restriction
* @param operatingSystems allowed operating systems
* @throws AssumptionViolatedException if this method is called on a forbidden operating system
* @throws TestAbortedException if this method is called on a forbidden operating system
*/
public static void restrictTo(final String reason, final OperatingSystem... operatingSystems)
throws AssumptionViolatedException {
throws TestAbortedException {
final EnumSet<OperatingSystem> allowed = EnumSet.copyOf(Arrays.asList(operatingSystems));
Assume.assumeTrue(reason, allowed.contains(OperatingSystem.getCurrentOperatingSystem()));
Assumptions.assumeTrue(
allowed.contains(OperatingSystem.getCurrentOperatingSystem()), reason);
}

/**
* Forbids the execution on the given set of operating systems.
*
* @param reason reason for the restriction
* @param forbiddenSystems forbidden operating systems
* @throws AssumptionViolatedException if this method is called on a forbidden operating system
* @throws TestAbortedException if this method is called on a forbidden operating system
*/
public static void forbid(final String reason, final OperatingSystem... forbiddenSystems)
throws AssumptionViolatedException {
throws TestAbortedException {
final OperatingSystem os = OperatingSystem.getCurrentOperatingSystem();
for (final OperatingSystem forbiddenSystem : forbiddenSystems) {
Assume.assumeTrue(reason, os != forbiddenSystem);
Assumptions.assumeTrue(os != forbiddenSystem, reason);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package org.apache.flink.tests.util.cache;

import org.apache.flink.tests.util.ExternalResource;
import org.apache.flink.tests.util.util.FactoryUtils;
import org.apache.flink.util.ExternalResource;

import java.io.IOException;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.flink.tests.util.cache;

import org.junit.rules.TemporaryFolder;
import org.apache.flink.util.FileUtils;

import java.io.IOException;
import java.nio.file.Path;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -31,16 +33,20 @@
public final class LolCache extends AbstractDownloadCache {

private static final Pattern CACHE_FILE_NAME_PATTERN = Pattern.compile(".*");
private final TemporaryFolder folder;
private final Path tempDirectory;

public LolCache(TemporaryFolder folder) {
super(folder.getRoot().toPath());
this.folder = folder;
public LolCache(Path tempDirectory) {
super(tempDirectory);
this.tempDirectory = tempDirectory;
}

@Override
public void afterTestSuccess() {
folder.delete();
try {
FileUtils.deleteDirectory(tempDirectory.toFile());
} catch (IOException e) {
// Ignore cleanup failures
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@

package org.apache.flink.tests.util.cache;

import org.junit.rules.TemporaryFolder;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** A {@link DownloadCacheFactory} for the {@link LolCache}. */
public final class LolCacheFactory implements DownloadCacheFactory {

@Override
public DownloadCache create() throws IOException {
final TemporaryFolder folder = new TemporaryFolder();
folder.create();
return new LolCache(folder);
final Path tempDirectory = Files.createTempDirectory("flink-lol-cache-");
return new LolCache(tempDirectory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.tests.util.flink;

import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.tests.util.ExternalResource;
import org.apache.flink.tests.util.util.FactoryUtils;
import org.apache.flink.util.ExternalResource;

import java.io.IOException;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@
import org.apache.flink.test.util.SQLJobSubmission;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;

import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
Expand All @@ -58,12 +59,12 @@ public class LocalStandaloneFlinkResource implements FlinkResource {

private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneFlinkResource.class);

private final TemporaryFolder temporaryFolder = new TemporaryFolder();
private final Path distributionDirectory;
@Nullable private final Path logBackupDirectory;
private final FlinkResourceSetup setup;

private FlinkDistribution distribution;
private Path tempDirectory;

LocalStandaloneFlinkResource(
Path distributionDirectory,
Expand All @@ -77,8 +78,8 @@ public class LocalStandaloneFlinkResource implements FlinkResource {

@Override
public void before() throws Exception {
temporaryFolder.create();
Path tmp = temporaryFolder.newFolder().toPath();
tempDirectory = Files.createTempDirectory("flink-local-standalone-");
Path tmp = Files.createDirectory(tempDirectory.resolve("dist"));
LOG.info("Copying distribution to {}.", tmp);
TestUtils.copyDirectory(distributionDirectory, tmp);

Expand All @@ -98,7 +99,7 @@ public void before() throws Exception {
@Override
public void afterTestSuccess() {
shutdownCluster();
temporaryFolder.delete();
cleanupTempDirectory();
}

@Override
Expand All @@ -107,7 +108,17 @@ public void afterTestFailure() {
shutdownCluster();
backupLogs();
}
temporaryFolder.delete();
cleanupTempDirectory();
}

private void cleanupTempDirectory() {
if (tempDirectory != null) {
try {
FileUtils.deleteDirectory(tempDirectory.toFile());
} catch (IOException e) {
LOG.warn("Failed to delete temporary directory: {}", tempDirectory, e);
}
}
}

private void shutdownCluster() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,23 @@
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import static org.junit.jupiter.api.Assertions.assertTrue;

/** Tests for {@link TestUtils}. */
public class TestUtilsTest extends TestLogger {

@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
@TempDir Path temporaryFolder;

@BeforeClass
@BeforeAll
public static void setupClass() {
OperatingSystemRestriction.forbid(
"Symbolic links usually require special permissions on Windows.",
Expand All @@ -50,7 +50,7 @@ public void copyDirectory() throws IOException {
Paths.get("file1"), Paths.get("dir1", "file2"),
};

Path source = temporaryFolder.newFolder("source").toPath();
Path source = Files.createDirectory(temporaryFolder.resolve("source"));
for (Path file : files) {
Files.createDirectories(source.resolve(file).getParent());
Files.createFile(source.resolve(file));
Expand All @@ -63,7 +63,7 @@ public void copyDirectory() throws IOException {
TestUtils.copyDirectory(symbolicLink, target);

for (Path file : files) {
Assert.assertTrue(Files.exists(target.resolve(file)));
assertTrue(Files.exists(target.resolve(file)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
import org.apache.flink.test.util.FileUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -34,18 +32,20 @@
import java.util.List;
import java.util.regex.Pattern;

import static org.junit.jupiter.api.Assertions.assertEquals;

/** Tests for {@link FileUtils}. */
public class FileUtilsTest extends TestLogger {

@ClassRule public static final TemporaryFolder TMP = new TemporaryFolder();
@TempDir static Path tempDir;

private static final List<String> ORIGINAL_LINES =
Collections.unmodifiableList(Arrays.asList("line1", "line2", "line3"));
private Path testFile;

@Before
@BeforeEach
public void setupFile() throws IOException {
Path path = TMP.newFile().toPath();
Path path = Files.createTempFile(tempDir, null, null);

Files.write(path, ORIGINAL_LINES);

Expand All @@ -56,7 +56,7 @@ public void setupFile() throws IOException {
public void replaceSingleMatch() throws IOException {
FileUtils.replace(testFile, Pattern.compile("line1"), matcher -> "removed");

Assert.assertEquals(
assertEquals(
Arrays.asList("removed", ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2)),
Files.readAllLines(testFile));
}
Expand All @@ -65,14 +65,14 @@ public void replaceSingleMatch() throws IOException {
public void replaceMultipleMatch() throws IOException {
FileUtils.replace(testFile, Pattern.compile("line(.*)"), matcher -> matcher.group(1));

Assert.assertEquals(Arrays.asList("1", "2", "3"), Files.readAllLines(testFile));
assertEquals(Arrays.asList("1", "2", "3"), Files.readAllLines(testFile));
}

@Test
public void replaceWithEmptyLine() throws IOException {
FileUtils.replace(testFile, Pattern.compile("line2"), matcher -> "");

Assert.assertEquals(
assertEquals(
Arrays.asList(ORIGINAL_LINES.get(0), "", ORIGINAL_LINES.get(2)),
Files.readAllLines(testFile));
}
Expand Down
2 changes: 1 addition & 1 deletion flink-end-to-end-tests/flink-sql-client-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ under the License.
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<artifactId>testcontainers-kafka</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.File;
Expand Down Expand Up @@ -80,7 +80,9 @@ public class SqlClientITCase {

@Container
public static final KafkaContainer KAFKA =
new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
new KafkaContainer(
DockerImageName.parse(DockerImageVersions.KAFKA)
.asCompatibleSubstituteFor("apache/kafka"))
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
.withLogConsumer(LOG_CONSUMER);
Expand Down
Loading