2020-10-17-Flink-5(Sink)

1.Sink-文件

注意：输出地址必须是一个目录，不能是文件。

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

/** One parsed input record: an integer `ID`, a long `stamp` and a double `once`.
  *
  * NOTE(review): `stamp` is presumably a timestamp and `once` a measured value —
  * confirm against the input data.
  */
case class Counter_reset(ID: Int, stamp: Long, once: Double)
/** Demo job: reads a text file, parses each line into a [[Counter_reset]] and
  * writes the records out through a row-format `StreamingFileSink`.
  */
object SinkTest {

  /** Builds and runs the file-sink pipeline.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Event-time processing is intentionally left disabled:
    // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputPath = "src/main/resources/trans1"
    val lines = env.readTextFile(inputPath)

    // Each line holds three whitespace-separated fields: ID, stamp, once.
    val records = lines.map { line =>
      val fields = line.split("\\s+")
      Counter_reset(fields(0).toInt, fields(1).toLong, fields(2).toDouble)
    }

    // Sink: the target path is a directory; each record is encoded via its toString.
    val fileSink: StreamingFileSink[Counter_reset] =
      StreamingFileSink
        .forRowFormat(
          new Path("src/main/resources/"),
          new SimpleStringEncoder[Counter_reset]()
        )
        .build()

    records.addSink(fileSink)
    env.execute("myjob")
  }
}
2.sink(kafka)
3.
4.sink(Redis)
5.sink(mysql)
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink._
import org.apache.flink.streaming.api.scala._

/** Record handed to the MySQL sink: an integer `ID`, a long `stamp` and a double `once`.
  *
  * NOTE(review): `stamp` is presumably a timestamp and `once` a measured value —
  * confirm against the input data.
  */
case class Counter_reset1(ID: Int, stamp: Long, once: Double)

/** Demo job: reads a text file, parses each line into a [[Counter_reset1]] and
  * pushes the records into MySQL through [[MysqlSinkFuntion]].
  */
object SinkMysql {

  /** Builds and runs the MySQL-sink pipeline.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setNumberOfExecutionRetries(1)

    val sourcePath = "src/main/resources/trans1"
    val lines = environment.readTextFile(sourcePath)

    // Each line holds three whitespace-separated fields: ID, stamp, once.
    val records = lines.map { line =>
      val fields = line.split("\\s+")
      Counter_reset1(fields(0).toInt, fields(1).toLong, fields(2).toDouble)
    }

    // Note from the author: addSink was flagged as an error here when the stream's
    // element type was mistakenly Counter_reset instead of Counter_reset1.
    records.addSink(new MysqlSinkFuntion)

    environment.execute("mysqlsink")
  }
}

/** Flink sink that upserts [[Counter_reset1]] records into the MySQL table `db_demo1`.
  *
  * For every record an UPDATE is attempted first; when it touches no row the record
  * is INSERTed instead. JDBC resources are created in `open` and released in `close`.
  *
  * NOTE(review): the connection URL and credentials are hard-coded; consider passing
  * them in through configuration.
  */
class MysqlSinkFuntion() extends RichSinkFunction[Counter_reset1] {
  // JDBC connection; created in open(), null until then.
  var connection: Connection = _
  // Prepared statements for the insert and update paths; created in open().
  var insert: PreparedStatement = _
  var update: PreparedStatement = _

  /** Loads the JDBC driver, opens the connection and prepares both statements.
    *
    * @param parameters Flink configuration passed to the function (not read here)
    */
  override def open(parameters: Configuration): Unit = {
    // Load the driver here rather than in the constructor body: constructor code is
    // not re-run when the function instance is deserialized, so a constructor-time
    // load would not happen in the JVM that actually opens the connection.
    Class.forName("com.mysql.jdbc.Driver")
    // NOTE(review): the scheme is written in upper case ("JDBC:"); the conventional
    // form is lower-case "jdbc:mysql://..." — confirm the driver in use accepts it.
    connection = DriverManager.getConnection("JDBC:mysql://localhost:3306/test", "root", "123456")
    insert = connection.prepareStatement("insert into db_demo1 values (?,?,?)")
    update = connection.prepareStatement("update db_demo1 set ID = ? where once = ? ")
  }

  /** Writes one record: tries the UPDATE first and falls back to INSERT when no row matched.
    *
    * @param value the record to persist
    */
  override def invoke(value: Counter_reset1): Unit = {
    update.setInt(1, value.ID)
    update.setDouble(2, value.once)
    // executeUpdate returns the affected-row count directly.
    val updatedRows = update.executeUpdate()
    if (updatedRows == 0) {
      insert.setInt(1, value.ID)
      insert.setLong(2, value.stamp)
      insert.setDouble(3, value.once)
      insert.executeUpdate()
    }
  }

  /** Releases the statements and then the connection.
    *
    * Every resource is null-checked because `open` may have failed before creating it,
    * and the nested `finally` blocks make sure a failure while closing one resource
    * does not prevent the remaining ones from being closed.
    */
  override def close(): Unit = {
    // The author's notes mention an error when the connection was closed here (cause
    // not recorded). Statements are closed first, then the connection, so that the
    // connection is no longer leaked.
    try {
      if (update != null) update.close()
    } finally {
      try {
        if (insert != null) insert.close()
      } finally {
        if (connection != null) connection.close()
      }
    }
  }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容