1.Sink-文件
注意输出的地址不要是文件...
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
case class Counter_reset(ID: Int, stamp: Long, once: Double)
object SinkTest {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)
// environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
var path = "src/main/resources/trans1"
val filterText = environment.readTextFile(path)
val value = filterText
.map(data => {
val strings = data.split("\\s+")
Counter_reset(strings(0).toInt, strings(1).toLong, strings(2).toDouble)
})
//sink
value.addSink(
StreamingFileSink.forRowFormat(
new Path("src/main/resources/"),
new SimpleStringEncoder[Counter_reset]()
).build()
)
environment.execute("myjob")
}
}
2.sink(kafka)
3.
4.sink(Redis)
5.sink(mysql)
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink._
import org.apache.flink.streaming.api.scala._
case class Counter_reset1(ID: Int, stamp: Long, once: Double)
object SinkMysql {
def main(args: Array[String]): Unit = {
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setNumberOfExecutionRetries(1)
var path = "src/main/resources/trans1"
var value = env.readTextFile(path)
var result = value.map(txt => {
var data = txt.split("\\s+")
Counter_reset1(data(0).toInt, data(1).toLong, data(2).toDouble)
}
)
//这里addSink红色 原因Counter_reset1.....我写成了Counter_reset
result.addSink(new MysqlSinkFuntion)
env.execute("mysqlsink")
}
}
class MysqlSinkFuntion() extends RichSinkFunction[Counter_reset1] {
Class.forName("com.mysql.jdbc.Driver")
//先定义连接
var connection: Connection = _
//更新和查询操作
var insert: PreparedStatement = _
var update: PreparedStatement = _
override def open(parameters: Configuration): Unit = {
connection = DriverManager.getConnection("JDBC:mysql://localhost:3306/test", "root", "123456");
insert = connection.prepareStatement("insert into db_demo1 values (?,?,?)")
update = connection.prepareStatement("update db_demo1 set ID = ? where once = ? ")
}
override def invoke(value: Counter_reset1): Unit = {
//更新操作
update.setInt(1, value.ID)
update.setDouble(2, value.once)
update.execute()
if (update.getUpdateCount == 0) {
insert.setInt(1, value.ID)
insert.setLong(2, value.stamp)
insert.setDouble(3, value.once)
insert.execute()
}
}
override def close(): Unit = {
update.close()
insert.close()
//这里关闭了 connection 报错?????.........
}
}
