A Look at Flink's BoundedOutOfOrdernessTimestampExtractor

news/2024/7/7 7:00:11

This article examines Flink's BoundedOutOfOrdernessTimestampExtractor.

BoundedOutOfOrdernessTimestampExtractor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java

/**
 * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with
 * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can
 * help reduce the number of elements that are ignored due to lateness when computing the final result for a
 * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time
 * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp.
 * */
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

	private static final long serialVersionUID = 1L;

	/** The current maximum timestamp seen so far. */
	private long currentMaxTimestamp;

	/** The timestamp of the last emitted watermark. */
	private long lastEmittedWatermark = Long.MIN_VALUE;

	/**
	 * The (fixed) interval between the maximum seen timestamp seen in the records
	 * and that of the watermark to be emitted.
	 */
	private final long maxOutOfOrderness;

	public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
		if (maxOutOfOrderness.toMilliseconds() < 0) {
			throw new RuntimeException("Tried to set the maximum allowed " +
				"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
		}
		this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
		this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
	}

	public long getMaxOutOfOrdernessInMillis() {
		return maxOutOfOrderness;
	}

	/**
	 * Extracts the timestamp from the given element.
	 *
	 * @param element The element that the timestamp is extracted from.
	 * @return The new timestamp.
	 */
	public abstract long extractTimestamp(T element);

	@Override
	public final Watermark getCurrentWatermark() {
		// this guarantees that the watermark never goes backwards.
		long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
		if (potentialWM >= lastEmittedWatermark) {
			lastEmittedWatermark = potentialWM;
		}
		return new Watermark(lastEmittedWatermark);
	}

	@Override
	public final long extractTimestamp(T element, long previousElementTimestamp) {
		long timestamp = extractTimestamp(element);
		if (timestamp > currentMaxTimestamp) {
			currentMaxTimestamp = timestamp;
		}
		return timestamp;
	}
}
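  • The abstract class BoundedOutOfOrdernessTimestampExtractor implements two methods of the AssignerWithPeriodicWatermarks interface, extractTimestamp(T element, long previousElementTimestamp) and getCurrentWatermark, both declared final. It also declares the abstract method extractTimestamp(T element) for subclasses to implement.
  • The constructor takes a maxOutOfOrderness parameter, which must not be negative. It fixes how far the watermark lags behind the largest event timestamp seen so far. An element whose event time trails that maximum by no more than maxOutOfOrderness is not behind the watermark; an element that trails by more arrives behind the watermark, counts as late, and may be ignored when window results are computed.
  • extractTimestamp(T element, long previousElementTimestamp) calls the subclass's extractTimestamp(T element) to obtain the timestamp and updates currentMaxTimestamp if the timestamp is larger. getCurrentWatermark first computes potentialWM = currentMaxTimestamp - maxOutOfOrderness. If potentialWM is greater than or equal to lastEmittedWatermark, it updates lastEmittedWatermark to potentialWM. Equivalently, when currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness, the last watermark has fallen more than the allowed lag behind and is moved forward. The method then returns Watermark(lastEmittedWatermark), so the watermark never goes backwards.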
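To make the bookkeeping in the bullets above concrete, here is a minimal plain-Java sketch that mirrors the two final methods without depending on Flink. The class name BoundedWatermarkSketch and the method names observe and currentWatermark are hypothetical and exist only for this illustration; the field names and the arithmetic follow the source listing above.

```java
/**
 * Plain-Java sketch of the watermark bookkeeping; it does not depend on Flink.
 * Class and method names are hypothetical and used only for illustration.
 */
final class BoundedWatermarkSketch {

    private final long maxOutOfOrderness;               // fixed lag in milliseconds
    private long currentMaxTimestamp;                   // largest event timestamp seen so far
    private long lastEmittedWatermark = Long.MIN_VALUE; // last value handed out

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Offset the initial maximum so the first subtraction cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Mirrors extractTimestamp(element, previousElementTimestamp): track the maximum. */
    long observe(long eventTimestamp) {
        if (eventTimestamp > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimestamp;
        }
        return eventTimestamp;
    }

    /** Mirrors getCurrentWatermark(): maximum minus the lag, never decreasing. */
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch assigner = new BoundedWatermarkSketch(3_000L);
        long[] timestamps = {10_000L, 12_000L, 9_000L, 15_000L};
        for (long ts : timestamps) {
            assigner.observe(ts);
            System.out.println("event=" + ts + " watermark=" + assigner.currentWatermark());
        }
    }
}
```

With a lag of 3000 ms, the out-of-order event at 9000 leaves the watermark at 9000 instead of pulling it back to 6000, which is the "never goes backwards" guarantee noted in the source comment.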

Example

	public static void main(String[] args) throws Exception {

		final int popThreshold = 20; // threshold for popular places

		// set up streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.getConfig().setAutoWatermarkInterval(1000);

		// configure the Kafka consumer
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
		kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
		kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
		// always read the Kafka topic from the start
		kafkaProps.setProperty("auto.offset.reset", "earliest");

		// create a Kafka consumer
		FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
				"cleansedRides",
				new TaxiRideSchema(),
				kafkaProps);
		// assign a timestamp extractor to the consumer
		consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

		// create a TaxiRide data stream
		DataStream<TaxiRide> rides = env.addSource(consumer);

		// find popular places
		DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
				// match ride to grid cell and event type (start or end)
				.map(new GridCellMatcher())
				// partition by cell id and event type
				.keyBy(0, 1)
				// build sliding window
				.timeWindow(Time.minutes(15), Time.minutes(5))
				// count ride events in window
				.apply(new RideCounter())
				// filter by popularity threshold
				.filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
				// map grid cell to coordinates
				.map(new GridToCoordinates());

		popularPlaces.print();

		// execute the transformation pipeline
		env.execute("Popular Places from Kafka");
	}

	/**
	 * Assigns timestamps to TaxiRide records.
	 * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
	 */
	public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

		public TaxiRideTSExtractor() {
			super(Time.seconds(MAX_EVENT_DELAY));
		}

		@Override
		public long extractTimestamp(TaxiRide ride) {
			if (ride.isStart) {
				return ride.startTime.getMillis();
			}
			else {
				return ride.endTime.getMillis();
			}
		}
	}
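  • This example uses an AssignerWithPeriodicWatermarks. env.getConfig().setAutoWatermarkInterval(1000) sets the watermark interval to 1000 ms, and consumer.assignTimestampsAndWatermarks registers TaxiRideTSExtractor as the assigner. TaxiRideTSExtractor extends the abstract class BoundedOutOfOrdernessTimestampExtractor and returns the ride's start time for start events and its end time otherwise.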
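The effect of the watermark interval can be sketched with a small simulation. This is only an illustration under stated assumptions: it assumes the runtime polls the assigner once per interval and forwards a watermark only when it is larger than the previously forwarded one, and it replaces the processing-time timer with the arrival times of the replayed events. The class PeriodicWatermarkSketch, the method emittedWatermarks, and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simulates periodic watermark polling in plain Java (no Flink dependency).
 * All names and the sample data are hypothetical and used only for illustration.
 */
final class PeriodicWatermarkSketch {

    /**
     * Replays events given as {arrivalMillis, eventTimestamp} pairs, sorted by arrival,
     * and returns the watermarks forwarded by polls that happen every intervalMillis.
     */
    static List<Long> emittedWatermarks(long[][] events, long intervalMillis, long maxOutOfOrdernessMillis) {
        List<Long> emitted = new ArrayList<>();
        long currentMax = Long.MIN_VALUE + maxOutOfOrdernessMillis;
        long lastForwarded = Long.MIN_VALUE;
        long nextPoll = intervalMillis;
        for (long[] event : events) {
            long arrival = event[0];
            long eventTimestamp = event[1];
            // Run every poll that is due at or before this event's arrival.
            while (nextPoll <= arrival) {
                long candidate = currentMax - maxOutOfOrdernessMillis;
                if (candidate > lastForwarded) { // forward only if the watermark advanced
                    lastForwarded = candidate;
                    emitted.add(candidate);
                }
                nextPoll += intervalMillis;
            }
            currentMax = Math.max(currentMax, eventTimestamp);
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[][] events = {
            {100L, 10_000L}, {600L, 12_000L}, {1_200L, 9_000L},
            {1_800L, 15_000L}, {2_500L, 14_000L}, {3_100L, 16_000L}
        };
        // Poll every 1000 ms with a 3000 ms out-of-orderness bound.
        System.out.println(emittedWatermarks(events, 1_000L, 3_000L)); // prints [9000, 12000]
    }
}
```

The poll at 3000 ms forwards nothing because the maximum timestamp has not advanced since the previous poll, so a shorter interval makes watermarks more frequent but never pushes them beyond currentMaxTimestamp - maxOutOfOrderness.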

Summary

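  • For convenience, Flink ships several Pre-defined Timestamp Extractors / Watermark Emitters; BoundedOutOfOrdernessTimestampExtractor is one of them.
  • The abstract class BoundedOutOfOrdernessTimestampExtractor implements the extractTimestamp(T element, long previousElementTimestamp) and getCurrentWatermark methods of the AssignerWithPeriodicWatermarks interface, and declares the abstract method extractTimestamp(T element) for subclasses to implement.
  • Its constructor takes a maxOutOfOrderness parameter that fixes how far the watermark lags behind the largest event timestamp seen so far. An element that trails that maximum by more than maxOutOfOrderness arrives behind the watermark and may be ignored when window results are computed.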

doc

  • Pre-defined Timestamp Extractors / Watermark Emitters

