聊聊flink的MemoryPool

栏目: 编程工具 · 发布时间: 6年前

内容简介:本文主要研究一下flink的MemoryPoolflink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.javaflink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java

本文主要研究一下flink的MemoryPool

MemoryPool

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java

abstract static class MemoryPool {

        abstract int getNumberOfAvailableMemorySegments();

        abstract MemorySegment allocateNewSegment(Object owner);

        abstract MemorySegment requestSegmentFromPool(Object owner);

        abstract void returnSegmentToPool(MemorySegment segment);

        abstract void clear();
    }
  • MemoryPool定义了getNumberOfAvailableMemorySegments、allocateNewSegment、requestSegmentFromPool、returnSegmentToPool、clear这几个抽象方法;它有HybridHeapMemoryPool、HybridOffHeapMemoryPool这两个子类

HybridHeapMemoryPool

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java

static final class HybridHeapMemoryPool extends MemoryPool {

        /** The collection of available memory segments. */
        private final ArrayDeque<byte[]> availableMemory;

        private final int segmentSize;

        HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;

            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(new byte[segmentSize]);
            }
        }

        @Override
        MemorySegment allocateNewSegment(Object owner) {
            return MemorySegmentFactory.allocateUnpooledSegment(segmentSize, owner);
        }

        @Override
        MemorySegment requestSegmentFromPool(Object owner) {
            byte[] buf = availableMemory.remove();
            return  MemorySegmentFactory.wrapPooledHeapMemory(buf, owner);
        }

        @Override
        void returnSegmentToPool(MemorySegment segment) {
            if (segment.getClass() == HybridMemorySegment.class) {
                HybridMemorySegment heapSegment = (HybridMemorySegment) segment;
                availableMemory.add(heapSegment.getArray());
                heapSegment.free();
            }
            else {
                throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
            }
        }

        @Override
        protected int getNumberOfAvailableMemorySegments() {
            return availableMemory.size();
        }

        @Override
        void clear() {
            availableMemory.clear();
        }
    }
  • HybridHeapMemoryPool继承了MemoryPool,它使用的是jvm的heap内存;构造器接收numInitialSegments、segmentSize两个参数用于初始化availableMemory这个ArrayDeque,该queue的元素类型为byte[]
  • allocateNewSegment方法调用的是MemorySegmentFactory.allocateUnpooledSegment,用于分配unpooled memory;requestSegmentFromPool方法调用的是availableMemory.remove(),然后调用MemorySegmentFactory.wrapPooledHeapMemory包装为MemorySegment,这个方法没有判断ArrayDeque的大小就直接remove,需要注意
  • returnSegmentToPool方法只对HybridMemorySegment类型进行处理,首先将它的byte[]归还到availableMemory,之后调用heapSegment.free()释放;getNumberOfAvailableMemorySegments方法返回的是availableMemory.size();clear方法调用的是availableMemory.clear()

HybridOffHeapMemoryPool

flink-runtime_2.11-1.7.2-sources.jar!/org/apache/flink/runtime/memory/MemoryManager.java

static final class HybridOffHeapMemoryPool extends MemoryPool {

        /** The collection of available memory segments. */
        private final ArrayDeque<ByteBuffer> availableMemory;

        private final int segmentSize;

        HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;

            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
            }
        }

        @Override
        MemorySegment allocateNewSegment(Object owner) {
            return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
        }

        @Override
        MemorySegment requestSegmentFromPool(Object owner) {
            ByteBuffer buf = availableMemory.remove();
            return MemorySegmentFactory.wrapPooledOffHeapMemory(buf, owner);
        }

        @Override
        void returnSegmentToPool(MemorySegment segment) {
            if (segment.getClass() == HybridMemorySegment.class) {
                HybridMemorySegment hybridSegment = (HybridMemorySegment) segment;
                ByteBuffer buf = hybridSegment.getOffHeapBuffer();
                availableMemory.add(buf);
                hybridSegment.free();
            }
            else {
                throw new IllegalArgumentException("Memory segment is not a " + HybridMemorySegment.class.getSimpleName());
            }
        }

        @Override
        protected int getNumberOfAvailableMemorySegments() {
            return availableMemory.size();
        }

        @Override
        void clear() {
            availableMemory.clear();
        }
    }
  • HybridOffHeapMemoryPool继承了MemoryPool,它使用的是OffHeap;构造器接收numInitialSegments、segmentSize两个参数用于初始化availableMemory这个ArrayDeque,该queue的元素类型为ByteBuffer
  • allocateNewSegment方法调用的是MemorySegmentFactory.allocateUnpooledOffHeapMemory,用于分配unpooled off-heap memory;requestSegmentFromPool方法调用的是availableMemory.remove(),然后调用MemorySegmentFactory.wrapPooledOffHeapMemory包装为MemorySegment,这个方法没有判断ArrayDeque的大小就直接remove,需要注意
  • returnSegmentToPool方法只对HybridMemorySegment类型进行处理,首先将它的ByteBuffer归还到availableMemory,之后调用heapSegment.free()释放;getNumberOfAvailableMemorySegments方法返回的是availableMemory.size();clear方法调用的是availableMemory.clear()

小结

  • MemoryPool定义了getNumberOfAvailableMemorySegments、allocateNewSegment、requestSegmentFromPool、returnSegmentToPool、clear这几个抽象方法;它有HybridHeapMemoryPool、HybridOffHeapMemoryPool这两个子类
  • HybridHeapMemoryPool继承了MemoryPool,它使用的是jvm的heap内存;构造器接收numInitialSegments、segmentSize两个参数用于初始化availableMemory这个ArrayDeque,该queue的元素类型为byte[];allocateNewSegment方法调用的是MemorySegmentFactory.allocateUnpooledSegment,用于分配unpooled memory;requestSegmentFromPool方法调用的是availableMemory.remove(),然后调用MemorySegmentFactory.wrapPooledHeapMemory包装为MemorySegment,这个方法没有判断ArrayDeque的大小就直接remove,需要注意;returnSegmentToPool方法只对HybridMemorySegment类型进行处理,首先将它的byte[]归还到availableMemory,之后调用heapSegment.free()释放;getNumberOfAvailableMemorySegments方法返回的是availableMemory.size();clear方法调用的是availableMemory.clear()
  • HybridOffHeapMemoryPool继承了MemoryPool,它使用的是OffHeap;构造器接收numInitialSegments、segmentSize两个参数用于初始化availableMemory这个ArrayDeque,该queue的元素类型为ByteBuffer;allocateNewSegment方法调用的是MemorySegmentFactory.allocateUnpooledOffHeapMemory,用于分配unpooled off-heap memory;requestSegmentFromPool方法调用的是availableMemory.remove(),然后调用MemorySegmentFactory.wrapPooledOffHeapMemory包装为MemorySegment,这个方法没有判断ArrayDeque的大小就直接remove,需要注意;returnSegmentToPool方法只对HybridMemorySegment类型进行处理,首先将它的ByteBuffer归还到availableMemory,之后调用heapSegment.free()释放;getNumberOfAvailableMemorySegments方法返回的是availableMemory.size();clear方法调用的是availableMemory.clear()

doc


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

社交的本质:扎克伯格的商业秘密

社交的本质:扎克伯格的商业秘密

兰迪•扎克伯格 / 谢天 / 中信出版集团股份有限公司 / 2016-6-1 / CNY 45.00

从发表个人观点到找工作,从交朋友到找伴侣,社会化媒体的广泛应用、互联技术的高速发展已经改变了我们生活的各个领域。 Facebook早期成员之一,兰迪·扎克伯格阐述了社交的本质,并首次披露Facebook的商业策略。她以社交媒体实践者的视角,分享了自己在Facebook负责营销的从业经历与成长故事,以及对互联网和社会未来变化趋势的思考,并给组织和个人提出了解决方案。一起来看看 《社交的本质:扎克伯格的商业秘密》 这本书的介绍吧!

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具