A Look at Flink's TableFactory

Category: Programming Tools · Published: 7 years ago


This article takes a look at Flink's TableFactory.

Example

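import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

class MySystemTableSourceFactory implements StreamTableSourceFactory<Row> {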

  @Override
  public Map<String, String> requiredContext() {
    Map<String, String> context = new HashMap<>();
    context.put("update-mode", "append");
    context.put("connector.type", "my-system");
    return context;
  }

  @Override
  public List<String> supportedProperties() {
    List<String> list = new ArrayList<>();
    list.add("connector.debug");
    return list;
  }

  @Override
  public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
    boolean isDebug = Boolean.valueOf(properties.get("connector.debug"));

    // additional validation of the passed properties can also happen here

    return new MySystemAppendTableSource(isDebug);
  }
}

public class MySystemConnector extends ConnectorDescriptor {

  public final boolean isDebug;

  public MySystemConnector(boolean isDebug) {
    super("my-system", 1, false);
    this.isDebug = isDebug;
  }

  @Override
  protected Map<String, String> toConnectorProperties() {
    Map<String, String> properties = new HashMap<>();
    properties.put("connector.debug", Boolean.toString(isDebug));
    return properties;
  }
}
  • This example defines MySystemTableSourceFactory. Its requiredContext is update-mode=append and connector.type=my-system, its supportedProperties is connector.debug, and its createStreamTableSource method creates a MySystemAppendTableSource. MySystemConnector extends ConnectorDescriptor; it sets connector.type to my-system, connector.property-version to 1 and formatNeeded to false, and its toConnectorProperties supplies the value of connector.debug.
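
The comment in createStreamTableSource notes that additional validation of the passed properties can happen there. As a minimal sketch of what that might look like in plain Java (no Flink dependency), the hypothetical helper DebugFlag below contrasts the lenient parsing done by Boolean.valueOf with a stricter check. The class and method names are assumptions for illustration only.

```java
import java.util.Map;

// Hypothetical helper illustrating stricter validation; not part of Flink.
final class DebugFlag {

    static final String KEY = "connector.debug";

    // Lenient: what Boolean.valueOf does. Missing or unrecognized values become false.
    static boolean lenient(Map<String, String> properties) {
        return Boolean.valueOf(properties.get(KEY));
    }

    // Strict: accept only "true" or "false" (case-insensitive); a missing key defaults to false.
    static boolean strict(Map<String, String> properties) {
        String raw = properties.get(KEY);
        if (raw == null) {
            return false;
        }
        if (raw.equalsIgnoreCase("true")) {
            return true;
        }
        if (raw.equalsIgnoreCase("false")) {
            return false;
        }
        throw new IllegalArgumentException("Invalid value for " + KEY + ": " + raw);
    }
}
```

With the lenient variant a typo such as connector.debug=yes silently disables debugging, whereas the strict variant reports it.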

TableFactory

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactory.java

@PublicEvolving
public interface TableFactory {

    Map<String, String> requiredContext();

    List<String> supportedProperties();
}
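  • TableFactory defines two methods, requiredContext and supportedProperties. requiredContext is used to match a factory, and supportedProperties declares the properties a factory supports; passing a property the factory does not support raises an exception.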
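
How requiredContext is used for matching can be sketched without Flink: a factory qualifies only if every required key is present in the given properties with exactly the required value. The class ContextMatch below is a hypothetical stand-in and leaves out details of the real service, such as key normalization.

```java
import java.util.Map;

// Hypothetical stand-in for the context-matching step; not Flink's implementation.
final class ContextMatch {

    // True if every required key is present in the given properties with an equal value.
    static boolean matches(Map<String, String> requiredContext, Map<String, String> given) {
        for (Map.Entry<String, String> e : requiredContext.entrySet()) {
            if (!e.getValue().equals(given.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }
}
```

Extra keys in the given properties do not affect this step; they are judged later against supportedProperties.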

BatchTableSourceFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSourceFactory.scala

trait BatchTableSourceFactory[T] extends TableFactory {

  def createBatchTableSource(properties: util.Map[String, String]): BatchTableSource[T]
}
  • BatchTableSourceFactory extends TableFactory and defines the createBatchTableSource method.

BatchTableSinkFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/BatchTableSinkFactory.scala

trait BatchTableSinkFactory[T] extends TableFactory {

  def createBatchTableSink(properties: util.Map[String, String]): BatchTableSink[T]
}
  • BatchTableSinkFactory extends TableFactory and defines the createBatchTableSink method.

StreamTableSourceFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSourceFactory.scala

trait StreamTableSourceFactory[T] extends TableFactory {

  def createStreamTableSource(properties: util.Map[String, String]): StreamTableSource[T]
}
  • StreamTableSourceFactory extends TableFactory and defines the createStreamTableSource method.

StreamTableSinkFactory

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/StreamTableSinkFactory.scala

trait StreamTableSinkFactory[T] extends TableFactory {

  def createStreamTableSink(properties: util.Map[String, String]): StreamTableSink[T]
}
  • StreamTableSinkFactory extends TableFactory and defines the createStreamTableSink method.

ConnectorDescriptor

flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectorDescriptor.java

@PublicEvolving
public abstract class ConnectorDescriptor extends DescriptorBase implements Descriptor {

    private String type;

    private int version;

    private boolean formatNeeded;

    /**
     * Constructs a {@link ConnectorDescriptor}.
     *
     * @param type string that identifies this connector
     * @param version property version for backwards compatibility
     * @param formatNeeded flag for basic validation of a needed format descriptor
     */
    public ConnectorDescriptor(String type, int version, boolean formatNeeded) {
        this.type = type;
        this.version = version;
        this.formatNeeded = formatNeeded;
    }

    @Override
    public final Map<String, String> toProperties() {
        final DescriptorProperties properties = new DescriptorProperties();
        properties.putString(CONNECTOR_TYPE, type);
        properties.putLong(CONNECTOR_PROPERTY_VERSION, version);
        properties.putProperties(toConnectorProperties());
        return properties.asMap();
    }

    /**
     * Returns if this connector requires a format descriptor.
     */
    protected final boolean isFormatNeeded() {
        return formatNeeded;
    }

    /**
     * Converts this descriptor into a set of connector properties. Usually prefixed with
     * {@link FormatDescriptorValidator#FORMAT}.
     */
    protected abstract Map<String, String> toConnectorProperties();
}
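  • ConnectorDescriptor extends DescriptorBase and implements the Descriptor interface. It overrides the toProperties method of Descriptor to set the built-in properties CONNECTOR_TYPE and CONNECTOR_PROPERTY_VERSION, and then merges in the properties defined by subclasses through the abstract method toConnectorProperties.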
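
The shape of toProperties is a template method: the base class emits the built-in keys and a subclass hook contributes the rest. The following is a simplified, dependency-free sketch of that pattern. The classes SketchConnectorDescriptor and SketchMySystemConnector are hypothetical mirrors rather than Flink's classes, and the literal key strings are taken from the example description earlier in this article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified mirror of the descriptor pattern; not Flink's classes.
abstract class SketchConnectorDescriptor {
    private final String type;
    private final int version;

    SketchConnectorDescriptor(String type, int version) {
        this.type = type;
        this.version = version;
    }

    // Final: subclasses cannot change how the built-in keys are emitted.
    final Map<String, String> toProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connector.type", type);
        properties.put("connector.property-version", Integer.toString(version));
        properties.putAll(toConnectorProperties());
        return properties;
    }

    // Subclass hook for connector-specific keys.
    abstract Map<String, String> toConnectorProperties();
}

final class SketchMySystemConnector extends SketchConnectorDescriptor {
    private final boolean isDebug;

    SketchMySystemConnector(boolean isDebug) {
        super("my-system", 1);
        this.isDebug = isDebug;
    }

    @Override
    Map<String, String> toConnectorProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connector.debug", Boolean.toString(isDebug));
        return properties;
    }
}
```

Everything ends up as flat string key/value pairs, which is what lets the factory lookup work on a plain map.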

TableFactoryUtil

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryUtil.scala

object TableFactoryUtil {

  /**
    * Returns a table source for a table environment.
    */
  def findAndCreateTableSource[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSource[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSourceFactory[T]], javaMap)
          .createBatchTableSource(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSourceFactory[T]], javaMap)
          .createStreamTableSource(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }

  /**
    * Returns a table sink for a table environment.
    */
  def findAndCreateTableSink[T](
      tableEnvironment: TableEnvironment,
      descriptor: Descriptor)
    : TableSink[T] = {

    val javaMap = descriptor.toProperties

    tableEnvironment match {
      case _: BatchTableEnvironment =>
        TableFactoryService
          .find(classOf[BatchTableSinkFactory[T]], javaMap)
          .createBatchTableSink(javaMap)

      case _: StreamTableEnvironment =>
        TableFactoryService
          .find(classOf[StreamTableSinkFactory[T]], javaMap)
          .createStreamTableSink(javaMap)

      case e@_ =>
        throw new TableException(s"Unsupported table environment: ${e.getClass.getName}")
    }
  }
}
  • TableFactoryUtil is a utility class used mainly to create a TableSource or TableSink for a given TableEnvironment and Descriptor. Internally it uses TableFactoryService, passing descriptor.toProperties to look up the matching TableFactory.
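
The dispatch on the environment type can be pictured in plain Java. In this sketch the types Env, BatchEnv, StreamEnv and OtherEnv are hypothetical stand-ins for the table environments, and the method only returns the name of the factory interface that would be looked up rather than performing the lookup.

```java
// Hypothetical marker types standing in for the table environments; not Flink's classes.
interface Env {}
final class BatchEnv implements Env {}
final class StreamEnv implements Env {}
final class OtherEnv implements Env {}

final class FactoryKindDispatch {

    // Chooses which factory interface to look up, based on the runtime type of the environment.
    static String sourceFactoryKind(Env env) {
        if (env instanceof BatchEnv) {
            return "BatchTableSourceFactory";
        } else if (env instanceof StreamEnv) {
            return "StreamTableSourceFactory";
        }
        // Any other environment is rejected, as in the listing's fallback case.
        throw new IllegalArgumentException(
            "Unsupported table environment: " + env.getClass().getName());
    }
}
```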

TableFactoryService

flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/factories/TableFactoryService.scala

object TableFactoryService extends Logging {

  private lazy val defaultLoader = ServiceLoader.load(classOf[TableFactory])

  /**
    * Finds a table factory of the given class and descriptor.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor): T = {
    Preconditions.checkNotNull(descriptor)

    findInternal(factoryClass, descriptor.toProperties, None)
  }

  /**
    * Finds a table factory of the given class, descriptor, and classloader.
    *
    * @param factoryClass desired factory class
    * @param descriptor descriptor describing the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], descriptor: Descriptor, classLoader: ClassLoader): T = {
    Preconditions.checkNotNull(descriptor)
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, descriptor.toProperties, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class and property map.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](factoryClass: Class[T], propertyMap: JMap[String, String]): T = {
    findInternal(factoryClass, propertyMap, None)
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  def find[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: ClassLoader)
    : T = {
    Preconditions.checkNotNull(classLoader)

    findInternal(factoryClass, propertyMap, Some(classLoader))
  }

  /**
    * Finds a table factory of the given class, property map, and classloader.
    *
    * @param factoryClass desired factory class
    * @param propertyMap properties that describe the factory configuration
    * @param classLoader classloader for service loading
    * @tparam T factory class type
    * @return the matching factory
    */
  private def findInternal[T](
      factoryClass: Class[T],
      propertyMap: JMap[String, String],
      classLoader: Option[ClassLoader])
    : T = {

    Preconditions.checkNotNull(factoryClass)
    Preconditions.checkNotNull(propertyMap)

    val properties = propertyMap.asScala.toMap

    val foundFactories = discoverFactories(classLoader)

    val classFactories = filterByFactoryClass(
      factoryClass,
      properties,
      foundFactories)

    val contextFactories = filterByContext(
      factoryClass,
      properties,
      foundFactories,
      classFactories)

    filterBySupportedProperties(
      factoryClass,
      properties,
      foundFactories,
      contextFactories)
  }

  /**
    * Searches for factories using Java service providers.
    *
    * @return all factories in the classpath
    */
  private def discoverFactories[T](classLoader: Option[ClassLoader]): Seq[TableFactory] = {
    try {
      val iterator = classLoader match {
        case Some(customClassLoader) =>
          val customLoader = ServiceLoader.load(classOf[TableFactory], customClassLoader)
          customLoader.iterator()
        case None =>
          defaultLoader.iterator()
      }
      iterator.asScala.toSeq
    } catch {
      case e: ServiceConfigurationError =>
        LOG.error("Could not load service provider for table factories.", e)
        throw new TableException("Could not load service provider for table factories.", e)
    }
  }

  /**
    * Filters factories with matching context by factory class.
    */
  private def filterByFactoryClass[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val classFactories = foundFactories.filter(f => factoryClass.isAssignableFrom(f.getClass))
    if (classFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory implements '${factoryClass.getCanonicalName}'.",
        factoryClass,
        foundFactories,
        properties)
    }
    classFactories
  }

  /**
    * Filters for factories with matching context.
    *
    * @return all matching factories
    */
  private def filterByContext[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : Seq[TableFactory] = {

    val matchingFactories = classFactories.filter { factory =>
      val requestedContext = normalizeContext(factory)

      val plainContext = mutable.Map[String, String]()
      plainContext ++= requestedContext
      // we remove the version for now until we have the first backwards compatibility case
      // with the version we can provide mappings in case the format changes
      plainContext.remove(CONNECTOR_PROPERTY_VERSION)
      plainContext.remove(FORMAT_PROPERTY_VERSION)
      plainContext.remove(METADATA_PROPERTY_VERSION)
      plainContext.remove(STATISTICS_PROPERTY_VERSION)

      // check if required context is met
      plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2)
    }

    if (matchingFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        "No context matches.",
        factoryClass,
        foundFactories,
        properties)
    }

    matchingFactories
  }

  /**
    * Prepares the properties of a context to be used for match operations.
    */
  private def normalizeContext(factory: TableFactory): Map[String, String] = {
    val requiredContextJava = factory.requiredContext()
    if (requiredContextJava == null) {
      throw new TableException(
        s"Required context of factory '${factory.getClass.getName}' must not be null.")
    }
    requiredContextJava.asScala.map(e => (e._1.toLowerCase, e._2)).toMap
  }

  /**
    * Filters the matching class factories by supported properties.
    */
  private def filterBySupportedProperties[T](
      factoryClass: Class[T],
      properties: Map[String, String],
      foundFactories: Seq[TableFactory],
      classFactories: Seq[TableFactory])
    : T = {

    val plainGivenKeys = mutable.ArrayBuffer[String]()
    properties.keys.foreach { k =>
      // replace arrays with wildcard
      val key = k.replaceAll(".\\d+", ".#")
      // ignore duplicates
      if (!plainGivenKeys.contains(key)) {
        plainGivenKeys += key
      }
    }
    var lastKey: Option[String] = None
    val supportedFactories = classFactories.filter { factory =>
      val requiredContextKeys = normalizeContext(factory).keySet
      val (supportedKeys, wildcards) = normalizeSupportedProperties(factory)
      // ignore context keys
      val givenContextFreeKeys = plainGivenKeys.filter(!requiredContextKeys.contains(_))
      // perform factory specific filtering of keys
      val givenFilteredKeys = filterSupportedPropertiesFactorySpecific(
        factory,
        givenContextFreeKeys)

      givenFilteredKeys.forall { k =>
        lastKey = Option(k)
        supportedKeys.contains(k) || wildcards.exists(k.startsWith)
      }
    }

    if (supportedFactories.isEmpty && classFactories.length == 1 && lastKey.isDefined) {
      // special case: when there is only one matching factory but the last property key
      // was incorrect
      val factory = classFactories.head
      val (supportedKeys, _) = normalizeSupportedProperties(factory)
      throw new NoMatchingTableFactoryException(
        s"""
          |The matching factory '${factory.getClass.getName}' doesn't support '${lastKey.get}'.
          |
          |Supported properties of this factory are:
          |${supportedKeys.sorted.mkString("\n")}""".stripMargin,
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.isEmpty) {
      throw new NoMatchingTableFactoryException(
        s"No factory supports all properties.",
        factoryClass,
        foundFactories,
        properties)
    } else if (supportedFactories.length > 1) {
      throw new AmbiguousTableFactoryException(
        supportedFactories,
        factoryClass,
        foundFactories,
        properties)
    }

    supportedFactories.head.asInstanceOf[T]
  }

  /**
    * Prepares the supported properties of a factory to be used for match operations.
    */
  private def normalizeSupportedProperties(factory: TableFactory): (Seq[String], Seq[String]) = {
    val supportedPropertiesJava = factory.supportedProperties()
    if (supportedPropertiesJava == null) {
      throw new TableException(
        s"Supported properties of factory '${factory.getClass.getName}' must not be null.")
    }
    val supportedKeys = supportedPropertiesJava.asScala.map(_.toLowerCase)

    // extract wildcard prefixes
    val wildcards = extractWildcardPrefixes(supportedKeys)

    (supportedKeys, wildcards)
  }

  /**
    * Converts the prefix of properties with wildcards (e.g., "format.*").
    */
  private def extractWildcardPrefixes(propertyKeys: Seq[String]): Seq[String] = {
    propertyKeys
      .filter(_.endsWith("*"))
      .map(s => s.substring(0, s.length - 1))
  }

  /**
    * Performs filtering for special cases (i.e. table format factories with schema derivation).
    */
  private def filterSupportedPropertiesFactorySpecific(
      factory: TableFactory,
      keys: Seq[String])
    : Seq[String] = factory match {

    case formatFactory: TableFormatFactory[_] =>
      val includeSchema = formatFactory.supportsSchemaDerivation()
      // ignore non-format (or schema) keys
      keys.filter { k =>
        if (includeSchema) {
          k.startsWith(SchemaValidator.SCHEMA + ".") ||
            k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        } else {
          k.startsWith(FormatDescriptorValidator.FORMAT + ".")
        }
      }

    case _ =>
      keys
  }
}
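  • TableFactoryService finds the matching TableFactory for a given factoryClass and Descriptor (or descriptor.toProperties). The core matching logic lives in filterByFactoryClass, filterByContext and filterBySupportedProperties: filterByFactoryClass selects the classFactories that implement the given factoryClass; filterByContext narrows classFactories to contextFactories by comparing each factory's requiredContext against descriptor.toProperties; filterBySupportedProperties then narrows contextFactories by each factory's supportedProperties, and throws if no factory or more than one factory remains.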
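
The supported-properties step is the least obvious of the three, so here is a simplified, dependency-free sketch of it for a single factory. The class SupportedPropertiesCheck and its method names are hypothetical; the regex and the wildcard handling follow the listing above, while the factory-specific filtering for format factories is left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified sketch of the supported-properties check; not Flink's implementation.
final class SupportedPropertiesCheck {

    // Collapses array indices with the same regex as the listing, dropping duplicates.
    static Set<String> normalizeKeys(Collection<String> givenKeys) {
        Set<String> normalized = new LinkedHashSet<>();
        for (String key : givenKeys) {
            normalized.add(key.replaceAll(".\\d+", ".#"));
        }
        return normalized;
    }

    // A key is supported if listed exactly or covered by a wildcard such as "format.*".
    static boolean isSupported(String key, List<String> supportedProperties) {
        for (String supported : supportedProperties) {
            String s = supported.toLowerCase();
            if (s.equals(key)) {
                return true;
            }
            if (s.endsWith("*") && key.startsWith(s.substring(0, s.length() - 1))) {
                return true;
            }
        }
        return false;
    }

    // Returns the given keys (minus required-context keys) that the factory does not support.
    static List<String> unsupportedKeys(
            Collection<String> givenKeys,
            Set<String> requiredContextKeys,
            List<String> supportedProperties) {
        List<String> unsupported = new ArrayList<>();
        for (String key : normalizeKeys(givenKeys)) {
            if (requiredContextKeys.contains(key)) {
                continue;
            }
            if (!isSupported(key, supportedProperties)) {
                unsupported.add(key);
            }
        }
        return unsupported;
    }
}
```

A factory passes this step only when the returned list is empty. One detail worth noticing: the leading `.` in the pattern is an unescaped regex metacharacter, so it matches any character in front of the digits, not only a literal dot.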

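Summary

  • TableFactory defines requiredContext, which is used to match a factory, and supportedProperties, which lists the properties a factory accepts; BatchTableSourceFactory, BatchTableSinkFactory, StreamTableSourceFactory and StreamTableSinkFactory each extend it with a create method.
  • ConnectorDescriptor turns a connector definition into string properties through toProperties.
  • TableFactoryUtil creates a TableSource or TableSink for a given TableEnvironment and Descriptor, and TableFactoryService finds the matching TableFactory from a factoryClass and a Descriptor (or descriptor.toProperties) by filtering on factory class, then required context, then supported properties.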
