spark structured streaming的source解析与自定义

下面是一段创建structured streaming的Dataset的代码:

val lines = spark.readStream.format("socket")
    .option("host", "localhost").option("port", 9999).load();

会创建一个socket类型的Source,该name2class的映射由DataSource.lookupDataSource()完成

val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
...
serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList
...

应该是从当前类路径中查找所有的DataSourceRegister,并读取它的shortName(),如果是"socket"就确定了由该DataSourceRegister来创建对应的DataSource

果然,有一个TextSocketSourceProvider

class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
...
override def shortName(): String = "socket"

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    val host = parameters("host")
    val port = parameters("port").toInt
    new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
  }
}

TextSocketSourceProvider的createSource创建一个TextSocketSource

TextSocketSource是一个Source,Source接口如下:

trait Source  {
  def schema: StructType
  def getOffset: Option[Offset]
  def getBatch(start: Option[Offset], end: Offset): DataFrame
  def commit(end: Offset) : Unit = {}
  def stop(): Unit
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容