This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new ac80267fcb Add utility to create an empty wal file. (#4116)
ac80267fcb is described below

commit ac80267fcb797dd57c5fead1cdd4e96d8a480a00
Author: EdColeman <d...@etcoleman.com>
AuthorDate: Tue Apr 9 11:24:18 2024 -0400

    Add utility to create an empty wal file. (#4116)
    
    Updated the CreateEmpty utility with an option to create empty wal file(s).
---
 .../accumulo/core/file/rfile/CreateEmpty.java      | 113 ---------
 .../apache/accumulo/tserver/util/CreateEmpty.java  | 198 ++++++++++++++++
 .../accumulo/tserver/util/CreateEmptyTest.java     | 263 +++++++++++++++++++++
 .../test/functional/RecoveryWithEmptyRFileIT.java  |   3 +-
 .../apache/accumulo/test/start/KeywordStartIT.java |   2 +-
 5 files changed, 464 insertions(+), 115 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java 
b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
deleted file mode 100644
index 5a8d4dc104..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.file.rfile;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.accumulo.core.cli.Help;
-import org.apache.accumulo.core.conf.DefaultConfiguration;
-import org.apache.accumulo.core.file.FileSKVWriter;
-import org.apache.accumulo.core.file.rfile.bcfile.Compression;
-import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
-import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
-import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
-import org.apache.accumulo.start.spi.KeywordExecutable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.beust.jcommander.IParameterValidator;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.google.auto.service.AutoService;
-
-/**
- * Create an empty RFile for use in recovering from data loss where Accumulo 
still refers internally
- * to a path.
- */
-@AutoService(KeywordExecutable.class)
-public class CreateEmpty implements KeywordExecutable {
-  private static final Logger log = LoggerFactory.getLogger(CreateEmpty.class);
-
-  public static class NamedLikeRFile implements IParameterValidator {
-    @Override
-    public void validate(String name, String value) throws ParameterException {
-      if (!value.endsWith(".rf")) {
-        throw new ParameterException("File must end with .rf and '" + value + 
"' does not.");
-      }
-    }
-  }
-
-  public static class IsSupportedCompressionAlgorithm implements 
IParameterValidator {
-    @Override
-    public void validate(String name, String value) throws ParameterException {
-      List<String> algorithms = Compression.getSupportedAlgorithms();
-      if (!algorithms.contains(value)) {
-        throw new ParameterException("Compression codec must be one of " + 
algorithms);
-      }
-    }
-  }
-
-  static class Opts extends Help {
-    @Parameter(names = {"-c", "--codec"}, description = "the compression codec 
to use.",
-        validateWith = IsSupportedCompressionAlgorithm.class)
-    String codec = new NoCompression().getName();
-    @Parameter(
-        description = " <path> { <path> ... } Each path given is a URL."
-            + " Relative paths are resolved according to the default 
filesystem defined in"
-            + " your Hadoop configuration, which is usually an HDFS instance.",
-        required = true, validateWith = NamedLikeRFile.class)
-    List<String> files = new ArrayList<>();
-  }
-
-  public static void main(String[] args) throws Exception {
-    new CreateEmpty().execute(args);
-  }
-
-  @Override
-  public String keyword() {
-    return "create-empty";
-  }
-
-  @Override
-  public String description() {
-    return "Creates an empty rfile";
-  }
-
-  @Override
-  public void execute(String[] args) throws Exception {
-    Configuration conf = new Configuration();
-
-    Opts opts = new Opts();
-    opts.parseArgs("accumulo create-empty", args);
-
-    for (String arg : opts.files) {
-      UnreferencedTabletFile file = UnreferencedTabletFile.of(conf, new 
Path(arg));
-      log.info("Writing to file '{}'", file);
-      FileSKVWriter writer = new RFileOperations().newWriterBuilder()
-          .forFile(file, file.getPath().getFileSystem(conf), conf, 
NoCryptoServiceFactory.NONE)
-          
.withTableConfiguration(DefaultConfiguration.getInstance()).withCompression(opts.codec)
-          .build();
-      writer.close();
-    }
-  }
-
-}
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/util/CreateEmpty.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/util/CreateEmpty.java
new file mode 100644
index 0000000000..2f15d2a382
--- /dev/null
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/util/CreateEmpty.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.tserver.log.DfsLogger.LOG_FILE_HEADER_V4;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
+import org.apache.accumulo.core.crypto.CryptoUtils;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression;
+import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.file.rfile.compression.NoCompression;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.IParameterValidator;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.auto.service.AutoService;
+
+/**
+ * Create an empty RFile or an empty write-ahead log (WAL) file for use in recovering from data
+ * loss where Accumulo still refers internally to a path.
+ */
+@AutoService(KeywordExecutable.class)
+public class CreateEmpty implements KeywordExecutable {
+  private static final Logger LOG = LoggerFactory.getLogger(CreateEmpty.class);
+  public static final String RF_EXTENSION = ".rf";
+  public static final String WAL_EXTENSION = ".wal";
+
+  public static class MatchesValidFileExtension implements IParameterValidator 
{
+    @Override
+    public void validate(String name, String value) throws ParameterException {
+      if (value.endsWith(RF_EXTENSION) || value.endsWith(WAL_EXTENSION)) {
+        return;
+      }
+      throw new ParameterException("File must end with either " + RF_EXTENSION 
+ " or "
+          + WAL_EXTENSION + " and '" + value + "' does not.");
+    }
+  }
+
+  public static class IsSupportedCompressionAlgorithm implements 
IParameterValidator {
+    @Override
+    public void validate(String name, String value) throws ParameterException {
+      List<String> algorithms = Compression.getSupportedAlgorithms();
+      if (!algorithms.contains(value)) {
+        throw new ParameterException("Compression codec must be one of " + 
algorithms);
+      }
+    }
+  }
+
+  /**
+   * Command-line options: the compression codec (used when writing RFiles), the type of file to
+   * create and the paths of the files to create.
+   */
+  static class Opts extends ConfigOpts {
+    @Parameter(names = {"-c", "--codec"}, description = "the compression codec to use.",
+        validateWith = IsSupportedCompressionAlgorithm.class)
+    String codec = new NoCompression().getName();
+    @Parameter(
+        description = " <path> { <path> ... } Each path given is a URL."
+            + " Relative paths are resolved according to the default filesystem defined in"
+            + " your Hadoop configuration, which is usually an HDFS instance.",
+        required = true, validateWith = MatchesValidFileExtension.class)
+    List<String> files = new ArrayList<>();
+
+    /** The kinds of empty file this utility can create. */
+    public enum OutFileType {
+      RF, WAL
+    }
+
+    // rfile as default keeps previous behaviour
+    @Parameter(names = "--type", description = "the type of empty file to create, RF or WAL.")
+    public OutFileType fileType = OutFileType.RF;
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    new CreateEmpty().execute(args);
+  }
+
+  @Override
+  public String keyword() {
+    return "create-empty";
+  }
+
+  @Override
+  public String description() {
+    return "Creates empty RFiles (RF) or empty write-ahead log (WAL) files for 
emergency recovery";
+  }
+
+  @Override
+  public void execute(String[] args) throws Exception {
+
+    Opts opts = new Opts();
+    opts.parseArgs("accumulo create-empty", args);
+
+    var siteConfig = opts.getSiteConfiguration();
+    try (ServerContext context = new ServerContext(siteConfig)) {
+      switch (opts.fileType) {
+        case RF:
+          createEmptyRFile(opts, context);
+          break;
+        case WAL:
+          createEmptyWal(opts, context);
+          break;
+        default:
+          throw new ParameterException("file type must be RF or WAL, received: 
" + opts.fileType);
+      }
+    }
+  }
+
+  void createEmptyRFile(final Opts opts, final ServerContext context) throws 
IOException {
+    var vm = context.getVolumeManager();
+
+    CryptoEnvironment env = new 
CryptoEnvironmentImpl(CryptoEnvironment.Scope.TABLE);
+    CryptoService cryptoService = context.getCryptoFactory().getService(env,
+        context.getConfiguration().getAllCryptoProperties());
+
+    for (String filename : opts.files) {
+      Path path = new Path(filename);
+      checkFileExists(path, vm);
+      UnreferencedTabletFile tabletFile =
+          UnreferencedTabletFile.of(vm.getFileSystemByPath(path), path);
+      LOG.info("Writing to file '{}'", tabletFile);
+      FileSKVWriter writer = new RFileOperations().newWriterBuilder()
+          .forFile(tabletFile, vm.getFileSystemByPath(path), 
context.getHadoopConf(), cryptoService)
+          
.withTableConfiguration(DefaultConfiguration.getInstance()).withCompression(opts.codec)
+          .build();
+      writer.close();
+    }
+  }
+
+  /**
+   * Creates an empty write-ahead log for each path in {@code opts.files}. Each file is written as
+   * the v4 log file header, the crypto parameters obtained for the WAL scope and a single OPEN
+   * event with an empty tserver session and an empty value.
+   *
+   * @param opts parsed command-line options; only {@code files} is read here
+   * @param context supplies the volume manager, crypto factory and configuration
+   * @throws IllegalArgumentException if a target file already exists
+   * @throws IOException if a file cannot be created or written
+   */
+  void createEmptyWal(Opts opts, ServerContext context) throws IOException {
+    final LogFileValue emptyValue = new LogFileValue();
+
+    var vm = context.getVolumeManager();
+
+    // resolve the crypto service once, before any file is created (as createEmptyRFile does), so
+    // a crypto configuration problem does not leave a partially written file behind
+    CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL);
+    CryptoService cryptoService = context.getCryptoFactory().getService(env,
+        context.getConfiguration().getAllCryptoProperties());
+
+    for (String filename : opts.files) {
+      Path path = new Path(filename);
+      checkFileExists(path, vm);
+      try (var out = new DataOutputStream(vm.create(path))) {
+        LOG.info("Output file: {}", path);
+
+        out.write(LOG_FILE_HEADER_V4.getBytes(UTF_8));
+
+        // a fresh encrypter is requested for every file
+        byte[] cryptoParams = cryptoService.getFileEncrypter(env).getDecryptionParameters();
+        CryptoUtils.writeParams(cryptoParams, out);
+
+        LogFileKey key = new LogFileKey();
+        key.event = OPEN;
+        key.tserverSession = "";
+
+        key.write(out);
+        emptyValue.write(out);
+      }
+    }
+  }
+
+  /**
+   * Ensures the target path is not already present. Despite the name, this does not return a
+   * result: it throws when the path exists and otherwise does nothing.
+   *
+   * @param path the path that is about to be created
+   * @param vm the volume manager used to test for existence
+   * @throws IllegalArgumentException if {@code path} already exists
+   */
+  private void checkFileExists(final Path path, final VolumeManager vm) throws 
IOException {
+    if (vm.exists(path)) {
+      throw new IllegalArgumentException(path + " exists");
+    }
+  }
+}
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/util/CreateEmptyTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/util/CreateEmptyTest.java
new file mode 100644
index 0000000000..ad9f73f948
--- /dev/null
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/util/CreateEmptyTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.tserver.log.DfsLogger.LOG_FILE_HEADER_V4;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.client.rfile.RFile;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
+import org.apache.accumulo.core.crypto.CryptoUtils;
+import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
+import org.apache.accumulo.core.spi.crypto.CryptoService;
+import org.apache.accumulo.core.spi.crypto.GenericCryptoServiceFactory;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.tserver.log.DfsLogger;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public class CreateEmptyTest {
+  @TempDir
+  private static File tempDir;
+
+  private ServerContext context;
+
+  @BeforeEach
+  public void init() throws IOException {
+    ConfigurationCopy config = new 
ConfigurationCopy(DefaultConfiguration.getInstance());
+    config.set(Property.INSTANCE_VOLUMES.getKey(), "file:///");
+
+    context = mock(ServerContext.class);
+    expect(context.getCryptoFactory()).andReturn(new 
GenericCryptoServiceFactory()).anyTimes();
+    expect(context.getConfiguration()).andReturn(config).anyTimes();
+    expect(context.getHadoopConf()).andReturn(new Configuration()).anyTimes();
+    VolumeManager volumeManager = VolumeManagerImpl.get(config, new 
Configuration());
+    expect(context.getVolumeManager()).andReturn(volumeManager).anyTimes();
+    replay(context);
+  }
+
+  @AfterEach
+  public void verifyMock() {
+    verify(context);
+  }
+
+  /**
+   * Verifies that creating a wal or an rfile fails when the target file already exists.
+   */
+  @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "path provided by test")
+  @Test
+  public void exceptionOnFileExistsTest() throws Exception {
+    CreateEmpty createEmpty = new CreateEmpty();
+
+    String wal1 = genFilename(tempDir.getAbsolutePath() + "/empty", ".wal");
+    String rf1 = genFilename(tempDir.getAbsolutePath() + "/empty", ".rf");
+
+    // create the file so it exists
+    File f = new File(wal1);
+    assertTrue(f.createNewFile());
+
+    String[] walArgs = {"--type", "WAL", wal1};
+    CreateEmpty.Opts walOpts = new CreateEmpty.Opts();
+    walOpts.parseArgs("accumulo create-empty", walArgs);
+
+    assertThrows(IllegalArgumentException.class,
+        () -> createEmpty.createEmptyWal(walOpts, context));
+
+    // create the file so it exists
+    File f2 = new File(rf1);
+    assertTrue(f2.createNewFile());
+
+    String[] rfArgs = {"--type", "RF", rf1};
+    CreateEmpty.Opts rfOpts = new CreateEmpty.Opts();
+    rfOpts.parseArgs("accumulo create-empty", rfArgs);
+    // use the RF options so the check runs against the existing rfile, not the wal file
+    assertThrows(IllegalArgumentException.class,
+        () -> createEmpty.createEmptyRFile(rfOpts, context));
+  }
+
+  @Test
+  public void createRfileTest() throws Exception {
+    CreateEmpty createEmpty = new CreateEmpty();
+
+    String file1 = genFilename(tempDir.getAbsolutePath() + "/empty", ".rf");
+    String file2 = genFilename(tempDir.getAbsolutePath() + "/empty", ".rf");
+
+    String[] args = {"--type", "RF", file1, file2};
+    CreateEmpty.Opts opts = new CreateEmpty.Opts();
+    opts.parseArgs("accumulo create-empty", args);
+
+    createEmpty.createEmptyRFile(opts, context);
+    VolumeManager vm = context.getVolumeManager();
+    assertTrue(vm.exists(new Path(file1)));
+    try (var scanner = RFile.newScanner().from(file1).build()) {
+      assertEquals(0, scanner.stream().count());
+    }
+
+    assertTrue(vm.exists(new Path(file2)));
+    try (var scanner = RFile.newScanner().from(file2).build()) {
+      assertEquals(0, scanner.stream().count());
+    }
+
+  }
+
+  /**
+   * Validate that the default type is RF (RecoveryWithEmptyRFileIT also needs this)
+   */
+  @Test
+  public void createRfileDefaultTest() throws Exception {
+    CreateEmpty createEmpty = new CreateEmpty();
+
+    String file1 = genFilename(tempDir.getAbsolutePath() + "/empty", ".rf");
+
+    String[] args = {file1};
+    CreateEmpty.Opts opts = new CreateEmpty.Opts();
+    opts.parseArgs("accumulo create-empty", args);
+    // no --type was given, so the parsed options must fall back to RF
+    assertEquals(CreateEmpty.Opts.OutFileType.RF, opts.fileType);
+
+    createEmpty.createEmptyRFile(opts, context);
+    VolumeManager vm = context.getVolumeManager();
+    assertTrue(vm.exists(new Path(file1)));
+    try (var scanner = RFile.newScanner().from(file1).build()) {
+      assertEquals(0, scanner.stream().count());
+    }
+  }
+
+  @Test
+  public void createWalTest() throws Exception {
+    CreateEmpty createEmpty = new CreateEmpty();
+
+    String file1 = genFilename(tempDir.getAbsolutePath() + "/empty", ".wal");
+    String file2 = genFilename(tempDir.getAbsolutePath() + "/empty", ".wal");
+
+    String[] args = {"--type", "WAL", file1, file2};
+    CreateEmpty.Opts opts = new CreateEmpty.Opts();
+    opts.parseArgs("accumulo create-empty", args);
+
+    createEmpty.createEmptyWal(opts, context);
+
+    checkWalContext(file1);
+    readLogFile(file1);
+
+    checkWalContext(file2);
+  }
+
+  /**
+   * Reads the log file and looks for specific information (crypto id, event == OPEN)
+   *
+   * @param expected path of the wal file to check
+   */
+  private void checkWalContext(final String expected) throws IOException {
+    Path path = new Path(expected);
+    VolumeManager vm = context.getVolumeManager();
+    assertTrue(vm.exists(path));
+
+    // open the file exactly once, inside try-with-resources, so the stream is always closed
+    try (InputStream inputStream = vm.open(path).getWrappedStream();
+        DataInputStream dis = new DataInputStream(inputStream)) {
+      byte[] headerBuf = new byte[1024];
+      int len = dis.read(headerBuf, 0, LOG_FILE_HEADER_V4.length());
+      assertEquals(LOG_FILE_HEADER_V4.length(), len);
+      assertEquals(LOG_FILE_HEADER_V4,
+          new String(headerBuf, 0, LOG_FILE_HEADER_V4.length(), UTF_8));
+
+      CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL);
+      CryptoService cryptoService = context.getCryptoFactory().getService(env,
+          context.getConfiguration().getAllCryptoProperties());
+
+      byte[] decryptionParams = cryptoService.getFileEncrypter(env).getDecryptionParameters();
+
+      var cryptParams = CryptoUtils.readParams(dis);
+      assertArrayEquals(decryptionParams, cryptParams);
+
+      LogFileKey key = new LogFileKey();
+      key.readFields(dis);
+
+      // expected value first, so a failure message reports the values the right way round
+      assertEquals(LogEvents.OPEN, key.event);
+      assertEquals("", key.tserverSession);
+      assertNull(key.filename);
+    }
+  }
+
+  /**
+   * Scan through log file and check that there is one event.
+   *
+   * @param filename path of the wal file to read
+   */
+  private void readLogFile(final String filename) throws Exception {
+    Path path = new Path(filename);
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+
+    FileSystem fs = context.getVolumeManager().getFileSystemByPath(path);
+
+    CryptoEnvironment env = new CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL);
+    CryptoService cryptoService = context.getCryptoFactory().getService(env,
+        context.getConfiguration().getAllCryptoProperties());
+
+    int eventCount = 0;
+    try (final FSDataInputStream fsinput = fs.open(path);
+        DataInputStream input = DfsLogger.getDecryptingStream(fsinput, cryptoService)) {
+      while (true) {
+        try {
+          key.readFields(input);
+          value.readFields(input);
+        } catch (EOFException ex) {
+          break;
+        }
+        eventCount++;
+      }
+    } catch (DfsLogger.LogHeaderIncompleteException e) {
+      fail("Could not read header for " + path, e);
+    }
+    // checked after the try block (not in a finally) so that this assertion cannot replace
+    // a failure raised above; an empty wal has 1 event (OPEN)
+    assertEquals(1, eventCount);
+  }
+
+  // tempDir is per test suite - generate a one-up count file for each call.
+  private static final AtomicInteger fileCount = new AtomicInteger(0);
+
+  private String genFilename(final String prefix, final String extension) {
+    return prefix + fileCount.incrementAndGet() + extension;
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
index 5524ae188f..c65cd0af5f 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/RecoveryWithEmptyRFileIT.java
@@ -31,12 +31,12 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.rfile.CreateEmpty;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.tserver.util.CreateEmpty;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -95,6 +95,7 @@ public class RecoveryWithEmptyRFileIT extends 
ConfigurableMacBase {
           Path rfile = 
StoredTabletFile.of(entry.getKey().getColumnQualifier()).getPath();
           log.debug("Removing rfile '{}'", rfile);
           cluster.getFileSystem().delete(rfile, false);
+          // the following depends on create-empty defaulting to the rfile type.
           Process processInfo = cluster.exec(CreateEmpty.class, 
rfile.toString()).getProcess();
           assertEquals(0, processInfo.waitFor());
         }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java 
b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
index bab15197eb..07d45a0238 100644
--- a/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/start/KeywordStartIT.java
@@ -39,7 +39,6 @@ import java.util.TreeMap;
 
 import org.apache.accumulo.compactor.CompactorExecutable;
 import org.apache.accumulo.coordinator.CoordinatorExecutable;
-import org.apache.accumulo.core.file.rfile.CreateEmpty;
 import org.apache.accumulo.core.file.rfile.GenerateSplits;
 import org.apache.accumulo.core.file.rfile.PrintInfo;
 import org.apache.accumulo.core.file.rfile.SplitLarge;
@@ -72,6 +71,7 @@ import org.apache.accumulo.tserver.ScanServerExecutable;
 import org.apache.accumulo.tserver.TServerExecutable;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.logger.LogReader;
+import org.apache.accumulo.tserver.util.CreateEmpty;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;

Reply via email to