Completely reworked blob storage. No longer care for potential desync between blob storage and world data on crashes. Use blobs as memory mapped byte buffers for drastically reduced memory usage.

This commit is contained in:
Florian Nücke
2021-07-19 23:43:31 +02:00
parent ab4d0ad3c3
commit fddcee9949
10 changed files with 240 additions and 566 deletions

View File

@@ -37,7 +37,7 @@ public final class CommonSetup {
}
private static void handleServerStopped(final FMLServerStoppedEvent event) {
BlobStorage.synchronize();
BlobStorage.close();
Allocator.resetAndCheckLeaks();
FileSystems.reset();
}

View File

@@ -10,6 +10,8 @@ public final class Constants {
public static final int MEGABYTE = 1024 * KILOBYTE;
public static final int GIGABYTE = 1024 * MEGABYTE;
public static int PAGE_SIZE = 4 * 1024;
public static final int CPU_FREQUENCY = 25_000_000;
public static final int SECONDS_TO_TICKS = 20;

View File

@@ -1,11 +1,10 @@
package li.cil.oc2.common.bus.device.item;
import com.google.common.eventbus.Subscribe;
import li.cil.oc2.api.bus.device.ItemDevice;
import li.cil.oc2.api.bus.device.vm.VMDevice;
import li.cil.oc2.api.bus.device.vm.VMDeviceLoadResult;
import li.cil.oc2.api.bus.device.vm.context.VMContext;
import li.cil.oc2.api.bus.device.vm.event.VMResumingRunningEvent;
import li.cil.oc2.common.Constants;
import li.cil.oc2.common.bus.device.util.IdentityProxy;
import li.cil.oc2.common.bus.device.util.OptionalAddress;
import li.cil.oc2.common.bus.device.util.OptionalInterrupt;
@@ -16,15 +15,18 @@ import li.cil.oc2.common.util.NBTTagIds;
import li.cil.sedna.api.device.BlockDevice;
import li.cil.sedna.device.virtio.VirtIOBlockDevice;
import net.minecraft.nbt.CompoundNBT;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;
import java.util.UUID;
@SuppressWarnings("UnstableApiUsage")
public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TIdentity> extends IdentityProxy<TIdentity> implements VMDevice, ItemDevice {
private static final Logger LOGGER = LogManager.getLogger();
private static final String DEVICE_TAG_NAME = "device";
private static final String ADDRESS_TAG_NAME = "address";
private static final String INTERRUPT_TAG_NAME = "interrupt";
@@ -32,8 +34,6 @@ public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TI
///////////////////////////////////////////////////////////////
private BlobStorage.JobHandle jobHandle;
protected TBlock data;
protected VirtIOBlockDevice device;
///////////////////////////////////////////////////////////////
@@ -44,7 +44,7 @@ public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TI
private CompoundNBT deviceTag;
// Offline persisted data.
protected UUID blobHandle;
@Nullable protected UUID blobHandle;
///////////////////////////////////////////////////////////////
@@ -56,8 +56,6 @@ public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TI
@Override
public VMDeviceLoadResult mount(final VMContext context) {
data = createBlockDevice();
if (!allocateDevice(context)) {
return VMDeviceLoadResult.fail();
}
@@ -74,8 +72,6 @@ public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TI
context.getEventBus().register(this);
deserializeData();
if (deviceTag != null) {
NBTSerialization.deserialize(deviceTag, device);
}
@@ -85,12 +81,6 @@ public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TI
@Override
public void unmount() {
// Since we cannot serialize the data in a regular serialize call due to the
// actual data being unloaded at that point, but want to permanently persist
// it (it's the contents of the block device) we need to serialize it in the
// unload, too. Don't need to wait for the job, though.
serializeData();
suspend();
deviceTag = null;
address.clear();
@@ -99,22 +89,23 @@ public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TI
@Override
public void suspend() {
data = null;
jobHandle = null;
if (device != null) {
try {
device.close();
} catch (final IOException e) {
LOGGER.error(e);
}
}
if (blobHandle != null) {
BlobStorage.close(blobHandle);
}
device = null;
}
@Subscribe
public void handleResumingRunningEvent(final VMResumingRunningEvent event) {
awaitStorageOperation();
}
@Override
public void exportToItemStack(final CompoundNBT nbt) {
if (blobHandle == null && data != null) {
getSerializationStream(data).ifPresent(stream -> blobHandle = BlobStorage.validateHandle(blobHandle));
}
if (blobHandle != null) {
nbt.putUUID(BLOB_HANDLE_TAG_NAME, blobHandle);
}
@@ -131,7 +122,10 @@ public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TI
public CompoundNBT serializeNBT() {
final CompoundNBT tag = new CompoundNBT();
serializeData();
if (blobHandle != null) {
tag.putUUID(BLOB_HANDLE_TAG_NAME, blobHandle);
}
if (device != null) {
deviceTag = NBTSerialization.serialize(device);
}
@@ -145,10 +139,6 @@ public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TI
tag.putInt(INTERRUPT_TAG_NAME, interrupt.getAsInt());
}
if (blobHandle != null) {
tag.putUUID(BLOB_HANDLE_TAG_NAME, blobHandle);
}
return tag;
}
@@ -169,42 +159,9 @@ public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TI
}
}
public void serializeData() {
if (data != null) {
final Optional<InputStream> optional = getSerializationStream(data);
optional.ifPresent(stream -> {
blobHandle = BlobStorage.validateHandle(blobHandle);
jobHandle = BlobStorage.submitSave(blobHandle, stream);
});
if (!optional.isPresent()) {
BlobStorage.freeHandle(blobHandle);
blobHandle = null;
}
}
}
public void deserializeData() {
if (data != null && blobHandle != null) {
try {
jobHandle = BlobStorage.submitLoad(blobHandle, getDeserializationStream(data));
} catch (final UnsupportedOperationException ignored) {
// If logic producing the block data implementation changed between saves we
// can potentially end up in a state where we now have read only block data
// with some previously persisted data. A call to getDeserializationStream
// will, indirectly, lead to this exception. So we just ignore it.
}
}
}
///////////////////////////////////////////////////////////////
protected abstract int getSize();
protected abstract TBlock createBlockDevice();
protected abstract Optional<InputStream> getSerializationStream(TBlock device);
protected abstract OutputStream getDeserializationStream(TBlock device);
protected abstract TBlock createBlockDevice() throws IOException;
protected void handleDataAccess() {
}
@@ -212,24 +169,22 @@ public abstract class AbstractBlockDeviceVMDevice<TBlock extends BlockDevice, TI
///////////////////////////////////////////////////////////////
private boolean allocateDevice(final VMContext context) {
if (!context.getMemoryAllocator().claimMemory(getSize())) {
if (!context.getMemoryAllocator().claimMemory(Constants.PAGE_SIZE)) {
return false;
}
final ListenableBlockDevice listenableData = new ListenableBlockDevice(data);
listenableData.onAccess.add(this::handleDataAccess);
device = new VirtIOBlockDevice(context.getMemoryMap(), listenableData);
try {
final ListenableBlockDevice listenableData = new ListenableBlockDevice(createBlockDevice());
listenableData.onAccess.add(this::handleDataAccess);
device = new VirtIOBlockDevice(context.getMemoryMap(), listenableData);
} catch (final IOException e) {
LOGGER.error(e);
return false;
}
return true;
}
private void awaitStorageOperation() {
if (jobHandle != null) {
jobHandle.await();
jobHandle = null;
}
}
///////////////////////////////////////////////////////////////
private static final class ListenableBlockDevice implements BlockDevice {

View File

@@ -1,18 +1,20 @@
package li.cil.oc2.common.bus.device.item;
import li.cil.oc2.common.serialization.BlobStorage;
import li.cil.oc2.common.util.Location;
import li.cil.oc2.common.util.SoundEvents;
import li.cil.oc2.common.util.ThrottledSoundEmitter;
import li.cil.sedna.device.block.ByteBufferBlockDevice;
import net.minecraft.item.ItemStack;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;
public final class HardDriveVMDevice extends AbstractBlockDeviceVMDevice<ByteBufferBlockDevice, ItemStack> {
public class HardDriveVMDevice extends AbstractBlockDeviceVMDevice<ByteBufferBlockDevice, ItemStack> {
private final int size;
private final boolean readonly;
private final ThrottledSoundEmitter soundEmitter;
@@ -30,23 +32,11 @@ public final class HardDriveVMDevice extends AbstractBlockDeviceVMDevice<ByteBuf
///////////////////////////////////////////////////////////////////
@Override
protected int getSize() {
return size;
}
@Override
protected ByteBufferBlockDevice createBlockDevice() {
return ByteBufferBlockDevice.create(size, readonly);
}
@Override
protected Optional<InputStream> getSerializationStream(final ByteBufferBlockDevice device) {
return Optional.of(device.getInputStream());
}
@Override
protected OutputStream getDeserializationStream(final ByteBufferBlockDevice device) {
return device.getOutputStream();
protected ByteBufferBlockDevice createBlockDevice() throws IOException {
blobHandle = BlobStorage.validateHandle(blobHandle);
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
return ByteBufferBlockDevice.wrap(buffer, readonly);
}
@Override

View File

@@ -0,0 +1,75 @@
package li.cil.oc2.common.bus.device.item;
import com.google.common.eventbus.Subscribe;
import com.google.common.io.ByteStreams;
import li.cil.oc2.api.bus.device.vm.event.VMResumingRunningEvent;
import li.cil.oc2.common.util.Location;
import li.cil.sedna.api.device.BlockDevice;
import li.cil.sedna.device.block.ByteBufferBlockDevice;
import net.minecraft.item.ItemStack;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
@SuppressWarnings("UnstableApiUsage") // Guava EventBus @Subscribe / ByteStreams are marked @Beta.
public final class HardDriveVMDeviceWithInitialData extends HardDriveVMDevice {
    private static final Logger LOGGER = LogManager.getLogger();

    // Shared worker pool used to copy the base image into freshly created blobs
    // off the main thread. Non-daemon threads, presumably so an in-flight copy can
    // finish before JVM shutdown -- TODO confirm this is intended.
    private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> {
        final Thread thread = new Thread(r, "Hard Drive Initializer");
        thread.setDaemon(false);
        return thread;
    });

    // Read-only source block device whose contents seed a newly allocated blob.
    private final BlockDevice base;
    // Pending asynchronous initial-data copy; null when no copy is in flight.
    private Future<Void> copyJob;

    ///////////////////////////////////////////////////////////////////

    /**
     * Creates a hard drive device whose backing blob, on first creation, is
     * populated with the contents of {@code base}.
     *
     * @param identity the item stack this device represents.
     * @param base     the block device providing the initial data; its capacity defines the drive size.
     * @param readonly whether the resulting drive is read-only.
     * @param location supplier for the device's world location (used by the superclass).
     */
    public HardDriveVMDeviceWithInitialData(final ItemStack identity, final BlockDevice base, final boolean readonly, final Supplier<Optional<Location>> location) {
        super(identity, (int) base.getCapacity(), readonly, location);
        this.base = base;
    }

    // Blocks the resuming VM until the initial-data copy has completed, so the
    // guest never observes a partially initialized disk.
    // NOTE(review): the superclass also declares an @Subscribe method with this
    // exact signature; this override replaces it for EventBus dispatch -- confirm
    // the parent's resume logic is no longer required here.
    @Subscribe
    public void handleResumingRunningEvent(final VMResumingRunningEvent event) {
        if (copyJob != null) {
            try {
                copyJob.get();
            } catch (final Throwable e) {
                // Best effort: a failed copy leaves the blob blank; log and carry on.
                LOGGER.error(e);
            } finally {
                copyJob = null;
            }
        }
    }

    ///////////////////////////////////////////////////////////////////

    /**
     * Creates the memory-mapped block device via the superclass and, when the blob
     * did not exist before this call (no {@code blobHandle} yet), schedules an
     * asynchronous copy of {@code base} into it.
     *
     * @return the created block device; may still be receiving initial data on a worker thread.
     * @throws IOException when the underlying blob cannot be opened or mapped.
     */
    @Override
    protected ByteBufferBlockDevice createBlockDevice() throws IOException {
        // A null handle means the blob is about to be created for the first time
        // and therefore needs its initial contents written.
        final boolean isInitializing = blobHandle == null;
        final ByteBufferBlockDevice device = super.createBlockDevice();
        if (isInitializing) {
            copyJob = CompletableFuture.runAsync(() -> {
                try {
                    try (final InputStream input = base.getInputStream(0);
                         final OutputStream output = device.getOutputStream(0)) {
                        ByteStreams.copy(input, output);
                    }
                } catch (final IOException e) {
                    LOGGER.error(e);
                }
            }, WORKERS);
        }
        return device;
    }
}

View File

@@ -1,33 +1,35 @@
package li.cil.oc2.common.bus.device.item;
import com.google.common.eventbus.Subscribe;
import li.cil.oc2.api.bus.device.ItemDevice;
import li.cil.oc2.api.bus.device.vm.VMDevice;
import li.cil.oc2.api.bus.device.vm.VMDeviceLoadResult;
import li.cil.oc2.api.bus.device.vm.context.VMContext;
import li.cil.oc2.api.bus.device.vm.event.VMResumingRunningEvent;
import li.cil.oc2.common.Constants;
import li.cil.oc2.common.bus.device.util.IdentityProxy;
import li.cil.oc2.common.bus.device.util.OptionalAddress;
import li.cil.oc2.common.serialization.BlobStorage;
import li.cil.oc2.common.util.NBTTagIds;
import li.cil.sedna.api.device.PhysicalMemory;
import li.cil.sedna.device.memory.Memory;
import li.cil.sedna.memory.PhysicalMemoryInputStream;
import li.cil.sedna.memory.PhysicalMemoryOutputStream;
import li.cil.sedna.device.memory.ByteBufferMemory;
import net.minecraft.item.ItemStack;
import net.minecraft.nbt.CompoundNBT;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.UUID;
@SuppressWarnings("UnstableApiUsage")
public final class MemoryDevice extends IdentityProxy<ItemStack> implements VMDevice, ItemDevice {
private static final Logger LOGGER = LogManager.getLogger();
private static final String BLOB_HANDLE_TAG_NAME = "blob";
private static final String ADDRESS_TAG_NAME = "address";
///////////////////////////////////////////////////////////////
private final int size;
private BlobStorage.JobHandle jobHandle;
private PhysicalMemory device;
///////////////////////////////////////////////////////////////
@@ -54,8 +56,6 @@ public final class MemoryDevice extends IdentityProxy<ItemStack> implements VMDe
return VMDeviceLoadResult.fail();
}
loadPersistedState();
context.getEventBus().register(this);
return VMDeviceLoadResult.success();
@@ -66,32 +66,37 @@ public final class MemoryDevice extends IdentityProxy<ItemStack> implements VMDe
suspend();
// Memory is volatile, so free up our persisted blob when device is unloaded.
BlobStorage.freeHandle(blobHandle);
blobHandle = null;
jobHandle = null;
if (blobHandle != null) {
BlobStorage.delete(blobHandle);
blobHandle = null;
}
address.clear();
}
@Override
public void suspend() {
device = null;
}
if (device != null) {
try {
device.close();
} catch (final Exception e) {
LOGGER.error(e);
}
}
@Subscribe
public void handleResumingRunningEvent(final VMResumingRunningEvent event) {
awaitStorageOperation();
if (blobHandle != null) {
BlobStorage.close(blobHandle);
}
device = null;
}
@Override
public CompoundNBT serializeNBT() {
final CompoundNBT tag = new CompoundNBT();
if (device != null) {
blobHandle = BlobStorage.validateHandle(blobHandle);
if (blobHandle != null) {
tag.putUUID(BLOB_HANDLE_TAG_NAME, blobHandle);
jobHandle = BlobStorage.submitSave(blobHandle, new PhysicalMemoryInputStream(device));
}
if (address.isPresent()) {
tag.putLong(ADDRESS_TAG_NAME, address.getAsLong());
@@ -113,25 +118,19 @@ public final class MemoryDevice extends IdentityProxy<ItemStack> implements VMDe
///////////////////////////////////////////////////////////////
private boolean allocateDevice(final VMContext context) {
if (!context.getMemoryAllocator().claimMemory(size)) {
if (!context.getMemoryAllocator().claimMemory(Constants.PAGE_SIZE)) {
return false;
}
device = Memory.create(size);
try {
blobHandle = BlobStorage.validateHandle(blobHandle);
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
device = new ByteBufferMemory(buffer);
} catch (final IOException e) {
return false;
}
return true;
}
private void loadPersistedState() {
if (blobHandle != null) {
jobHandle = BlobStorage.submitLoad(blobHandle, new PhysicalMemoryOutputStream(device));
}
}
private void awaitStorageOperation() {
if (jobHandle != null) {
jobHandle.await();
jobHandle = null;
}
}
}

View File

@@ -1,132 +0,0 @@
package li.cil.oc2.common.bus.device.item;
import li.cil.ceres.BinarySerialization;
import li.cil.oc2.common.util.Location;
import li.cil.oc2.common.util.SoundEvents;
import li.cil.oc2.common.util.ThrottledSoundEmitter;
import li.cil.sedna.api.device.BlockDevice;
import li.cil.sedna.device.block.SparseBlockDevice;
import li.cil.sedna.utils.ByteBufferInputStream;
import net.minecraft.item.ItemStack;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;
/**
 * Hard drive device backed by a {@link SparseBlockDevice}: only blocks that differ
 * from the wrapped base device are persisted, via Ceres binary serialization.
 */
public final class SparseHardDriveVMDevice extends AbstractBlockDeviceVMDevice<SparseBlockDevice, ItemStack> {
    // Base image the sparse device overlays; also defines the drive capacity.
    private final BlockDevice base;
    private final boolean readonly;
    // Throttled disk-access sound, at most once per second.
    private final ThrottledSoundEmitter soundEmitter;

    ///////////////////////////////////////////////////////////////////

    /**
     * @param identity the item stack this device represents.
     * @param base     the base block device the sparse overlay sits on.
     * @param readonly whether writes to the drive are rejected.
     * @param location supplier for the device's world location, for sound playback.
     */
    public SparseHardDriveVMDevice(final ItemStack identity, final BlockDevice base, final boolean readonly, final Supplier<Optional<Location>> location) {
        super(identity);
        this.base = base;
        this.readonly = readonly;
        this.soundEmitter = new ThrottledSoundEmitter(location, SoundEvents.HDD_ACCESS.get())
            .withMinInterval(Duration.ofSeconds(1));
    }

    ///////////////////////////////////////////////////////////////////

    @Override
    protected int getSize() {
        return (int) base.getCapacity();
    }

    @Override
    protected SparseBlockDevice createBlockDevice() {
        return new SparseBlockDevice(base, readonly);
    }

    /**
     * Returns a stream over the serialized sparse overlay, or empty when no blocks
     * have been written yet (nothing worth persisting).
     */
    @Override
    protected Optional<InputStream> getSerializationStream(final SparseBlockDevice device) {
        if (device.getBlockCount() == 0) {
            return Optional.empty();
        }
        return Optional.of(new SerializationStream(device));
    }

    @Override
    protected OutputStream getDeserializationStream(final SparseBlockDevice device) {
        return new DeserializationStream(device);
    }

    @Override
    protected void handleDataAccess() {
        soundEmitter.play();
    }

    ///////////////////////////////////////////////////////////////////

    /**
     * Lazily serializes the sparse device on first access so that devices which are
     * never actually read do not pay the serialization cost.
     */
    private static final class SerializationStream extends InputStream {
        private final SparseBlockDevice device;
        // Created on demand by ensureSerialized(); null until the first access.
        private ByteBufferInputStream stream;

        private SerializationStream(final SparseBlockDevice device) {
            this.device = device;
        }

        @Override
        public int read() {
            ensureSerialized();
            return stream.read();
        }

        @Override
        public int read(final byte[] b, final int off, final int len) {
            ensureSerialized();
            return stream.read(b, off, len);
        }

        @Override
        public long skip(final long n) throws IOException {
            // Fix: previously dereferenced the lazily-created stream without
            // initializing it, throwing an NPE when skip() was the first call.
            ensureSerialized();
            return stream.skip(n);
        }

        @Override
        public int available() throws IOException {
            // Fix: same lazy-initialization hole as skip().
            ensureSerialized();
            return stream.available();
        }

        private void ensureSerialized() {
            if (stream != null) {
                return;
            }
            stream = new ByteBufferInputStream(BinarySerialization.serialize(device));
        }
    }

    /**
     * Buffers all written bytes and deserializes them into the sparse device when
     * the stream is closed (the full payload is required before deserializing).
     */
    private static final class DeserializationStream extends OutputStream {
        private final SparseBlockDevice device;
        private final ByteArrayOutputStream stream;

        public DeserializationStream(final SparseBlockDevice device) {
            this.device = device;
            stream = new ByteArrayOutputStream();
        }

        @Override
        public void write(final int b) {
            stream.write(b);
        }

        @Override
        public void write(final byte[] b, final int off, final int len) {
            stream.write(b, off, len);
        }

        @Override
        public void close() {
            BinarySerialization.deserialize(ByteBuffer.wrap(stream.toByteArray()), device);
        }
    }
}

View File

@@ -7,7 +7,7 @@ import li.cil.oc2.api.bus.device.data.BlockDeviceData;
import li.cil.oc2.api.bus.device.provider.ItemDeviceQuery;
import li.cil.oc2.common.Config;
import li.cil.oc2.common.Constants;
import li.cil.oc2.common.bus.device.item.SparseHardDriveVMDevice;
import li.cil.oc2.common.bus.device.item.HardDriveVMDeviceWithInitialData;
import li.cil.oc2.common.bus.device.provider.util.AbstractItemDeviceProvider;
import li.cil.oc2.common.item.HardDriveWithExternalDataItem;
import li.cil.oc2.common.util.LocationSupplierUtils;
@@ -31,7 +31,7 @@ public final class HardDriveWithExternalDataItemDeviceProvider extends AbstractI
return Optional.empty();
}
return Optional.of(new SparseHardDriveVMDevice(stack, data.getBlockDevice(), false, LocationSupplierUtils.of(query)));
return Optional.of(new HardDriveVMDeviceWithInitialData(stack, data.getBlockDevice(), false, LocationSupplierUtils.of(query)));
}
@Override

View File

@@ -1,8 +1,6 @@
package li.cil.oc2.common.serialization;
import com.google.common.collect.HashMultimap;
import li.cil.oc2.api.API;
import li.cil.oc2.common.Constants;
import net.minecraft.server.MinecraftServer;
import net.minecraft.world.storage.FolderName;
import org.apache.logging.log4j.LogManager;
@@ -10,20 +8,13 @@ import org.apache.logging.log4j.Logger;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
/**
* This class facilitates storing binary chunks of data in an efficient, parallelized fashion.
@@ -34,80 +25,12 @@ public final class BlobStorage {
///////////////////////////////////////////////////////////////////
private static final FolderName BLOBS_FOLDER_NAME = new FolderName(API.MOD_ID + "-blobs");
private static final HashMultimap<UUID, Future<Void>> WRITE_HANDLES = HashMultimap.create();
private static final HashMultimap<UUID, Future<Void>> READ_HANDLES = HashMultimap.create();
private static final ExecutorService WORKERS = Executors.newCachedThreadPool(r -> {
final Thread thread = new Thread(r, "Blob Storage I/O");
thread.setDaemon(false);
return thread;
});
private static final Map<UUID, FileChannel> BLOBS = new HashMap<>();
private static Path dataDirectory; // Directory blobs get saved to.
static {
Runtime.getRuntime().addShutdownHook(new Thread(BlobStorage::synchronize));
}
///////////////////////////////////////////////////////////////////
/**
 * Represents a handle to a serialization or deserialization job submitted using
 * {@link #submitSave(UUID, InputStream)} or {@link #submitLoad(UUID, OutputStream)}.
 * <p>
 * This can be used to guarantee the serialization job has been completed before making changes to
 * the to-be-serialized data/the deserialization job has been completed before accessing the to-be-
 * deserialized data.
 */
public static final class JobHandle {
    // The pending jobs this handle tracks; emptied once awaited.
    private final Set<Future<Void>> futures;

    private JobHandle(final Future<Void> future) {
        this.futures = new HashSet<>();
        futures.add(future);
    }

    private JobHandle(final Set<Future<Void>> futures) {
        this.futures = futures;
    }

    /**
     * Blocks until the jobs described by this job handle are complete.
     * <p>
     * Job failures are logged, never rethrown. If the waiting thread is interrupted
     * the interrupt flag is restored so callers can still observe the interruption.
     */
    public void await() {
        for (final Future<Void> future : futures) {
            try {
                future.get();
            } catch (final InterruptedException e) {
                // Fix: do not swallow interruption -- restore the flag for the caller.
                Thread.currentThread().interrupt();
                LOGGER.error("Scheduled blob storage job threw an exception.", e);
            } catch (final Throwable e) {
                LOGGER.error("Scheduled blob storage job threw an exception.", e);
            }
        }
        futures.clear();
    }

    /**
     * Combines a list of job handles into a single job handle.
     * <p>
     * This can make awaiting multiple job handles more convenient by simply rolling them
     * into one job handle.
     * <p>
     * For convenience the list of job handles may contain {@code null} entries.
     *
     * @param jobHandles the job handles to merge into a single job handle.
     * @return a job handle that can be used to await all passed job handles.
     */
    public static JobHandle combine(final JobHandle... jobHandles) {
        final HashSet<Future<Void>> futures = new HashSet<>();
        for (final JobHandle jobHandle : jobHandles) {
            if (jobHandle == null) {
                continue;
            }
            futures.addAll(jobHandle.futures);
        }
        return new JobHandle(futures);
    }
}
/**
* Sets the currently running server.
* <p>
@@ -118,7 +41,6 @@ public final class BlobStorage {
* @param server the currently active server.
*/
public static void setServer(final MinecraftServer server) {
synchronize();
dataDirectory = server.getWorldPath(BLOBS_FOLDER_NAME);
try {
Files.createDirectories(dataDirectory);
@@ -128,19 +50,24 @@ public final class BlobStorage {
}
/**
* Blocks until all currently in-flight serialization or deserialization jobs are completed and
* deletes all blobs marked for deletion.
* Closes all currently open blobs.
*/
public static void synchronize() {
awaitCompletion(READ_HANDLES);
awaitCompletion(WRITE_HANDLES);
public static void close() {
for (final FileChannel blob : BLOBS.values()) {
try {
blob.close();
} catch (final IOException e) {
LOGGER.error(e);
}
}
BLOBS.clear();
}
/**
* Allocates a new handle for a blob to store.
* <p>
* Use this in a call to {@link #submitSave(UUID, InputStream)} to write data into blob storage.
* Then in a call to {@link #submitLoad(UUID, OutputStream)} to read from blob storage.
* Use this in a call to {@link #getOrOpen(UUID)} to open the blob storage.
*
* @return a new handle.
*/
@@ -148,27 +75,6 @@ public final class BlobStorage {
return UUID.randomUUID();
}
/**
* Frees a handle, marking the data associated with it for deletion.
* <p>
* Deletions are enacted upon the next call to {@link #synchronize()}, which is called when
* the server is shut down.
* <p>
* Validating this handle or submitting a serialization or deserialization for this handle
* will "revive" the handle and the data associated with it will <em>not</em> be deleted with
* the next call to {@link #synchronize()}.
*
* @param handle the handle to free.
*/
public static void freeHandle(@Nullable final UUID handle) {
if (handle != null) {
awaitCompletion(READ_HANDLES, handle);
awaitCompletion(WRITE_HANDLES, handle);
submitJob(WRITE_HANDLES, handle, () -> delete(handle));
}
}
/**
* Validates a blob handle, returning a new one if it is {@code null} or invalid.
*
@@ -184,93 +90,50 @@ public final class BlobStorage {
}
/**
* Submits data to be saved to blob storage, identified by the specified handle.
* Get or opens a file channel for the blob with the specified handle.
* <p>
* The given input stream should support batch reading operations for best performance.
* A buffer will be used for copying, so it is not necessary to pass a buffered stream.
* <p>
* The returned {@link JobHandle} should be waited on before any changes are made to
* the to-be-written data after this call returns.
* <p>
* <b>The given input stream will be accessed from a worker thread.</b>
* The returned file channel supports random access.
*
* @param handle the handle identifying the stored blob data.
* @param dataAccess a stream used to read the data to be written to blob storage.
* @return a job handle that can be used to wait for the data to be completely written.
* @param handle the handle to obtain the file channel for.
* @return the file channel for the requested blob.
* @throws IOException if opening the blob fails.
*/
public static JobHandle submitSave(final UUID handle, final InputStream dataAccess) {
// No other job must access a handle when it is being written to.
awaitCompletion(READ_HANDLES, handle);
awaitCompletion(WRITE_HANDLES, handle);
public static FileChannel getOrOpen(final UUID handle) throws IOException {
FileChannel blob = BLOBS.get(handle);
if (blob != null && blob.isOpen()) {
return blob;
}
return submitJob(WRITE_HANDLES, handle, () -> save(handle, dataAccess));
final Path path = dataDirectory.resolve(handle.toString());
blob = new RandomAccessFile(path.toFile(), "rw").getChannel();
BLOBS.put(handle, blob);
return blob;
}
/**
* Submits data to be saved to blob storage, identified by the specified handle.
* <p>
* The given input stream should support batch reading operations for best performance.
* A buffer will be used for copying, so it is not necessary to pass a buffered stream.
* <p>
* The returned {@link JobHandle} should be waited on before any changes are made to
* the to-be-written data after this call returns.
* <p>
* <b>The given input stream will be accessed from a worker thread.</b>
* Closes the blob with the specified handle.
*
* @param handle the handle identifying the stored blob data.
* @param dataAccess a stream used to read the data to be written to blob storage.
* @return a job handle that can be used to wait for the data to be completely written.
* @param handle the handle of the blob to close.
*/
public static JobHandle submitLoad(final UUID handle, final OutputStream dataAccess) {
// Multiple jobs may read from a handle at a time but none may write to it when reading from it.
awaitCompletion(WRITE_HANDLES, handle);
return submitJob(READ_HANDLES, handle, () -> load(handle, dataAccess));
}
///////////////////////////////////////////////////////////////////
private static void save(final UUID handle, final InputStream input) {
public static void close(final UUID handle) {
try {
final Path path = dataDirectory.resolve(handle.toString());
try (final OutputStream output = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
final GZIPOutputStream zipped = new GZIPOutputStream(output);
copyData(input, zipped);
zipped.finish();
final FileChannel blob = BLOBS.remove(handle);
if (blob != null) {
blob.close();
}
} catch (final Throwable e) {
} catch (final IOException e) {
LOGGER.error(e);
} finally {
try {
input.close();
} catch (final Throwable e) {
LOGGER.error(e);
}
}
}
private static void load(final UUID handle, final OutputStream output) {
try {
final Path path = dataDirectory.resolve(handle.toString());
if (!Files.exists(path)) {
return;
}
try (final InputStream input = Files.newInputStream(path, StandardOpenOption.READ)) {
final GZIPInputStream zipped = new GZIPInputStream(input);
copyData(zipped, output);
}
} catch (final Throwable e) {
LOGGER.error(e);
} finally {
try {
output.close();
} catch (final Throwable e) {
LOGGER.error(e);
}
}
}
/**
* Deletes the blob with the specified handle.
*
* @param handle the handle of the blob to delete.
*/
public static void delete(final UUID handle) {
close(handle);
private static void delete(final UUID handle) {
try {
final Path path = dataDirectory.resolve(handle.toString());
Files.deleteIfExists(path);
@@ -278,64 +141,4 @@ public final class BlobStorage {
LOGGER.error(e);
}
}
/**
 * Copies all remaining bytes from {@code in} to {@code out} using an 8 KiB buffer.
 * <p>
 * Neither stream is closed by this method.
 *
 * @param in  the stream to read from.
 * @param out the stream to write to.
 * @throws IOException when reading or writing fails.
 */
private static void copyData(final InputStream in, final OutputStream out) throws IOException {
    final byte[] buffer = new byte[8 * Constants.KILOBYTE];
    int count;
    // Fix: only -1 signals end-of-stream; read() may legally return 0 for some
    // stream implementations, and the previous `> 0` condition would then
    // silently truncate the copy.
    while ((count = in.read(buffer)) != -1) {
        out.write(buffer, 0, count);
    }
}
private static JobHandle submitJob(final HashMultimap<UUID, Future<Void>> map, final UUID handle, final Runnable runnable) {
final CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, WORKERS);
map.put(handle, future);
return new JobHandle(future.thenAccept(unused -> completeJob(map, handle, future)));
}
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") // I know what I'm doing - famous last words.
private static void completeJob(final HashMultimap<UUID, Future<Void>> handles, final UUID handle, final Future<Void> future) {
synchronized (handles) {
handles.remove(handle, future);
}
}
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") // I know what I'm doing - famous last words.
private static void awaitCompletion(final HashMultimap<UUID, Future<Void>> handles, final UUID handle) {
    // Completed jobs unregister themselves, so reads and writes of the pending-job
    // map must be synchronized. The lock is dropped before blocking on the jobs
    // themselves, otherwise a completing job would deadlock trying to unregister.
    final Set<Future<Void>> pending;
    synchronized (handles) {
        pending = handles.removeAll(handle);
    }
    pending.forEach(BlobStorage::await);
}
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") // I know what I'm doing - famous last words.
private static void awaitCompletion(final HashMultimap<UUID, Future<Void>> handles) {
    // Completed jobs unregister themselves, so snapshotting and clearing the
    // pending-job map must happen under the lock. The lock is dropped before
    // blocking on the jobs themselves to avoid deadlocking against completion.
    final Set<Future<Void>> pending;
    synchronized (handles) {
        pending = new HashSet<>(handles.values());
        handles.clear();
    }
    pending.forEach(BlobStorage::await);
}
/**
 * Blocks until {@code future} has completed, logging any failure instead of
 * propagating it, so that a single broken job cannot abort the caller's
 * cleanup/synchronization pass.
 *
 * @param future the job to wait for.
 */
private static void await(final Future<Void> future) {
    try {
        future.get();
    } catch (final InterruptedException e) {
        // Restore the interrupt status so callers further up the stack can
        // still observe and react to the interruption.
        Thread.currentThread().interrupt();
        LOGGER.error("Interrupted while waiting for a blob storage job.", e);
    } catch (final Throwable e) {
        LOGGER.error("Scheduled blob storage job threw an exception.", e);
    }
}
}

View File

@@ -1,8 +1,6 @@
package li.cil.oc2.common.tileentity;
import li.cil.oc2.api.bus.device.vm.VMDevice;
import li.cil.oc2.api.bus.device.vm.VMDeviceLoadResult;
import li.cil.oc2.api.bus.device.vm.context.VMContext;
import li.cil.oc2.common.Config;
import li.cil.oc2.common.Constants;
import li.cil.oc2.common.block.DiskDriveBlock;
@@ -12,6 +10,7 @@ import li.cil.oc2.common.container.TypedItemStackHandler;
import li.cil.oc2.common.item.FloppyItem;
import li.cil.oc2.common.network.Network;
import li.cil.oc2.common.network.message.DiskDriveFloppyMessage;
import li.cil.oc2.common.serialization.BlobStorage;
import li.cil.oc2.common.tags.ItemTags;
import li.cil.oc2.common.util.ItemStackUtils;
import li.cil.oc2.common.util.LocationSupplierUtils;
@@ -30,12 +29,12 @@ import net.minecraftforge.api.distmarker.OnlyIn;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.time.Duration;
import java.util.Optional;
public final class DiskDriveTileEntity extends AbstractTileEntity {
private static final Logger LOGGER = LogManager.getLogger();
@@ -149,6 +148,7 @@ public final class DiskDriveTileEntity extends AbstractTileEntity {
}
@Override
@Nonnull
public ItemStack getStackInSlot(final int slot) {
final ItemStack stack = getStackInSlotRaw(slot);
exportDeviceDataToItemStack(stack);
@@ -156,6 +156,7 @@ public final class DiskDriveTileEntity extends AbstractTileEntity {
}
@Override
@Nonnull
public ItemStack extractItem(final int slot, final int amount, final boolean simulate) {
if (slot == 0 && !simulate && amount > 0) {
exportDeviceDataToItemStack(getStackInSlotRaw(0));
@@ -197,8 +198,6 @@ public final class DiskDriveTileEntity extends AbstractTileEntity {
return;
}
device.serializeData();
final CompoundNBT tag = new CompoundNBT();
device.exportToItemStack(tag);
ItemStackUtils.getOrCreateModDataTag(stack).put(DATA_TAG_NAME, tag);
@@ -206,46 +205,48 @@ public final class DiskDriveTileEntity extends AbstractTileEntity {
}
private final class DiskDriveVMDevice extends AbstractBlockDeviceVMDevice<BlockDevice, TileEntity> {
private VMContext context;
public DiskDriveVMDevice() {
super(DiskDriveTileEntity.this);
}
public void updateBlockDevice(final CompoundNBT tag) {
blobHandle = null;
if (device == null) {
return;
}
if (blobHandle != null) {
BlobStorage.close(blobHandle);
blobHandle = null;
}
importFromItemStack(tag);
if (data != null) {
data = createBlockDevice();
deserializeData();
assert device != null;
try {
device.setBlock(data);
} catch (final IOException e) {
LOGGER.error(e);
}
try {
device.setBlock(createBlockDevice());
} catch (final IOException e) {
LOGGER.error(e);
}
}
public void removeBlockDevice() {
updateBlockDevice(new CompoundNBT());
if (device == null) {
return;
}
if (blobHandle != null) {
BlobStorage.close(blobHandle);
blobHandle = null;
}
try {
device.setBlock(EMPTY_BLOCK_DEVICE);
} catch (final IOException e) {
LOGGER.error(e);
}
}
@Override
public VMDeviceLoadResult load(final VMContext context) {
this.context = context;
return super.load(context);
}
@Override
protected int getSize() {
return Config.maxFloppySize;
}
@Override
protected BlockDevice createBlockDevice() {
protected BlockDevice createBlockDevice() throws IOException {
final ItemStack stack = itemHandler.getStackInSlotRaw(0);
if (stack.isEmpty() || !(stack.getItem() instanceof FloppyItem)) {
return EMPTY_BLOCK_DEVICE;
@@ -257,29 +258,10 @@ public final class DiskDriveTileEntity extends AbstractTileEntity {
return EMPTY_BLOCK_DEVICE;
}
return ByteBufferBlockDevice.create(capacity, false);
}
@Override
protected Optional<InputStream> getSerializationStream(final BlockDevice device) {
if (context != null) {
context.joinWorkerThread();
}
if (device.isReadonly()) {
return Optional.empty();
} else {
return Optional.of(device.getInputStream());
}
}
@Override
protected OutputStream getDeserializationStream(final BlockDevice device) {
if (context != null) {
context.joinWorkerThread();
}
return device.getOutputStream();
blobHandle = BlobStorage.validateHandle(blobHandle);
final FileChannel channel = BlobStorage.getOrOpen(blobHandle);
final MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity);
return ByteBufferBlockDevice.wrap(buffer, false);
}
@Override