- 配置Flume文件(配置于Flume目录下的conf文件夹中)
# Flume agent "a1": one source (r1), one sink (k1) and one channel (c1).
a1.sources=r1
a1.sinks=k1
a1.channels=c1

# source: an exec source that runs the command below and turns each line of
# its output into an event. Here nc connects to port 7777, so a listener
# (e.g. `nc -lk 7777`) must already be running before the agent starts.
a1.sources.r1.type=exec
a1.sources.r1.command=nc 0.0.0.0 7777

# sink: publish every event to the Kafka topic "flinktestTopic".
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=flinktestTopic
a1.sinks.k1.kafka.bootstrap.servers=xueai:9092
# Kafka producer settings must use the "kafka.producer." prefix; the previous
# key "kafka.produces.acks" was a typo and was not passed to the producer.
a1.sinks.k1.kafka.producer.acks=1

# channel: in-memory buffer between source and sink.
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

# bind: wire the source and the sink to the channel.
# Note the asymmetry: a source takes "channels" (plural), a sink takes "channel".
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
<dependencies>
  <!-- Flink Scala API -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.10.2</version>
  </dependency>
  <!-- Flink streaming Scala API (StreamExecutionEnvironment) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.10.2</version>
  </dependency>
  <!-- Flink connector for Kafka 0.10 (FlinkKafkaConsumer010) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.10.2</version>
  </dependency>
  <!-- Apache Kafka -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.0.0</version>
  </dependency>
  <!-- Apache Bahir Redis connector for Flink (RedisSink) -->
  <dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.12</artifactId>
    <version>1.1.0</version>
  </dependency>
  <!-- MySQL JDBC driver -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
  </dependency>
  <!-- Spark Streaming -->
  <!-- NOTE(review): not referenced by the Flink job in this section; confirm it is still needed -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.1.1</version>
  </dependency>
</dependencies>
import org.apache.flink.configuration
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import java.util.Properties
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.api.common.functions.{AggregateFunction, ReduceFunction}
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.windowing.time.Time
/** Minimal Flink streaming job: reads strings from the Kafka topic
  * `flinktestTopic` and prints every record to standard output.
  */
object test01 {

  /** Broker address used when none is given on the command line.
    * Matches `kafka.bootstrap.servers` of the Flume Kafka sink that feeds the topic.
    */
  private val DefaultBootstrapServers = "xueai:9092"

  /** Consumer group under which this job reads the topic. */
  private val ConsumerGroupId = "flinktestGroup"

  /** Builds and runs the job.
    *
    * @param args optional; `args(0)` overrides the Kafka `bootstrap.servers`
    *             address (`host:port`). Defaults to [[DefaultBootstrapServers]].
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The previous hard-coded placeholder was not a resolvable host; take the
    // broker from the command line and fall back to the one Flume writes to.
    val bootstrapServers = args.headOption.getOrElse(DefaultBootstrapServers)

    val prop = new Properties()
    prop.setProperty("bootstrap.servers", bootstrapServers)
    // The Flink Kafka consumer documents group.id as a required property.
    prop.setProperty("group.id", ConsumerGroupId)

    val inputStream = env.addSource(
      new FlinkKafkaConsumer010[String]("flinktestTopic", new SimpleStringSchema(), prop)
    )
    inputStream.print()

    // Nothing runs until execute() is called; it blocks while the job is running.
    env.execute()
  }
}
-
步骤
- 第一步:先运行flink代码
- 第二步:正常启动集群
- 第三步:先启动端口监听,用于输入消息(输入数据):nc -lk 7777
- 第四步:执行Flume(我这里用的是绝对路径,因为没有配环境变量)
-
最后结果