A Look at Flink's BlobStoreService

Category: Programming Tools · Published: 7 years ago


This article takes a look at Flink's BlobStoreService, based on the Flink 1.7.2 sources.

BlobView

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java

public interface BlobView {

    /**
     * Copies a blob to a local file.
     *
     * @param jobId     ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
     * @param blobKey   The blob ID
     * @param localFile The local file to copy to
     *
     * @return whether the file was copied (<tt>true</tt>) or not (<tt>false</tt>)
     * @throws IOException If the copy fails
     */
    boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException;
}
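  • BlobView defines a single get method, which copies the blob identified by jobId and blobKey to localFile and returns whether the copy took place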

BlobStore

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java

public interface BlobStore extends BlobView {

    /**
     * Copies the local file to the blob store.
     *
     * @param localFile The file to copy
     * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
     * @param blobKey   The ID for the file in the blob store
     *
     * @return whether the file was copied (<tt>true</tt>) or not (<tt>false</tt>)
     * @throws IOException If the copy fails
     */
    boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException;

    /**
     * Tries to delete a blob from storage.
     *
     * <p>NOTE: This also tries to delete any created directories if empty.</p>
     *
     * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
     * @param blobKey The blob ID
     *
     * @return  <tt>true</tt> if the given blob is successfully deleted or non-existing;
     *          <tt>false</tt> otherwise
     */
    boolean delete(JobID jobId, BlobKey blobKey);

    /**
     * Tries to delete all blobs for the given job from storage.
     *
     * <p>NOTE: This also tries to delete any created directories if empty.</p>
     *
     * @param jobId The JobID part of all blobs to delete
     *
     * @return  <tt>true</tt> if the job directory is successfully deleted or non-existing;
     *          <tt>false</tt> otherwise
     */
    boolean deleteAll(JobID jobId);
}
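  • BlobStore extends BlobView and adds the put, delete, and deleteAll methods; delete and deleteAll return true both when the target was deleted and when it did not exist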
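
The delete contract described in the Javadoc above (a missing blob counts as success, and empty directories are cleaned up on a best-effort basis) can be illustrated on the local file system. This is only a sketch using java.nio.file; the class LocalDeleteSketch and its deleteBlob method are hypothetical and are not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical local-disk analogue of the delete contract; not Flink code. */
final class LocalDeleteSketch {

    /** Returns true if the blob is gone afterwards, whether or not it existed before. */
    static boolean deleteBlob(Path blobFile) {
        try {
            Files.deleteIfExists(blobFile);
            // Best-effort cleanup: the parent directory is removed only if it is now empty.
            Path parent = blobFile.getParent();
            if (parent != null) {
                try {
                    Files.deleteIfExists(parent);
                } catch (IOException ignored) {
                    // Other blobs still live here, so the directory stays.
                }
            }
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path jobDir = Files.createTempDirectory("job-dir");
        Path a = Files.write(jobDir.resolve("blob-a"), new byte[] {1});
        Path b = Files.write(jobDir.resolve("blob-b"), new byte[] {2});

        System.out.println(deleteBlob(a));        // true, blob-b keeps the directory alive
        System.out.println(Files.exists(jobDir)); // true
        System.out.println(deleteBlob(b));        // true, the directory is now empty and removed
        System.out.println(Files.exists(jobDir)); // false
        System.out.println(deleteBlob(b));        // true, deleting a missing blob is not an error
    }
}
```

Treating "already gone" as success keeps cleanup idempotent, so a caller can retry a delete without having to distinguish the two cases.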

BlobStoreService

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java

public interface BlobStoreService extends BlobStore, Closeable {

    /**
     * Closes and cleans up the store. This entails the deletion of all blobs.
     */
    void closeAndCleanupAllData();
}
  • BlobStoreService extends both BlobStore and Closeable and adds the closeAndCleanupAllData method, which closes the store and deletes all blobs; it has two implementations, VoidBlobStore and FileSystemBlobStore

VoidBlobStore

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java

public class VoidBlobStore implements BlobStoreService {

    @Override
    public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
        return false;
    }

    @Override
    public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
        return false;
    }

    @Override
    public boolean delete(JobID jobId, BlobKey blobKey) {
        return true;
    }

    @Override
    public boolean deleteAll(JobID jobId) {
        return true;
    }

    @Override
    public void closeAndCleanupAllData() {}

    @Override
    public void close() throws IOException {}
}
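  • VoidBlobStore implements BlobStoreService as a no-op: put and get always return false (nothing is copied), delete and deleteAll always return true, and closeAndCleanupAllData and close do nothing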
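
Because a no-op store reports false from get, calling code has to check the boolean result rather than assume the file was written. The sketch below shows that pattern with a simplified stand-in interface; MiniBlobView, MiniVoidStore, and VoidStoreCallerSketch are hypothetical names introduced for illustration, and the fallback wording is an assumption about what a caller might do, not a description of Flink's own callers.

```java
import java.io.File;
import java.io.IOException;

/** Simplified stand-in for the get half of the contract; hypothetical, not Flink's interface. */
interface MiniBlobView {
    boolean get(String blobKey, File localFile) throws IOException;
}

/** Mirrors the behaviour of a void store: it never copies anything. */
final class MiniVoidStore implements MiniBlobView {
    @Override
    public boolean get(String blobKey, File localFile) {
        return false;
    }
}

final class VoidStoreCallerSketch {

    /** Reports where the blob came from, based only on the boolean result of get. */
    static String fetch(MiniBlobView store, String blobKey, File target) throws IOException {
        if (store.get(blobKey, target)) {
            return "restored from blob store";
        }
        // A false result means "not copied", so the caller needs another source.
        return "not in blob store, fall back to another source";
    }

    public static void main(String[] args) throws IOException {
        File target = new File("unused.bin"); // never created by the void store
        System.out.println(fetch(new MiniVoidStore(), "some-key", target));
    }
}
```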

FileSystemBlobStore

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java

public class FileSystemBlobStore implements BlobStoreService {

    private static final Logger LOG = LoggerFactory.getLogger(FileSystemBlobStore.class);

    /** The file system in which blobs are stored. */
    private final FileSystem fileSystem;

    /** The base path of the blob store. */
    private final String basePath;

    public FileSystemBlobStore(FileSystem fileSystem, String storagePath) throws IOException {
        this.fileSystem = checkNotNull(fileSystem);
        this.basePath = checkNotNull(storagePath) + "/blob";

        LOG.info("Creating highly available BLOB storage directory at {}", basePath);

        fileSystem.mkdirs(new Path(basePath));
        LOG.debug("Created highly available BLOB storage directory at {}", basePath);
    }

    // - Put ------------------------------------------------------------------

    @Override
    public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws IOException {
        return put(localFile, BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
    }

    private boolean put(File fromFile, String toBlobPath) throws IOException {
        try (OutputStream os = fileSystem.create(new Path(toBlobPath), FileSystem.WriteMode.OVERWRITE)) {
            LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
            Files.copy(fromFile, os);
        }
        return true;
    }

    // - Get ------------------------------------------------------------------

    @Override
    public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOException {
        return get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey), localFile, blobKey);
    }

    private boolean get(String fromBlobPath, File toFile, BlobKey blobKey) throws IOException {
        checkNotNull(fromBlobPath, "Blob path");
        checkNotNull(toFile, "File");
        checkNotNull(blobKey, "Blob key");

        if (!toFile.exists() && !toFile.createNewFile()) {
            throw new IOException("Failed to create target file to copy to");
        }

        final Path fromPath = new Path(fromBlobPath);
        MessageDigest md = BlobUtils.createMessageDigest();

        final int buffSize = 4096; // like IOUtils#BLOCKSIZE, for chunked file copying

        boolean success = false;
        try (InputStream is = fileSystem.open(fromPath);
            FileOutputStream fos = new FileOutputStream(toFile)) {
            LOG.debug("Copying from {} to {}.", fromBlobPath, toFile);

            // not using IOUtils.copyBytes(is, fos) here to be able to create a hash on-the-fly
            final byte[] buf = new byte[buffSize];
            int bytesRead = is.read(buf);
            while (bytesRead >= 0) {
                fos.write(buf, 0, bytesRead);
                md.update(buf, 0, bytesRead);

                bytesRead = is.read(buf);
            }

            // verify that file contents are correct
            final byte[] computedKey = md.digest();
            if (!Arrays.equals(computedKey, blobKey.getHash())) {
                throw new IOException("Detected data corruption during transfer");
            }

            success = true;
        } finally {
            // if the copy fails, we need to remove the target file because
            // outside code relies on a correct file as long as it exists
            if (!success) {
                try {
                    toFile.delete();
                } catch (Throwable ignored) {}
            }
        }

        return true; // success is always true here
    }

    // - Delete ---------------------------------------------------------------

    @Override
    public boolean delete(JobID jobId, BlobKey blobKey) {
        return delete(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
    }

    @Override
    public boolean deleteAll(JobID jobId) {
        return delete(BlobUtils.getStorageLocationPath(basePath, jobId));
    }

    private boolean delete(String blobPath) {
        try {
            LOG.debug("Deleting {}.", blobPath);

            Path path = new Path(blobPath);

            boolean result = fileSystem.delete(path, true);

            // send a call to delete the directory containing the file. This will
            // fail (and be ignored) when some files still exist.
            try {
                fileSystem.delete(path.getParent(), false);
                fileSystem.delete(new Path(basePath), false);
            } catch (IOException ignored) {}
            return result;
        }
        catch (Exception e) {
            LOG.warn("Failed to delete blob at " + blobPath);
            return false;
        }
    }

    @Override
    public void closeAndCleanupAllData() {
        try {
            LOG.debug("Cleaning up {}.", basePath);

            fileSystem.delete(new Path(basePath), true);
        }
        catch (Exception e) {
            LOG.error("Failed to clean up recovery directory.", e);
        }
    }

    @Override
    public void close() throws IOException {
        // nothing to do for the FileSystemBlobStore
    }
}
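  • FileSystemBlobStore implements BlobStoreService; its constructor takes a fileSystem and a storagePath, derives basePath as storagePath + "/blob", and creates that directory. put opens the target OutputStream with fileSystem.create (in OVERWRITE mode) and copies localFile to toBlobPath with Files.copy. get opens the blob with fileSystem.open and writes it to localFile in 4096-byte chunks while updating a MessageDigest; if the computed digest does not match blobKey.getHash() it throws an IOException, and on any failure it deletes the target file. delete and deleteAll resolve the blob path via BlobUtils.getStorageLocationPath and call fileSystem.delete recursively on it, then try to remove the parent directory and basePath non-recursively, which only succeeds if they are empty. closeAndCleanupAllData calls fileSystem.delete to recursively delete basePath (that is, storagePath + "/blob"), not storagePath itself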
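
The copy-and-verify pattern used by get can be reproduced with plain JDK classes. The following is a minimal sketch, not the Flink implementation: it works on local java.nio paths instead of Flink's FileSystem, the class VerifiedCopySketch and its methods are hypothetical, and SHA-1 is assumed as the digest algorithm behind BlobUtils.createMessageDigest.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/** Hypothetical sketch of a chunked copy that hashes on the fly; not Flink code. */
final class VerifiedCopySketch {

    static byte[] sha1(byte[] data) throws NoSuchAlgorithmException {
        return MessageDigest.getInstance("SHA-1").digest(data);
    }

    /** Copies from to to and throws if the copied bytes do not hash to expectedHash. */
    static void copyAndVerify(Path from, Path to, byte[] expectedHash)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-1"); // assumed algorithm
        boolean success = false;
        try (InputStream in = Files.newInputStream(from);
             OutputStream out = Files.newOutputStream(to)) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) >= 0) {
                out.write(buf, 0, n);
                md.update(buf, 0, n); // hash exactly the bytes that were written
            }
            if (!Arrays.equals(md.digest(), expectedHash)) {
                throw new IOException("Detected data corruption during transfer");
            }
            success = true;
        } finally {
            // Streams are already closed here; remove a target that cannot be trusted.
            if (!success) {
                try {
                    Files.deleteIfExists(to);
                } catch (IOException ignored) {
                    // Best effort only, the original failure is what matters.
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = "hello blob".getBytes(StandardCharsets.UTF_8);
        Path src = Files.createTempFile("blob-src", ".bin");
        Files.write(src, payload);
        Path dst = Files.createTempFile("blob-dst", ".bin");

        copyAndVerify(src, dst, sha1(payload));
        System.out.println(Files.exists(dst)); // true

        try {
            copyAndVerify(src, dst, new byte[20]); // deliberately wrong hash
        } catch (IOException e) {
            System.out.println(Files.exists(dst)); // false, the bad copy was removed
        }
    }
}
```

Hashing inside the copy loop avoids a second pass over the file, and deleting the target on failure preserves the invariant stated in the source comment: outside code may rely on the file being correct as long as it exists.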

Summary

  • BlobView defines the get method, which copies the specified blob to localFile; BlobStore extends BlobView and adds the put, delete, and deleteAll methods
  • BlobStoreService extends BlobStore and Closeable and adds the closeAndCleanupAllData method; it has two implementations, VoidBlobStore and FileSystemBlobStore
  • VoidBlobStore is a no-op implementation of BlobStoreService. FileSystemBlobStore stores blobs under basePath (storagePath + "/blob") on the given fileSystem: put writes localFile to the blob path via fileSystem.create and Files.copy; get reads the blob via fileSystem.open, writes it to localFile, and verifies its digest against blobKey.getHash(); delete and deleteAll resolve the path via BlobUtils.getStorageLocationPath and call fileSystem.delete; closeAndCleanupAllData recursively deletes basePath
