当前位置:   article > 正文

Flink实时计算斗鱼某直播间弹幕发言数TopN_弹幕机发言次数怎么算

弹幕机发言次数怎么算

先通过 WebSocket 获取斗鱼直播间的弹幕并写入 Kafka,再由 Flink 消费;Flink 每隔 10 分钟输出一次最近 1 小时内发言次数最多的前 10 名用户。

主要pom依赖

  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.11.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.12</artifactId>
      <version>1.11.2</version>
    </dependency>

    <dependency>
      <groupId>org.java-websocket</groupId>
      <artifactId>Java-WebSocket</artifactId>
      <version>1.5.1</version>
    </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

获取斗鱼弹幕JAVA类:

主要代码:

DouyuClient.java:
package cn.sven.bcvs.douyu;


import cn.sven.bcvs.utils.GetKafkaProducer;
import cn.sven.bcvs.utils.WriteFile;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Date;

public class DouyuClient {
    public static void main(String[] args) throws Exception {
        new DouyuClient().crawl();
    }


    public void crawl() throws Exception {
        //获取kafka实例
        final Producer<String, String> producer = GetKafkaProducer.get();

        WebSocketClient websocketclient = new WebSocketClient(new URI("wss://danmuproxy.douyu.com:8506/"
        ), new Draft_6455()) {
            @Override
            public void onOpen(ServerHandshake handshakedata) {
                try {
                    send(login());//发送登录请求
                    send(joinGroup());//发送加入群组请求
                    send(heartBeat());//发送心跳
                    Thread heartBeatThread = new Thread(new Runnable() {
                        @Override
                        public void run() {
                            while (true) {
                                try {
                                    send(heartBeat());
                                    System.out.println("发送心跳");
                                    Thread.sleep(45000);
                                }  catch (IOException | InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    });
                    heartBeatThread.start();
                } catch (IOException e) {

                    e.printStackTrace();
                }
                System.out.println("打开连接");
            }

            @Override
            public void onMessage(String message) {
            }

            public void onMessage(ByteBuffer bytes) {
                Charset charset = StandardCharsets.UTF_8;
                CharBuffer charBuffer = charset.decode(bytes);
                String str = charBuffer.toString();
                if (str.contains("txt")) {
                    String name = str.substring(str.indexOf("nn@=") + 4, str.indexOf("/", str.indexOf("nn@="))).replace("\001", "");
                    String danmu = str.substring(str.indexOf("txt@=") + 5, str.indexOf("/", str.indexOf("txt@="))).replace("\001", "");
                    String owner = str.substring(str.indexOf("bnn@=") + 5, str.indexOf("/", str.indexOf("bnn@="))).replace("\001", "");
                    if (owner.isEmpty()) owner = "N";
                    long timestamp = new Date().getTime() / 1000;
                    String msg = name + "\001" + owner + "\001" + danmu + "\001" + timestamp;
                    //将数据写入文件并传输到kafka中
                    producer.send(new ProducerRecord<String, String>("yyf", null, msg));
                    //将数据写入磁盘中
                    //WriteFile.write(msg);

                }
            }

            @Override
            public void onClose(int i, String s, boolean b) {
                System.out.println("连接关闭");
            }

            @Override
            public void onError(Exception e) {
                System.err.println(e);
                System.out.println("发生错误");
            }
        };
        websocketclient.run();
    }

    public byte[] login() throws IOException {
    	//根据直播间号获取实时弹幕
        String message = "type@=loginreq/roomid@=9999/";
        return douyuRequestEncode(message);
    }

    //加入群组请求
    public byte[] joinGroup() throws IOException {
   		 //根据直播间号获取实时弹幕
        String message = "type@=joingroup/rid@=9999/gid@=-9999/";
        return douyuRequestEncode(message);
    }

    //心跳
    public byte[] heartBeat() throws IOException {
        String message = "type@=mrkl/";
        return douyuRequestEncode(message);
    }

    //将传入的数据变成符合斗鱼协议要求的字节流返回
    public byte[] douyuRequestEncode(String message) throws IOException {
        int dataLen1 = message.length() + 9;//4 字节小端整数,表示整条消息(包括自身)长度(字节数)。
        int dataLen2 = message.length() + 9;//消息长度出现两遍,二者相同。
        int send = 689;//689 客户端发送给弹幕服务器的文本格式数据,暂时未用,默认为 0。保留字段:暂时未用,默认为 0。
        byte[] msgBytes = message.getBytes(StandardCharsets.UTF_8);
        int end = 0;
        byte[] endBytes = new byte[1];
        endBytes[0] = (byte) (end & 0xFF);
        ;//结尾必须为‘\0’。详细序列化、反序列化

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        bytes.write(intToBytesLittle(dataLen1));
        bytes.write(intToBytesLittle(dataLen2));
        bytes.write(intToBytesLittle(send));
        bytes.write(msgBytes);
        bytes.write(endBytes);
        //返回byte[]
        return bytes.toByteArray();
    }

    //将整形转化为4位小端字节流
    public byte[] intToBytesLittle(int value) {
        return new byte[]{
                (byte) (value & 0xFF),
                (byte) ((value >> 8) & 0xFF),
                (byte) ((value >> 16) & 0xFF),
                (byte) ((value >> 24) & 0xFF)
        };
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
Draft_6455.java
此类原属于 org.java_websocket.drafts 包,DouyuClient.java 中需要用到此类,但直接使用库中的原类会出现异常,因此需要对部分代码做修改。下面是已经修改好的版本。
/*
 * Copyright (c) 2010-2020 Nathan Rajlich
 *
 *  Permission is hereby granted, free of charge, to any person
 *  obtaining a copy of this software and associated documentation
 *  files (the "Software"), to deal in the Software without
 *  restriction, including without limitation the rights to use,
 *  copy, modify, merge, publish, distribute, sublicense, and/or sell
 *  copies of the Software, and to permit persons to whom the
 *  Software is furnished to do so, subject to the following
 *  conditions:
 *
 *  The above copyright notice and this permission notice shall be
 *  included in all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 *  OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 *  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
 *  WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 *  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
 *  OTHER DEALINGS IN THE SOFTWARE.
 */

package cn.sven.bcvs.douyu;

import org.java_websocket.WebSocketImpl;
import org.java_websocket.drafts.Draft;
import org.java_websocket.enums.*;
import org.java_websocket.exceptions.*;
import org.java_websocket.extensions.DefaultExtension;
import org.java_websocket.extensions.IExtension;
import org.java_websocket.framing.*;
import org.java_websocket.handshake.*;
import org.java_websocket.protocols.IProtocol;
import org.java_websocket.protocols.Protocol;
import org.java_websocket.util.Base64;
import org.java_websocket.util.Charsetfunctions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Implementation for the RFC 6455 websocket protocol
 * This is the recommended class for your websocket connection
 */
public class Draft_6455 extends Draft {

	/**
	 * Handshake specific field for the key
	 */
	private static final String SEC_WEB_SOCKET_KEY = "Sec-WebSocket-Key";

	/**
	 * Handshake specific field for the protocol
	 */
	private static final String SEC_WEB_SOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

	/**
	 * Handshake specific field for the extension
	 */
	private static final String SEC_WEB_SOCKET_EXTENSIONS = "Sec-WebSocket-Extensions";

	/**
	 * Handshake specific field for the accept
	 */
	private static final String SEC_WEB_SOCKET_ACCEPT = "Sec-WebSocket-Accept";

	/**
	 * Handshake specific field for the upgrade
	 */
	private static final String UPGRADE = "Upgrade" ;

	/**
	 * Handshake specific field for the connection
	 */
	private static final String CONNECTION = "Connection";

	/**
	 * Logger instance
	 *
	 * @since 1.4.0
	 */
	private final Logger log = LoggerFactory.getLogger(Draft_6455.class);

	/**
	 * Attribute for the used extension in this draft
	 */
	private IExtension extension = new DefaultExtension();

	/**
	 * Attribute for all available extension in this draft
	 */
	private List<IExtension> knownExtensions;

	/**
	 * Attribute for the used protocol in this draft
	 */
	private IProtocol protocol;

	/**
	 * Attribute for all available protocols in this draft
	 */
	private List<IProtocol> knownProtocols;

	/**
	 * Attribute for the current continuous frame
	 */
	private Framedata currentContinuousFrame;

	/**
	 * Attribute for the payload of the current continuous frame
	 */
	private final List<ByteBuffer> byteBufferList;

	/**
	 * Attribute for the current incomplete frame
	 */
	private ByteBuffer incompleteframe;

	/**
	 * Attribute for the reusable random instance
	 */
	private final Random reuseableRandom = new Random();

	/**
	 * Attribute for the maximum allowed size of a frame
	 *
	 * @since 1.4.0
	 */
	private int maxFrameSize;

	/**
	 * Creates a draft for the websocket protocol specified by RFC 6455 without any custom extension.
	 * Delegates to {@link #Draft_6455(List)} with an empty extension list.
	 *
	 * @since 1.3.5
	 */
	public Draft_6455() {
		this( Collections.<IExtension>emptyList() );
	}

	/**
	 * Creates a draft for the websocket protocol specified by RFC 6455 with a single custom extension.
	 * Delegates to {@link #Draft_6455(List)} with a one-element list.
	 *
	 * @param inputExtension the extension which should be used for this draft
	 * @since 1.3.5
	 */
	public Draft_6455(IExtension inputExtension ) {
		this( Collections.singletonList( inputExtension ) );
	}

	/**
	 * Creates a draft for the websocket protocol specified by RFC 6455 with custom extensions.
	 * The only known protocol is one created from the empty string.
	 *
	 * @param inputExtensions the extensions which should be used for this draft
	 * @since 1.3.5
	 */
	public Draft_6455(List<IExtension> inputExtensions ) {
		this( inputExtensions, Collections.<IProtocol>singletonList( new Protocol( "" ) ));
	}

	/**
	 * Creates a draft for the websocket protocol specified by RFC 6455 with custom extensions and protocols.
	 * The maximum frame size is set to {@link Integer#MAX_VALUE}.
	 *
	 * @param inputExtensions the extensions which should be used for this draft
	 * @param inputProtocols the protocols which should be used for this draft
	 *
	 * @since 1.3.7
	 */
	public Draft_6455(List<IExtension> inputExtensions , List<IProtocol> inputProtocols ) {
		this(inputExtensions, inputProtocols, Integer.MAX_VALUE);
	}

	/**
	 * Creates a draft for the websocket protocol specified by RFC 6455 with custom extensions and a frame size limit.
	 * The only known protocol is one created from the empty string.
	 *
	 * @param inputExtensions the extensions which should be used for this draft
	 * @param inputMaxFrameSize the maximum allowed size of a frame (the real payload size, decoded frames can be bigger)
	 *
	 * @since 1.4.0
	 */
	public Draft_6455(List<IExtension> inputExtensions , int inputMaxFrameSize) {
		this(inputExtensions, Collections.<IProtocol>singletonList( new Protocol( "" )), inputMaxFrameSize);
	}

	/**
	 * Creates a draft for the websocket protocol specified by RFC 6455 with custom extensions,
	 * custom protocols and a frame size limit. All other constructors end up here.
	 * <p>
	 * The supplied extensions are copied into the list of known extensions. When none of them is
	 * exactly a {@link DefaultExtension}, this draft's own default extension is appended.
	 *
	 * @param inputExtensions the extensions which should be used for this draft
	 * @param inputProtocols the protocols which should be used for this draft
	 * @param inputMaxFrameSize the maximum allowed size of a frame (the real payload size, decoded frames can be bigger)
	 * @throws IllegalArgumentException if a list is {@code null} or the frame size is smaller than 1
	 *
	 * @since 1.4.0
	 */
	public Draft_6455(List<IExtension> inputExtensions , List<IProtocol> inputProtocols, int inputMaxFrameSize ) {
		boolean argumentsInvalid = inputExtensions == null || inputProtocols == null || inputMaxFrameSize < 1;
		if( argumentsInvalid ) {
			throw new IllegalArgumentException();
		}
		this.knownExtensions = new ArrayList<IExtension>( inputExtensions.size() );
		this.knownProtocols = new ArrayList<IProtocol>( inputProtocols.size() );
		this.byteBufferList = new ArrayList<ByteBuffer>();

		// Look for a caller-supplied DefaultExtension (exact class, not a subclass).
		boolean defaultSupplied = false;
		Iterator<IExtension> supplied = inputExtensions.iterator();
		while( supplied.hasNext() ) {
			if( supplied.next().getClass() == DefaultExtension.class ) {
				defaultSupplied = true;
			}
		}

		this.knownExtensions.addAll( inputExtensions );
		// The DefaultExtension is always present so the plain RFC 6455 behaviour is available
		if( !defaultSupplied ) {
			this.knownExtensions.add( extension );
		}
		this.knownProtocols.addAll( inputProtocols );
		this.maxFrameSize = inputMaxFrameSize;
	}

	/**
	 * Decides, in the server role, whether a client's opening handshake can be served by this draft.
	 * The request must announce websocket version 13 and must be acceptable to one known
	 * extension and one known protocol. The matching extension and protocol are remembered.
	 *
	 * @param handshakedata the handshake request sent by the client
	 * @return MATCHED if version, extension and protocol all match, otherwise NOT_MATCHED
	 * @throws InvalidHandshakeException declared by the overridden method; not thrown by this code directly
	 */
	@Override
	public HandshakeState acceptHandshakeAsServer( ClientHandshake handshakedata ) throws InvalidHandshakeException {
		// The version header is parsed here rather than through the base class helper
		// (NOTE(review): presumably that helper is not accessible from this package — confirm).
		String versionField = handshakedata.getFieldValue( "Sec-WebSocket-Version" );
		int version = -1;
		if( versionField != null && versionField.trim().length() > 0 ) {
			try {
				version = Integer.parseInt( versionField.trim() );
			} catch ( NumberFormatException e ) {
				version = -1; // a malformed version is treated like a missing one
			}
		}
		if( version != 13 ) {
			log.trace("acceptHandshakeAsServer - Wrong websocket version.");
			return HandshakeState.NOT_MATCHED;
		}
		HandshakeState extensionState = HandshakeState.NOT_MATCHED;
		String requestedExtension = handshakedata.getFieldValue(SEC_WEB_SOCKET_EXTENSIONS);
		for( IExtension knownExtension : knownExtensions ) {
			if( knownExtension.acceptProvidedExtensionAsServer( requestedExtension ) ) {
				extension = knownExtension;
				extensionState = HandshakeState.MATCHED;
				log.trace("acceptHandshakeAsServer - Matching extension found: {}", extension);
				break;
			}
		}
		// Always evaluated, because a match also records the protocol in use.
		HandshakeState protocolState = containsRequestedProtocol(handshakedata.getFieldValue(SEC_WEB_SOCKET_PROTOCOL));
		if (protocolState == HandshakeState.MATCHED && extensionState == HandshakeState.MATCHED) {
			return HandshakeState.MATCHED;
		}
		log.trace("acceptHandshakeAsServer - No matching extension or protocol found.");
		return HandshakeState.NOT_MATCHED;
	}

	/**
	 * Looks for the first known protocol that accepts the requested protocol and, when one is
	 * found, remembers it as the protocol used by this draft.
	 *
	 * @param requestedProtocol the requested protocol
	 * @return MATCHED if a known protocol accepted it, otherwise NOT_MATCHED
	 */
	private HandshakeState containsRequestedProtocol(String requestedProtocol) {
		HandshakeState outcome = HandshakeState.NOT_MATCHED;
		Iterator<IProtocol> candidates = knownProtocols.iterator();
		while( outcome == HandshakeState.NOT_MATCHED && candidates.hasNext() ) {
			IProtocol candidate = candidates.next();
			if( candidate.acceptProvidedProtocol( requestedProtocol ) ) {
				protocol = candidate;
				log.trace("acceptHandshake - Matching protocol found: {}", protocol);
				outcome = HandshakeState.MATCHED;
			}
		}
		return outcome;
	}

	/**
	 * Decides, in the client role, whether the server's handshake response matches the request
	 * that was sent. Checks the basic upgrade headers, the Sec-WebSocket-Accept answer, and that
	 * a known extension and a known protocol accept the values returned by the server.
	 *
	 * @param request the handshake request this client sent
	 * @param response the handshake response received from the server
	 * @return MATCHED if every check passes, otherwise NOT_MATCHED
	 * @throws InvalidHandshakeException declared by the overridden method
	 */
	@Override
	public HandshakeState acceptHandshakeAsClient( ClientHandshake request, ServerHandshake response ) throws InvalidHandshakeException {
		boolean upgradeAccepted = basicAccept( response );
		if( !upgradeAccepted ) {
			log.trace("acceptHandshakeAsClient - Missing/wrong upgrade or connection in handshake.");
			return HandshakeState.NOT_MATCHED;
		}
		if( !( request.hasFieldValue( SEC_WEB_SOCKET_KEY ) && response.hasFieldValue( SEC_WEB_SOCKET_ACCEPT ) ) ) {
			log.trace("acceptHandshakeAsClient - Missing Sec-WebSocket-Key or Sec-WebSocket-Accept");
			return HandshakeState.NOT_MATCHED;
		}

		// The server must answer with the value derived from the key this client sent.
		String receivedAccept = response.getFieldValue( SEC_WEB_SOCKET_ACCEPT );
		String expectedAccept = generateFinalKey( request.getFieldValue( SEC_WEB_SOCKET_KEY ) );
		if( !expectedAccept.equals( receivedAccept ) ) {
			log.trace("acceptHandshakeAsClient - Wrong key for Sec-WebSocket-Key.");
			return HandshakeState.NOT_MATCHED;
		}

		String offeredExtension = response.getFieldValue( SEC_WEB_SOCKET_EXTENSIONS );
		boolean extensionMatched = false;
		Iterator<IExtension> candidates = knownExtensions.iterator();
		while( !extensionMatched && candidates.hasNext() ) {
			IExtension candidate = candidates.next();
			if( candidate.acceptProvidedExtensionAsClient( offeredExtension ) ) {
				extension = candidate;
				extensionMatched = true;
				log.trace("acceptHandshakeAsClient - Matching extension found: {}",extension);
			}
		}

		// Evaluated even without an extension match, because it records the protocol in use.
		boolean protocolMatched =
				containsRequestedProtocol( response.getFieldValue( SEC_WEB_SOCKET_PROTOCOL ) ) == HandshakeState.MATCHED;
		if( protocolMatched && extensionMatched ) {
			return HandshakeState.MATCHED;
		}
		log.trace("acceptHandshakeAsClient - No matching extension or protocol found.");
		return HandshakeState.NOT_MATCHED;
	}

	/**
	 * Returns the extension currently selected for this draft.
	 *
	 * @return the extension in use; before a handshake selects one this is the default extension
	 *         the field is initialised with
	 */
	public IExtension getExtension() {
		return this.extension;
	}

	/**
	 * Returns every extension this draft can negotiate.
	 *
	 * @return the internal list of enabled extensions (not a copy)
	 */
	public List<IExtension> getKnownExtensions() {
		return this.knownExtensions;
	}

	/**
	 * Returns the protocol selected for this draft.
	 *
	 * @return the protocol in use, or null if no handshake has matched a protocol yet
	 * @since 1.3.7
	 */
	public IProtocol getProtocol() {
		return this.protocol;
	}


	/**
	 * Returns the payload size limit that incoming frames are checked against.
	 *
	 * @return the largest payload size accepted by this draft
	 * @since 1.4.0
	 */
	public int getMaxFrameSize() {
		return this.maxFrameSize;
	}

	/**
	 * Returns every protocol this draft can negotiate.
	 *
	 * @return the internal list of enabled protocols (not a copy)
	 * @since 1.3.7
	 */
	public List<IProtocol> getKnownProtocols() {
		return this.knownProtocols;
	}

	/**
	 * Adds the RFC 6455 headers to an outgoing client handshake: Upgrade, Connection, a random
	 * 16-byte Base64 Sec-WebSocket-Key, version 13 and, when any are offered, the comma-separated
	 * extension and protocol lists.
	 *
	 * @param request the handshake builder to complete
	 * @return the same builder, for chaining
	 */
	@Override
	public ClientHandshakeBuilder postProcessHandshakeRequestAsClient( ClientHandshakeBuilder request ) {
		request.put( UPGRADE, "websocket" );
		request.put( CONNECTION, UPGRADE ); // to respond to a Connection keep alives

		byte[] nonce = new byte[16];
		reuseableRandom.nextBytes( nonce );
		request.put( SEC_WEB_SOCKET_KEY , Base64.encodeBytes( nonce ) );
		request.put( "Sec-WebSocket-Version", "13" );// overwriting the previous

		StringBuilder extensionHeader = new StringBuilder();
		for( IExtension candidate : knownExtensions ) {
			// Extensions that offer nothing contribute nothing to the header
			if( candidate.getProvidedExtensionAsClient() == null || candidate.getProvidedExtensionAsClient().length() == 0 ) {
				continue;
			}
			if( extensionHeader.length() > 0 ) {
				extensionHeader.append( ", " );
			}
			extensionHeader.append( candidate.getProvidedExtensionAsClient() );
		}
		if( extensionHeader.length() != 0 ) {
			request.put( SEC_WEB_SOCKET_EXTENSIONS, extensionHeader.toString() );
		}

		StringBuilder protocolHeader = new StringBuilder();
		for( IProtocol candidate : knownProtocols ) {
			if( candidate.getProvidedProtocol().length() == 0 ) {
				continue;
			}
			if( protocolHeader.length() > 0 ) {
				protocolHeader.append( ", " );
			}
			protocolHeader.append( candidate.getProvidedProtocol() );
		}
		if( protocolHeader.length() != 0 ) {
			request.put( SEC_WEB_SOCKET_PROTOCOL, protocolHeader.toString() );
		}
		return request;
	}

	/**
	 * Adds the RFC 6455 headers to the server's handshake response: Upgrade, the echoed
	 * Connection value, the Sec-WebSocket-Accept answer derived from the client's key, the
	 * negotiated extension and protocol (when non-empty), a status message, Server and Date.
	 *
	 * @param request the handshake request received from the client
	 * @param response the response builder to complete
	 * @return the same response builder, for chaining
	 * @throws InvalidHandshakeException if the request carries no Sec-WebSocket-Key
	 */
	@Override
	public HandshakeBuilder postProcessHandshakeResponseAsServer( ClientHandshake request, ServerHandshakeBuilder response ) throws InvalidHandshakeException {
		response.put( UPGRADE, "websocket" );
		response.put( CONNECTION, request.getFieldValue( CONNECTION) ); // to respond to a Connection keep alives
		String seckey = request.getFieldValue(SEC_WEB_SOCKET_KEY);
		// A missing header is reported as an empty string, so both forms mean "no key".
		if( seckey == null || seckey.length() == 0 ) {
			throw new InvalidHandshakeException( "missing Sec-WebSocket-Key" );
		}
		response.put( SEC_WEB_SOCKET_ACCEPT, generateFinalKey( seckey ) );
		if( getExtension().getProvidedExtensionAsServer().length() != 0 ) {
			response.put(SEC_WEB_SOCKET_EXTENSIONS, getExtension().getProvidedExtensionAsServer() );
		}
		if( getProtocol() != null && getProtocol().getProvidedProtocol().length() != 0 ) {
			response.put(SEC_WEB_SOCKET_PROTOCOL, getProtocol().getProvidedProtocol() );
		}
		response.setHttpStatusMessage( "Web Socket Protocol Handshake" );
		response.put( "Server", "TooTallNate Java-WebSocket" );
		response.put( "Date", getServerTime() );
		return response;
	}

	/**
	 * Creates an independent draft with copies of every known extension and protocol and the
	 * same frame size limit.
	 *
	 * @return the new draft instance
	 */
	@Override
	public Draft copyInstance() {
		List<IExtension> extensionCopies = new ArrayList<IExtension>();
		Iterator<IExtension> extensions = getKnownExtensions().iterator();
		while( extensions.hasNext() ) {
			extensionCopies.add( extensions.next().copyInstance() );
		}
		List<IProtocol> protocolCopies = new ArrayList<IProtocol>();
		Iterator<IProtocol> protocols = getKnownProtocols().iterator();
		while( protocols.hasNext() ) {
			protocolCopies.add( protocols.next().copyInstance() );
		}
		return new Draft_6455( extensionCopies, protocolCopies, maxFrameSize );
	}

	/**
	 * Lets the active extension encode the frame and then serializes it into wire bytes.
	 *
	 * @param framedata the frame to send
	 * @return a buffer holding the complete encoded frame
	 */
	@Override
	public ByteBuffer createBinaryFrame( Framedata framedata ) {
		getExtension().encodeFrame( framedata );
		if( log.isTraceEnabled() ) {
			// Large payloads are not dumped into the log
			String shown;
			if( framedata.getPayloadData().remaining() > 1000 ) {
				shown = "too big to display";
			} else {
				shown = new String( framedata.getPayloadData().array() );
			}
			log.trace( "afterEnconding({}): {}" , framedata.getPayloadData().remaining(), shown );
		}
		return createByteBufferFromFramedata( framedata );
	}

	/**
	 * Serializes a frame into its wire representation: one byte with FIN, RSV1-3 and the opcode,
	 * the payload length (1 byte, or a 126/127 marker followed by 2 or 8 length bytes) combined
	 * with the mask bit, an optional 4-byte masking key, and the payload.
	 * The payload is masked when this draft acts as a client.
	 *
	 * @param framedata the frame to serialize
	 * @return a buffer, flipped for reading, that contains the whole frame
	 * @throws IllegalStateException if the length would need an unsupported number of size bytes
	 */
	private ByteBuffer createByteBufferFromFramedata( Framedata framedata ) {
		ByteBuffer mes = framedata.getPayloadData();
		boolean mask = role == Role.CLIENT;
		int sizebytes = getSizeBytes(mes);
		// header byte + length field (extended lengths need one extra marker byte) + optional mask key + payload
		ByteBuffer buf = ByteBuffer.allocate( 1 + ( sizebytes > 1 ? sizebytes + 1 : sizebytes ) + ( mask ? 4 : 0 ) + mes.remaining() );
		byte optcode = fromOpcode( framedata.getOpcode() );
		// -128 is 0x80, the FIN bit
		byte one = ( byte ) ( framedata.isFin() ? -128 : 0 );
		one |= optcode;
		if(framedata.isRSV1())
			one |= getRSVByte(1);
		if(framedata.isRSV2())
			one |= getRSVByte(2);
		if(framedata.isRSV3())
			one |= getRSVByte(3);
		buf.put( one );
		byte[] payloadlengthbytes = toByteArray( mes.remaining(), sizebytes );
		assert ( payloadlengthbytes.length == sizebytes );

		if( sizebytes == 1 ) {
			// short form: the length itself fits next to the mask bit
			buf.put( ( byte ) ( payloadlengthbytes[0] | getMaskByte(mask) ) );
		} else if( sizebytes == 2 ) {
			// marker 126: a 2-byte length follows
			buf.put( ( byte ) ( ( byte ) 126 | getMaskByte(mask)));
			buf.put( payloadlengthbytes );
		} else if( sizebytes == 8 ) {
			// marker 127: an 8-byte length follows
			buf.put( ( byte ) ( ( byte ) 127 | getMaskByte(mask)));
			buf.put( payloadlengthbytes );
		} else {
			throw new IllegalStateException("Size representation not supported/specified");
		}
		if( mask ) {
			// a fresh random key per frame; every payload byte is XOR-ed with key[i % 4]
			ByteBuffer maskkey = ByteBuffer.allocate( 4 );
			maskkey.putInt( reuseableRandom.nextInt() );
			buf.put( maskkey.array() );
			// NOTE(review): this loop consumes mes and, unlike the unmasked branch, does not reset it
			for( int i = 0; mes.hasRemaining(); i++ ) {
				buf.put( ( byte ) ( mes.get() ^ maskkey.get( i % 4 ) ) );
			}
		} else {
			buf.put( mes );
			//Reset the position of the bytebuffer e.g. for additional use
			mes.flip();
		}
		assert ( buf.remaining() == 0 ) : buf.remaining();
		buf.flip();
		return buf;
	}

	/**
	 * Decodes exactly one frame starting at the buffer's current position and advances the
	 * position past it.
	 *
	 * @param buffer the received bytes
	 * @return the decoded frame, after validation and decoding by the active extension
	 * @throws IncompleteException if the buffer does not yet hold the whole frame; carries the size needed
	 * @throws InvalidDataException if the frame violates the protocol or the configured size limit
	 * @throws IllegalArgumentException if {@code buffer} is null
	 */
	private Framedata translateSingleFrame( ByteBuffer buffer ) throws IncompleteException, InvalidDataException {
		if (buffer == null)
			throw new IllegalArgumentException();
		int maxpacketsize = buffer.remaining();
		// every frame has at least the two header bytes
		int realpacketsize = 2;
		translateSingleFrameCheckPacketSize(maxpacketsize, realpacketsize);
		byte b1 = buffer.get( /*0*/ );
		// b1 is sign-extended to int before shifting, so the result is non-zero exactly when the top (FIN) bit is set
		boolean fin = b1 >> 8 != 0;
		boolean rsv1 = ( b1 & 0x40 ) != 0;
		boolean rsv2 = ( b1 & 0x20 ) != 0;
		boolean rsv3 = ( b1 & 0x10 ) != 0;
		byte b2 = buffer.get( /*1*/ );
		// top bit of the second byte is the mask flag, the lower seven bits are the length (or a 126/127 marker)
		boolean mask = ( b2 & -128 ) != 0;
		int payloadlength = ( byte ) ( b2 & ~( byte ) 128 );
		Opcode optcode = toOpcode( ( byte ) ( b1 & 15 ) );

		if( !( payloadlength >= 0 && payloadlength <= 125 ) ) {
			// extended length: read the additional length bytes
			TranslatedPayloadMetaData payloadData = translateSingleFramePayloadLength(buffer, optcode, payloadlength ,maxpacketsize, realpacketsize);
			payloadlength = payloadData.getPayloadLength();
			realpacketsize = payloadData.getRealPackageSize();
		}
		translateSingleFrameCheckLengthLimit(payloadlength);
		realpacketsize += ( mask ? 4 : 0 );
		realpacketsize += payloadlength;
		// the whole frame (header, optional mask key, payload) must already be in the buffer
		translateSingleFrameCheckPacketSize(maxpacketsize, realpacketsize);

		ByteBuffer payload = ByteBuffer.allocate( checkAlloc( payloadlength ) );
		if( mask ) {
			byte[] maskskey = new byte[4];
			buffer.get( maskskey );
			// unmask: XOR every payload byte with key[i % 4]
			for( int i = 0; i < payloadlength; i++ ) {
				payload.put( ( byte ) ( buffer.get( /*payloadstart + i*/ ) ^ maskskey[i % 4] ) );
			}
		} else {
			// NOTE(review): reads through buffer.array() at buffer.position(); assumes an array-backed buffer with no array offset — confirm
			payload.put( buffer.array(), buffer.position(), payload.limit() );
			buffer.position( buffer.position() + payload.limit() );
		}

		FramedataImpl1 frame = FramedataImpl1.get( optcode );
		frame.setFin( fin );
		frame.setRSV1( rsv1 );
		frame.setRSV2( rsv2 );
		frame.setRSV3( rsv3 );
		payload.flip();
		frame.setPayload( payload );
		// the extension validates the frame first and then decodes it
		getExtension().isFrameValid(frame);
		getExtension().decodeFrame(frame);
		if (log.isTraceEnabled())
			log.trace( "afterDecoding({}): {}", frame.getPayloadData().remaining(), ( frame.getPayloadData().remaining() > 1000 ? "too big to display" : new String( frame.getPayloadData().array() ) ) );
		//frame.isValid();
		return frame;
	}

    /**
     * Reads the extended payload length that follows a length marker of 126 (2 more bytes)
     * or any other out-of-range marker, i.e. 127 (8 more bytes).
     * Control frames (PING, PONG, CLOSING) are rejected because they may not use an extended length.
     *
     * @param buffer the buffer to read from, positioned directly after the two header bytes
     * @param optcode the decoded optcode
     * @param oldPayloadlength the length marker taken from the second header byte
     * @param maxpacketsize the number of bytes that were available in the buffer
     * @param oldRealpacketsize the frame size counted so far
     * @return the new payload data containing new payload length and new packet size
     * @throws InvalidFrameException thrown if a control frame has an invalid length
     * @throws IncompleteException if the maxpacketsize is smaller than the realpackagesize
     * @throws LimitExceededException if the payload length is to big
     */
    private TranslatedPayloadMetaData translateSingleFramePayloadLength(ByteBuffer buffer, Opcode optcode, int oldPayloadlength, int maxpacketsize, int oldRealpacketsize) throws InvalidFrameException, IncompleteException, LimitExceededException {
        int payloadlength = oldPayloadlength,
				realpacketsize = oldRealpacketsize;
    	if( optcode == Opcode.PING || optcode == Opcode.PONG || optcode == Opcode.CLOSING ) {
            log.trace( "Invalid frame: more than 125 octets" );
            throw new InvalidFrameException( "more than 125 octets" );
        }
        if( payloadlength == 126 ) {
            realpacketsize += 2; // additional length bytes
            translateSingleFrameCheckPacketSize(maxpacketsize, realpacketsize);
            // three bytes with a leading zero so that BigInteger reads the 16-bit length as non-negative
            byte[] sizebytes = new byte[3];
            sizebytes[1] = buffer.get( /*1 + 1*/ );
            sizebytes[2] = buffer.get( /*1 + 2*/ );
            payloadlength = new BigInteger( sizebytes ).intValue();
        } else {
            realpacketsize += 8; // additional length bytes
            translateSingleFrameCheckPacketSize(maxpacketsize, realpacketsize);
            byte[] bytes = new byte[8];
            for( int i = 0; i < 8; i++ ) {
                bytes[i] = buffer.get( /*1 + i*/ );
            }
            // BigInteger reads the 8 bytes as signed; the limit check runs before narrowing to int
            long length = new BigInteger( bytes ).longValue();
            translateSingleFrameCheckLengthLimit(length);
            payloadlength = ( int ) length;
        }
        return new TranslatedPayloadMetaData(payloadlength, realpacketsize);
    }

    /**
	 * Rejects payload lengths that do not fit into an int, exceed the configured maximum
	 * frame size, or are negative.
	 *
	 * @param length the current payload length
	 * @throws LimitExceededException if the payload length is outside the allowed range
	 */
	private void translateSingleFrameCheckLengthLimit(long length) throws LimitExceededException {
		if( length > Integer.MAX_VALUE ) {
			log.trace("Limit exedeed: Payloadsize is to big...");
			throw new LimitExceededException("Payloadsize is to big...");
		} else if( length > maxFrameSize ) {
			log.trace( "Payload limit reached. Allowed: {} Current: {}" , maxFrameSize, length);
			throw new LimitExceededException( "Payload limit reached.", maxFrameSize );
		} else if( length < 0 ) {
			log.trace("Limit underflow: Payloadsize is to little...");
			throw new LimitExceededException("Payloadsize is to little...");
		}
	}

	/**
	 * Verifies that the bytes available are enough for the frame size computed so far.
	 *
	 * @param maxpacketsize the number of bytes available
	 * @param realpacketsize the number of bytes the frame needs
	 * @throws IncompleteException if fewer bytes are available than needed; carries the needed size
	 */
	private void translateSingleFrameCheckPacketSize(int maxpacketsize, int realpacketsize) throws IncompleteException {
		if( realpacketsize <= maxpacketsize ) {
			return;
		}
		log.trace( "Incomplete frame: maxpacketsize < realpacketsize" );
		throw new IncompleteException( realpacketsize );
	}

	/**
	 * Returns the bit mask of one RSV flag inside the first header byte, ready to be OR-ed in.
	 * <pre>
	 *  0 1 2 3 4 5 6 7
	 * +-+-+-+-+-------+
	 * |F|R|R|R| opcode|
	 * |I|S|S|S|  (4)  |
	 * |N|V|V|V|       |
	 * | |1|2|3|       |
	 * </pre>
	 *
	 * @param rsv the RSV flag number: 1, 2 or 3
	 * @return 0x40, 0x20 or 0x10 for RSV1, RSV2 or RSV3; 0 for any other value
	 */
	private byte getRSVByte(int rsv){
		switch( rsv ) {
			case 1: // 0100 0000
				return 0x40;
			case 2: // 0010 0000
				return 0x20;
			case 3: // 0001 0000
				return 0x10;
			default:
				return 0;
		}
	}

	/**
	 * Returns the mask flag as it appears in the second header byte.
	 *
	 * @param mask whether the payload is masked
	 * @return -128 (only the top bit set) when masked, otherwise 0
	 */
	private byte getMaskByte(boolean mask) {
		if( mask ) {
			return ( byte ) 0x80;
		}
		return ( byte ) 0;
	}

	/**
	 * Determines how many bytes are needed to encode the payload length of a buffer.
	 *
	 * @param mes the payload buffer; only its remaining byte count is used
	 * @return 1 for up to 125 bytes, 2 for up to 65535 bytes, otherwise 8
	 */
	private int getSizeBytes(ByteBuffer mes) {
		final int payloadSize = mes.remaining();
		if( payloadSize > 65535 ) {
			return 8;
		}
		return payloadSize > 125 ? 2 : 1;
	}

	/**
	 * Decodes every complete frame contained in the received bytes. A frame that is only
	 * partially present is kept in {@code incompleteframe} and completed by later calls.
	 *
	 * @param buffer the bytes received from the connection; consumed by this call
	 * @return the decoded frames in order; empty if the bytes did not complete a pending frame
	 * @throws InvalidDataException if a frame violates the protocol
	 */
	@Override
	public List<Framedata> translateFrame( ByteBuffer buffer ) throws InvalidDataException {
		// the loop only repeats after a pending frame's buffer had to be enlarged (see the continue below)
		while( true ) {
			List<Framedata> frames = new LinkedList<Framedata>();
			Framedata cur;
			if( incompleteframe != null ) {
				// complete an incomplete frame
				try {
					buffer.mark();
					int availableNextByteCount = buffer.remaining();// The number of bytes received
					int expectedNextByteCount = incompleteframe.remaining();// The number of bytes to complete the incomplete frame

					if( expectedNextByteCount > availableNextByteCount ) {
						// did not receive enough bytes to complete the frame
						incompleteframe.put( buffer.array(), buffer.position(), availableNextByteCount );
						buffer.position( buffer.position() + availableNextByteCount );
						return Collections.emptyList();
					}
					incompleteframe.put( buffer.array(), buffer.position(), expectedNextByteCount );
					buffer.position( buffer.position() + expectedNextByteCount );
					// parse from the start of the pending buffer through a duplicate, leaving the pending buffer's own position untouched
					cur = translateSingleFrame( ( ByteBuffer ) incompleteframe.duplicate().position( 0 ) );
					frames.add( cur );
					incompleteframe = null;
				} catch ( IncompleteException e ) {
					// extending as much as suggested
					ByteBuffer extendedframe = ByteBuffer.allocate( checkAlloc( e.getPreferredSize() ) );
					assert ( extendedframe.limit() > incompleteframe.limit() );
					// copy what was collected so far into the larger buffer and try again
					incompleteframe.rewind();
					extendedframe.put( incompleteframe );
					incompleteframe = extendedframe;
					continue;
				}
			}

			while( buffer.hasRemaining() ) {// Read as much as possible full frames
				// remember where this frame starts so a partial frame can be re-read from its first byte
				buffer.mark();
				try {
					cur = translateSingleFrame( buffer );
					frames.add( cur );
				} catch ( IncompleteException e ) {
					// remember the incomplete data
					buffer.reset();
					int pref = e.getPreferredSize();
					incompleteframe = ByteBuffer.allocate( checkAlloc( pref ) );
					incompleteframe.put( buffer );
					break;
				}
			}
			return frames;
		}
	}

	/**
	 * Wrap binary data into a single binary frame.
	 *
	 * @param binary the payload of the frame
	 * @param mask whether the frame is flagged as transferred masked
	 * @return a list containing exactly the created frame
	 * @throws NotSendableException if the created frame fails its validity check
	 */
	@Override
	public List<Framedata> createFrames( ByteBuffer binary, boolean mask ) {
		BinaryFrame binaryFrame = new BinaryFrame();
		binaryFrame.setPayload( binary );
		binaryFrame.setTransferemasked( mask );
		try {
			binaryFrame.isValid();
		} catch ( InvalidDataException invalid ) {
			// An invalid frame must never reach the wire
			throw new NotSendableException( invalid );
		}
		return Collections.<Framedata>singletonList( binaryFrame );
	}

	/**
	 * Wrap a text into a single text frame; the text is converted with {@code Charsetfunctions.utf8Bytes}.
	 *
	 * @param text the text to send
	 * @param mask whether the frame is flagged as transferred masked
	 * @return a list containing exactly the created frame
	 * @throws NotSendableException if the created frame fails its validity check
	 */
	@Override
	public List<Framedata> createFrames( String text, boolean mask ) {
		TextFrame textFrame = new TextFrame();
		byte[] encodedText = Charsetfunctions.utf8Bytes( text );
		textFrame.setPayload( ByteBuffer.wrap( encodedText ) );
		textFrame.setTransferemasked( mask );
		try {
			textFrame.isValid();
		} catch ( InvalidDataException invalid ) {
			// An invalid frame must never reach the wire
			throw new NotSendableException( invalid );
		}
		return Collections.<Framedata>singletonList( textFrame );
	}

	/**
	 * Bring this draft back to its initial state: the pending incomplete frame and the
	 * protocol are dropped and the extension is replaced by a fresh {@code DefaultExtension}.
	 */
	@Override
	public void reset() {
		// Forget any partially received frame
		this.incompleteframe = null;
		// Give the current extension the chance to reset itself before it is replaced
		if( this.extension != null ) {
			this.extension.reset();
		}
		this.extension = new DefaultExtension();
		this.protocol = null;
	}

	/**
	 * Format the current time for use in a date header.
	 * The pattern is {@code EEE, dd MMM yyyy HH:mm:ss z} with US locale and GMT time zone.
	 *
	 * @return the current time as formatted text
	 */
	private String getServerTime() {
		final Calendar now = Calendar.getInstance();
		final SimpleDateFormat headerFormat = new SimpleDateFormat( "EEE, dd MMM yyyy HH:mm:ss z", Locale.US );
		final TimeZone gmt = TimeZone.getTimeZone( "GMT" );
		headerFormat.setTimeZone( gmt );
		return headerFormat.format( now.getTime() );
	}

	/**
	 * Derive the final handshake key from an input key: the trimmed input is concatenated with
	 * the fixed GUID {@code 258EAFA5-E914-47DA-95CA-C5AB0DC85B11}, hashed with SHA-1 and
	 * Base64 encoded.
	 *
	 * @param in the input key; surrounding whitespace is ignored
	 * @return the Base64 encoded SHA-1 digest of the trimmed key plus GUID
	 * @throws IllegalStateException if the runtime does not provide a SHA-1 implementation
	 */
	private String generateFinalKey( String in ) {
		String seckey = in.trim();
		String acc = seckey + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
		MessageDigest sh1;
		try {
			sh1 = MessageDigest.getInstance( "SHA1" );
		} catch ( NoSuchAlgorithmException e ) {
			throw new IllegalStateException( e );
		}
		// Encode with a fixed charset: the platform default may not be ASCII compatible,
		// which would change the digest and therefore the resulting key.
		byte[] accBytes = acc.getBytes( java.nio.charset.StandardCharsets.UTF_8 );
		return Base64.encodeBytes( sh1.digest( accBytes ) );
	}

	/**
	 * Write the lowest {@code bytecount} bytes of a value into a new array, most significant byte first.
	 *
	 * @param val the value to convert
	 * @param bytecount the length of the resulting array
	 * @return the big-endian byte representation
	 */
	private byte[] toByteArray( long val, int bytecount ) {
		byte[] bytes = new byte[bytecount];
		// Start with the shift for the most significant byte and move down one byte per step
		int shift = 8 * bytecount - 8;
		for( int index = 0; index < bytecount; index++, shift -= 8 ) {
			bytes[index] = ( byte ) ( val >>> shift );
		}
		return bytes;
	}


	/**
	 * Map an opcode to its numeric value.
	 *
	 * @param opcode the opcode to convert
	 * @return 0 (CONTINUOUS), 1 (TEXT), 2 (BINARY), 8 (CLOSING), 9 (PING) or 10 (PONG)
	 * @throws IllegalArgumentException if the opcode is none of the handled values
	 */
	private byte fromOpcode( Opcode opcode ) {
		if( opcode == Opcode.CONTINUOUS ) {
			return 0;
		}
		if( opcode == Opcode.TEXT ) {
			return 1;
		}
		if( opcode == Opcode.BINARY ) {
			return 2;
		}
		if( opcode == Opcode.CLOSING ) {
			return 8;
		}
		if( opcode == Opcode.PING ) {
			return 9;
		}
		if( opcode == Opcode.PONG ) {
			return 10;
		}
		throw new IllegalArgumentException( "Don't know how to handle " + opcode.toString() );
	}

	/**
	 * Map a numeric opcode value to its {@code Opcode}.
	 *
	 * @param opcode the numeric value to convert
	 * @return the matching opcode for the values 0, 1, 2, 8, 9 and 10
	 * @throws InvalidFrameException for every other value (3-7 and 11-15 are not yet defined)
	 */
	private Opcode toOpcode( byte opcode ) throws InvalidFrameException {
		if( opcode == 0 ) {
			return Opcode.CONTINUOUS;
		}
		if( opcode == 1 ) {
			return Opcode.TEXT;
		}
		if( opcode == 2 ) {
			return Opcode.BINARY;
		}
		if( opcode == 8 ) {
			return Opcode.CLOSING;
		}
		if( opcode == 9 ) {
			return Opcode.PING;
		}
		if( opcode == 10 ) {
			return Opcode.PONG;
		}
		throw new InvalidFrameException( "Unknown opcode " + ( short ) opcode );
	}

	/**
	 * Dispatch a received frame according to its opcode and fin flag.
	 *
	 * @param webSocketImpl the websocket implementation the frame was received on
	 * @param frame the received frame
	 * @throws InvalidDataException if the frame violates the expected frame sequence
	 */
	@Override
	public void processFrame( WebSocketImpl webSocketImpl, Framedata frame ) throws InvalidDataException {
		final Opcode opcode = frame.getOpcode();
		// Closing, ping and pong frames are handled regardless of a running fragment sequence
		if( opcode == Opcode.CLOSING ) {
			processFrameClosing(webSocketImpl, frame);
			return;
		}
		if( opcode == Opcode.PING ) {
			webSocketImpl.getWebSocketListener().onWebsocketPing( webSocketImpl, frame );
			return;
		}
		if( opcode == Opcode.PONG ) {
			webSocketImpl.updateLastPong();
			webSocketImpl.getWebSocketListener().onWebsocketPong( webSocketImpl, frame );
			return;
		}
		// Everything that is part of a fragmented message
		if( !frame.isFin() || opcode == Opcode.CONTINUOUS ) {
			processFrameContinuousAndNonFin(webSocketImpl, frame, opcode);
			return;
		}
		// A complete frame must not arrive while a fragment sequence is still open
		if( currentContinuousFrame != null ) {
			log.error( "Protocol error: Continuous frame sequence not completed." );
			throw new InvalidDataException( CloseFrame.PROTOCOL_ERROR, "Continuous frame sequence not completed." );
		}
		if( opcode == Opcode.TEXT ) {
			processFrameText(webSocketImpl, frame);
			return;
		}
		if( opcode == Opcode.BINARY ) {
			processFrameBinary(webSocketImpl, frame);
			return;
		}
		log.error( "non control or continious frame expected");
		throw new InvalidDataException( CloseFrame.PROTOCOL_ERROR, "non control or continious frame expected" );
	}

    /**
     * Handle a frame that belongs to a fragmented message: either its fin bit is not set
     * or it is a continuous frame.
     * @param webSocketImpl the websocket implementation to use
     * @param frame the current frame
     * @param curop the opcode of the current frame
     * @throws InvalidDataException if there is a protocol error
     */
    private void processFrameContinuousAndNonFin(WebSocketImpl webSocketImpl, Framedata frame, Opcode curop) throws InvalidDataException {
        final boolean continuation = ( curop == Opcode.CONTINUOUS );
        if( !continuation ) {
            // First fragment of a new sequence
            processFrameIsNotFin(frame);
        } else if( frame.isFin() ) {
            // Last fragment of the running sequence
            processFrameIsFin(webSocketImpl, frame);
        } else if( currentContinuousFrame == null ) {
            log.error( "Protocol error: Continuous frame sequence was not started." );
            throw new InvalidDataException( CloseFrame.PROTOCOL_ERROR, "Continuous frame sequence was not started." );
        }
        // A text frame has to carry valid UTF8
        if( curop == Opcode.TEXT ) {
            if( !Charsetfunctions.isValidUTF8( frame.getPayloadData() ) ) {
                log.error( "Protocol error: Payload is not UTF8" );
                throw new InvalidDataException( CloseFrame.NO_UTF8 );
            }
        }
        // Collect the payload of a continuous frame while a sequence is still open
        if( continuation && currentContinuousFrame != null ) {
            addToBufferList(frame.getPayloadData());
        }
    }

    /**
	 * Hand the payload of a binary frame to the websocket listener.
	 * A runtime exception raised while doing so is passed to {@code logRuntimeException}.
	 * @param webSocketImpl the websocket impl
	 * @param frame the binary frame
	 */
	private void processFrameBinary(WebSocketImpl webSocketImpl, Framedata frame) {
		try {
			webSocketImpl.getWebSocketListener().onWebsocketMessage( webSocketImpl, frame.getPayloadData() );
		} catch ( RuntimeException listenerFailure ) {
			// Do not let a failing listener break frame processing
			logRuntimeException(webSocketImpl, listenerFailure);
		}
	}

	/**
	 * Log a runtime exception and report it to the listener of the given WebSocketImpl
	 * through {@code onWebsocketError}.
	 * @param webSocketImpl the implementation of the websocket whose listener is notified
	 * @param e the runtime exception
	 */
	private void logRuntimeException(WebSocketImpl webSocketImpl, RuntimeException e) {
		log.error( "Runtime exception during onWebsocketMessage", e );
		webSocketImpl.getWebSocketListener().onWebsocketError( webSocketImpl, e );
	}

	/**
	 * Hand the payload of a text frame, converted with {@code Charsetfunctions.stringUtf8},
	 * to the websocket listener.
	 * A runtime exception raised while doing so is passed to {@code logRuntimeException}.
	 * @param webSocketImpl the websocket impl
	 * @param frame the text frame
	 * @throws InvalidDataException declared for the payload conversion
	 */
	private void processFrameText(WebSocketImpl webSocketImpl, Framedata frame) throws InvalidDataException {
		try {
			webSocketImpl.getWebSocketListener().onWebsocketMessage( webSocketImpl, Charsetfunctions.stringUtf8( frame.getPayloadData() ) );
		} catch ( RuntimeException listenerFailure ) {
			// Do not let a failing listener break frame processing
			logRuntimeException(webSocketImpl, listenerFailure);
		}
	}

	/**
	 * Process the last frame of a continuous frame sequence: the payload is appended to the
	 * collected payloads, the size limit is checked and, for a TEXT or BINARY sequence, the
	 * combined payload is handed to the websocket listener. Afterwards the sequence state is cleared.
	 * @param webSocketImpl the websocket impl
	 * @param frame the last frame of the sequence
	 * @throws InvalidDataException if no sequence was started, the limit is exceeded or the combined frame is invalid
	 */
	private void processFrameIsFin(WebSocketImpl webSocketImpl, Framedata frame) throws InvalidDataException {
		if( currentContinuousFrame == null ) {
			// The log text now matches the exception: no sequence was started at all
			log.trace( "Protocol error: Continuous frame sequence was not started." );
			throw new InvalidDataException( CloseFrame.PROTOCOL_ERROR, "Continuous frame sequence was not started." );
		}
		addToBufferList(frame.getPayloadData());
		checkBufferLimit();
		Opcode sequenceOpcode = currentContinuousFrame.getOpcode();
		if( sequenceOpcode == Opcode.TEXT || sequenceOpcode == Opcode.BINARY ) {
			// Replace the payload of the first frame with the payload of the whole sequence
			FramedataImpl1 assembledFrame = (FramedataImpl1) currentContinuousFrame;
			assembledFrame.setPayload( getPayloadFromByteBufferList() );
			assembledFrame.isValid();
			try {
				if( sequenceOpcode == Opcode.TEXT ) {
					webSocketImpl.getWebSocketListener().onWebsocketMessage( webSocketImpl, Charsetfunctions.stringUtf8( currentContinuousFrame.getPayloadData() ) );
				} else {
					webSocketImpl.getWebSocketListener().onWebsocketMessage( webSocketImpl, currentContinuousFrame.getPayloadData() );
				}
			} catch ( RuntimeException e ) {
				logRuntimeException(webSocketImpl, e);
			}
		}
		// The sequence is finished; forget its state
		currentContinuousFrame = null;
		clearBufferList();
	}

	/**
	 * Process a frame whose fin bit is not set and which starts a new continuous frame sequence.
	 * The frame is remembered as the current continuous frame and its payload is collected.
	 * @param frame the first frame of the sequence
	 * @throws InvalidDataException if a sequence is already running or the size limit is exceeded
	 */
	private void processFrameIsNotFin(Framedata frame) throws InvalidDataException {
		final boolean sequenceAlreadyRunning = currentContinuousFrame != null;
		if( sequenceAlreadyRunning ) {
			log.trace( "Protocol error: Previous continuous frame sequence not completed." );
			throw new InvalidDataException( CloseFrame.PROTOCOL_ERROR, "Previous continuous frame sequence not completed." );
		}
		// Remember the first frame; it defines the opcode of the whole sequence
		this.currentContinuousFrame = frame;
		addToBufferList(frame.getPayloadData());
		checkBufferLimit();
	}

	/**
	 * Process a closing frame: either finish a close handshake that is already in progress
	 * or answer the close request of the other endpoint.
	 * @param webSocketImpl the websocket impl
	 * @param frame the closing frame
	 */
	private void processFrameClosing(WebSocketImpl webSocketImpl, Framedata frame) {
		// Defaults for a frame that carries no close information
		int closeCode = CloseFrame.NOCODE;
		String closeReason = "";
		if( frame instanceof CloseFrame ) {
			CloseFrame closeFrame = ( CloseFrame ) frame;
			closeCode = closeFrame.getCloseCode();
			closeReason = closeFrame.getMessage();
		}
		if( webSocketImpl.getReadyState() == ReadyState.CLOSING ) {
			// complete the close handshake by disconnecting
			webSocketImpl.closeConnection( closeCode, closeReason, true );
			return;
		}
		// echo close handshake
		if( getCloseHandshakeType() == CloseHandshakeType.TWOWAY ) {
			webSocketImpl.close( closeCode, closeReason, true );
		} else {
			webSocketImpl.flushAndClose( closeCode, closeReason, false );
		}
	}

	/**
	 * Remove all collected payloads from the bytebuffer list.
	 * The list itself is used as the lock while it is cleared.
	 */
	private void clearBufferList() {
		synchronized (byteBufferList) {
			byteBufferList.clear();
		}
	}

	/**
	 * Append a payload to the bytebuffer list.
	 * The list itself is used as the lock while the payload is added.
	 * @param payloadData the payload to append
	 */
	private void addToBufferList(ByteBuffer payloadData) {
		synchronized (byteBufferList) {
			byteBufferList.add(payloadData);
		}
	}

	/**
	 * Verify that the collected payloads do not exceed the max allowed frame size.
	 * When the limit is exceeded the collected payloads are discarded before the exception is thrown.
	 * @throws LimitExceededException if the current size is bigger than the allowed size
	 */
	private void checkBufferLimit() throws LimitExceededException {
		final long collectedSize = getByteBufferListSize();
		if( collectedSize <= maxFrameSize ) {
			return;
		}
		clearBufferList();
		log.trace("Payload limit reached. Allowed: {} Current: {}", maxFrameSize, collectedSize);
		throw new LimitExceededException(maxFrameSize);
	}

	/**
	 * Report the close handshake type of this draft.
	 * @return always {@code CloseHandshakeType.TWOWAY}
	 */
	@Override
	public CloseHandshakeType getCloseHandshakeType() {
		return CloseHandshakeType.TWOWAY;
	}

	/**
	 * Describe this draft: the text of the superclass followed by the extension and the
	 * protocol (each only when present) and the max frame size.
	 * @return the textual description
	 */
	@Override
	public String toString() {
		StringBuilder description = new StringBuilder();
		description.append( super.toString() );
		if( getExtension() != null ) {
			description.append( " extension: " ).append( getExtension().toString() );
		}
		if( getProtocol() != null ) {
			description.append( " protocol: " ).append( getProtocol().toString() );
		}
		description.append( " max frame size: " ).append( this.maxFrameSize );
		return description.toString();
	}

	/**
	 * Two drafts are equal when they are of the same class and agree in max frame size,
	 * extension and protocol.
	 * @param o the object to compare with
	 * @return true if both objects are equal
	 */
	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null) {
			return false;
		}
		if (getClass() != o.getClass()) {
			return false;
		}
		Draft_6455 other = (Draft_6455) o;
		if (maxFrameSize != other.getMaxFrameSize()) {
			return false;
		}
		// Extensions: both absent, or equal
		if (extension == null) {
			if (other.getExtension() != null) {
				return false;
			}
		} else if (!extension.equals(other.getExtension())) {
			return false;
		}
		// Protocols: both absent, or equal
		if (protocol == null) {
			return other.getProtocol() == null;
		}
		return protocol.equals(other.getProtocol());
	}

	/**
	 * Compute a hash code from the fields compared by {@link #equals(Object)}:
	 * extension, protocol and max frame size.
	 * @return the hash code
	 */
	@Override
	public int hashCode() {
		int result = extension != null ? extension.hashCode() : 0;
		result = 31 * result + (protocol != null ? protocol.hashCode() : 0);
		// The former term (maxFrameSize ^ (maxFrameSize >>> 32)) was always 0: the shift distance
		// of an int is reduced modulo 32, so the value was XORed with itself and never
		// contributed to the hash. Use the int value directly.
		result = 31 * result + maxFrameSize;
		return result;
	}

	/**
	 * Combine the payloads of all collected fragments into one bytebuffer.
	 * The buffer limit is checked before the combined buffer is allocated.
	 * @return a bytebuffer containing all the data, ready to be read from its start
	 * @throws LimitExceededException if the collected payloads exceed the allowed size
	 */
	private ByteBuffer getPayloadFromByteBufferList() throws LimitExceededException {
		ByteBuffer combined;
		synchronized (byteBufferList) {
			long combinedSize = 0;
			for (ByteBuffer fragment : byteBufferList) {
				combinedSize += fragment.limit();
			}
			checkBufferLimit();
			combined = ByteBuffer.allocate( (int) combinedSize );
			for (ByteBuffer fragment : byteBufferList) {
				combined.put( fragment );
			}
		}
		// Switch the combined buffer from writing to reading
		combined.flip();
		return combined;
	}

	/**
	 * Sum up the limits of all bytebuffers currently held in the bytebuffer list.
	 * @return the summed size as long (to not get an integer overflow)
	 */
	private long getByteBufferListSize() {
		synchronized (byteBufferList) {
			long summedSize = 0;
			for (ByteBuffer fragment : byteBufferList) {
				summedSize += fragment.limit();
			}
			return summedSize;
		}
	}

	private class TranslatedPayloadMetaData {
		private int payloadLength;
		private int realPackageSize;

		private int getPayloadLength() {
			return payloadLength;
		}

		private int getRealPackageSize() {
			return realPackageSize;
		}

		TranslatedPayloadMetaData(int newPayloadLength, int newRealPackageSize) {
			this.payloadLength = newPayloadLength;
			this.realPackageSize = newRealPackageSize;
		}
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372
  • 373
  • 374
  • 375
  • 376
  • 377
  • 378
  • 379
  • 380
  • 381
  • 382
  • 383
  • 384
  • 385
  • 386
  • 387
  • 388
  • 389
  • 390
  • 391
  • 392
  • 393
  • 394
  • 395
  • 396
  • 397
  • 398
  • 399
  • 400
  • 401
  • 402
  • 403
  • 404
  • 405
  • 406
  • 407
  • 408
  • 409
  • 410
  • 411
  • 412
  • 413
  • 414
  • 415
  • 416
  • 417
  • 418
  • 419
  • 420
  • 421
  • 422
  • 423
  • 424
  • 425
  • 426
  • 427
  • 428
  • 429
  • 430
  • 431
  • 432
  • 433
  • 434
  • 435
  • 436
  • 437
  • 438
  • 439
  • 440
  • 441
  • 442
  • 443
  • 444
  • 445
  • 446
  • 447
  • 448
  • 449
  • 450
  • 451
  • 452
  • 453
  • 454
  • 455
  • 456
  • 457
  • 458
  • 459
  • 460
  • 461
  • 462
  • 463
  • 464
  • 465
  • 466
  • 467
  • 468
  • 469
  • 470
  • 471
  • 472
  • 473
  • 474
  • 475
  • 476
  • 477
  • 478
  • 479
  • 480
  • 481
  • 482
  • 483
  • 484
  • 485
  • 486
  • 487
  • 488
  • 489
  • 490
  • 491
  • 492
  • 493
  • 494
  • 495
  • 496
  • 497
  • 498
  • 499
  • 500
  • 501
  • 502
  • 503
  • 504
  • 505
  • 506
  • 507
  • 508
  • 509
  • 510
  • 511
  • 512
  • 513
  • 514
  • 515
  • 516
  • 517
  • 518
  • 519
  • 520
  • 521
  • 522
  • 523
  • 524
  • 525
  • 526
  • 527
  • 528
  • 529
  • 530
  • 531
  • 532
  • 533
  • 534
  • 535
  • 536
  • 537
  • 538
  • 539
  • 540
  • 541
  • 542
  • 543
  • 544
  • 545
  • 546
  • 547
  • 548
  • 549
  • 550
  • 551
  • 552
  • 553
  • 554
  • 555
  • 556
  • 557
  • 558
  • 559
  • 560
  • 561
  • 562
  • 563
  • 564
  • 565
  • 566
  • 567
  • 568
  • 569
  • 570
  • 571
  • 572
  • 573
  • 574
  • 575
  • 576
  • 577
  • 578
  • 579
  • 580
  • 581
  • 582
  • 583
  • 584
  • 585
  • 586
  • 587
  • 588
  • 589
  • 590
  • 591
  • 592
  • 593
  • 594
  • 595
  • 596
  • 597
  • 598
  • 599
  • 600
  • 601
  • 602
  • 603
  • 604
  • 605
  • 606
  • 607
  • 608
  • 609
  • 610
  • 611
  • 612
  • 613
  • 614
  • 615
  • 616
  • 617
  • 618
  • 619
  • 620
  • 621
  • 622
  • 623
  • 624
  • 625
  • 626
  • 627
  • 628
  • 629
  • 630
  • 631
  • 632
  • 633
  • 634
  • 635
  • 636
  • 637
  • 638
  • 639
  • 640
  • 641
  • 642
  • 643
  • 644
  • 645
  • 646
  • 647
  • 648
  • 649
  • 650
  • 651
  • 652
  • 653
  • 654
  • 655
  • 656
  • 657
  • 658
  • 659
  • 660
  • 661
  • 662
  • 663
  • 664
  • 665
  • 666
  • 667
  • 668
  • 669
  • 670
  • 671
  • 672
  • 673
  • 674
  • 675
  • 676
  • 677
  • 678
  • 679
  • 680
  • 681
  • 682
  • 683
  • 684
  • 685
  • 686
  • 687
  • 688
  • 689
  • 690
  • 691
  • 692
  • 693
  • 694
  • 695
  • 696
  • 697
  • 698
  • 699
  • 700
  • 701
  • 702
  • 703
  • 704
  • 705
  • 706
  • 707
  • 708
  • 709
  • 710
  • 711
  • 712
  • 713
  • 714
  • 715
  • 716
  • 717
  • 718
  • 719
  • 720
  • 721
  • 722
  • 723
  • 724
  • 725
  • 726
  • 727
  • 728
  • 729
  • 730
  • 731
  • 732
  • 733
  • 734
  • 735
  • 736
  • 737
  • 738
  • 739
  • 740
  • 741
  • 742
  • 743
  • 744
  • 745
  • 746
  • 747
  • 748
  • 749
  • 750
  • 751
  • 752
  • 753
  • 754
  • 755
  • 756
  • 757
  • 758
  • 759
  • 760
  • 761
  • 762
  • 763
  • 764
  • 765
  • 766
  • 767
  • 768
  • 769
  • 770
  • 771
  • 772
  • 773
  • 774
  • 775
  • 776
  • 777
  • 778
  • 779
  • 780
  • 781
  • 782
  • 783
  • 784
  • 785
  • 786
  • 787
  • 788
  • 789
  • 790
  • 791
  • 792
  • 793
  • 794
  • 795
  • 796
  • 797
  • 798
  • 799
  • 800
  • 801
  • 802
  • 803
  • 804
  • 805
  • 806
  • 807
  • 808
  • 809
  • 810
  • 811
  • 812
  • 813
  • 814
  • 815
  • 816
  • 817
  • 818
  • 819
  • 820
  • 821
  • 822
  • 823
  • 824
  • 825
  • 826
  • 827
  • 828
  • 829
  • 830
  • 831
  • 832
  • 833
  • 834
  • 835
  • 836
  • 837
  • 838
  • 839
  • 840
  • 841
  • 842
  • 843
  • 844
  • 845
  • 846
  • 847
  • 848
  • 849
  • 850
  • 851
  • 852
  • 853
  • 854
  • 855
  • 856
  • 857
  • 858
  • 859
  • 860
  • 861
  • 862
  • 863
  • 864
  • 865
  • 866
  • 867
  • 868
  • 869
  • 870
  • 871
  • 872
  • 873
  • 874
  • 875
  • 876
  • 877
  • 878
  • 879
  • 880
  • 881
  • 882
  • 883
  • 884
  • 885
  • 886
  • 887
  • 888
  • 889
  • 890
  • 891
  • 892
  • 893
  • 894
  • 895
  • 896
  • 897
  • 898
  • 899
  • 900
  • 901
  • 902
  • 903
  • 904
  • 905
  • 906
  • 907
  • 908
  • 909
  • 910
  • 911
  • 912
  • 913
  • 914
  • 915
  • 916
  • 917
  • 918
  • 919
  • 920
  • 921
  • 922
  • 923
  • 924
  • 925
  • 926
  • 927
  • 928
  • 929
  • 930
  • 931
  • 932
  • 933
  • 934
  • 935
  • 936
  • 937
  • 938
  • 939
  • 940
  • 941
  • 942
  • 943
  • 944
  • 945
  • 946
  • 947
  • 948
  • 949
  • 950
  • 951
  • 952
  • 953
  • 954
  • 955
  • 956
  • 957
  • 958
  • 959
  • 960
  • 961
  • 962
  • 963
  • 964
  • 965
  • 966
  • 967
  • 968
  • 969
  • 970
  • 971
  • 972
  • 973
  • 974
  • 975
  • 976
  • 977
  • 978
  • 979
  • 980
  • 981
  • 982
  • 983
  • 984
  • 985
  • 986
  • 987
  • 988
  • 989
  • 990
  • 991
  • 992
  • 993
  • 994
  • 995
  • 996
  • 997
  • 998
  • 999
  • 1000
  • 1001
  • 1002
  • 1003
  • 1004
  • 1005
  • 1006
  • 1007
  • 1008
  • 1009
  • 1010
  • 1011
  • 1012
  • 1013
  • 1014
  • 1015
  • 1016
  • 1017
  • 1018
  • 1019
  • 1020
  • 1021
  • 1022
  • 1023
  • 1024
  • 1025
  • 1026
  • 1027
  • 1028
  • 1029
  • 1030
  • 1031
  • 1032
  • 1033
  • 1034
  • 1035
  • 1036
  • 1037
  • 1038
  • 1039
  • 1040
  • 1041
  • 1042
  • 1043
  • 1044
  • 1045
  • 1046
  • 1047
  • 1048
  • 1049
  • 1050
  • 1051
  • 1052
  • 1053
  • 1054
  • 1055
  • 1056
  • 1057
  • 1058
  • 1059
  • 1060
  • 1061
  • 1062
  • 1063
  • 1064
  • 1065
  • 1066
  • 1067
  • 1068
  • 1069
  • 1070
  • 1071
  • 1072
  • 1073
  • 1074
  • 1075
  • 1076
  • 1077
  • 1078
  • 1079
  • 1080
  • 1081
  • 1082
  • 1083
  • 1084
  • 1085
  • 1086
  • 1087
  • 1088
  • 1089
MyFlink.scala
package cn.sven.bcvs

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import java.io.FileInputStream
import java.util
import java.util.Properties
import scala.collection.convert.ImplicitConversions.`list asScalaBuffer`


object MyFlink {
  /**
   * Builds and runs the "danmuTopN" streaming job.
   *
   * Pipeline: consume raw lines from the Kafka topic "yyf", parse each line into a
   * [[Message]], count messages per user over a sliding event-time window
   * (1 hour long, sliding every 10 minutes), then for every window emit the ten
   * users with the highest message count and print the result.
   *
   * @param args optional command-line arguments; when present, `args(0)` is the path
   *             of the Kafka properties file. When absent, the original hard-coded
   *             development path is used.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Configure the environment BEFORE any operator is created: a source picks up the
    // environment's default parallelism at the moment it is added, so setting it
    // afterwards would leave the Kafka source running with the previous default.
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Kafka consumer settings; the file location can be overridden by the first argument.
    val propsPath = args.headOption
      .getOrElse("D:\\mywork\\bigdata\\src\\main\\resources\\kafka.properties")
    val props = new Properties
    val in = new FileInputStream(propsPath)
    try {
      props.load(in)
    } finally {
      // Always release the file handle, even if loading fails.
      in.close()
    }

    import org.apache.flink.api.scala._
    // Data source: a Kafka consumer reading plain strings from topic "yyf".
    val consumer = new FlinkKafkaConsumer("yyf", new SimpleStringSchema(), props)
    val ds = env.addSource(consumer)

    ds.map(line => {
      // Fields are separated by the control character U+0001
      // (same character the octal escape "\001" denoted; octal escapes are deprecated).
      val arr = line.split("\u0001")
      // Field order: user name, badge, message text, timestamp.
      Message(arr(0), arr(1), arr(2), arr(3).toLong)
    })
      // Multiply by 1000 to get milliseconds; per the original author's note the
      // incoming timestamp is in seconds.
      // NOTE: assignAscendingTimestamps is generally not suitable for production use.
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .keyBy(_.uname)
      // Every 10 minutes, emit results for the most recent hour of data.
      .timeWindow(Time.hours(1), Time.minutes(10))
      // First count the messages per user (word-count style).
      .aggregate(new CountMsg, new WindowMsg)
      .keyBy(_.windowEnd)
      // Then compute the top N per window. A KeyedProcessFunction is used here;
      // a ProcessAllWindowFunction would be an alternative.
      .process(new KeyedProcessFunction[Long, MsgCount, String] {
        // Per-key (per window end) buffer of all user counts seen so far.
        private var itemState: ListState[MsgCount] = _

        // Obtain the state handle once the runtime context is available.
        override def open(parameters: Configuration): Unit = {
          itemState = getRuntimeContext.getListState(new ListStateDescriptor[MsgCount]("item-state",
            classOf[MsgCount]))
        }

        override def processElement(value: MsgCount, ctx: KeyedProcessFunction[Long, MsgCount, String]#Context, out: Collector[String]): Unit = {
          // Buffer every incoming per-user count.
          itemState.add(value)
          // Register an event-time timer 1 ms after the window end; registering the
          // same timestamp repeatedly for one key results in a single firing.
          ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
        }

        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, MsgCount, String]#OnTimerContext, out: Collector[String]): Unit = {
          // Copy everything out of state into a local list.
          val allItems: util.ArrayList[MsgCount] = new util.ArrayList[MsgCount]()
          val iterator = itemState.get().iterator()
          while (iterator.hasNext) {
            allItems.add(iterator.next())
          }
          // Sort by count, descending, and keep the first 10.
          val sortedItems = allItems.sortBy(_.ucount)(Ordering.Int.reverse).take(10)
          // The window is complete; drop its buffered entries.
          itemState.clear()
          // Emit a simple textual rendering of the ranking.
          out.collect(sortedItems.toString())
        }
      })
      .print()

    env.execute("danmuTopN")

  }


  /**
   * Incremental aggregator that counts the elements of a window.
   *
   * Type parameters of `AggregateFunction`:
   *  - IN:  the values being aggregated, here the [[Message]] case class
   *  - ACC: the accumulator (intermediate aggregate state), here an `Int` counter
   *  - OUT: the aggregated result, here the final `Int` count
   */
  class CountMsg extends AggregateFunction[Message, Int, Int] {

    /** A fresh counter starts at zero. */
    override def createAccumulator(): Int = {
      0
    }

    /** Each incoming message increases the counter by one; the message content is ignored. */
    override def add(value: Message, accumulator: Int): Int = {
      val incremented = accumulator + 1
      incremented
    }

    /** The result is the counter itself. */
    override def getResult(accumulator: Int): Int = {
      accumulator
    }

    /** Two partial counters are combined by summing them. */
    override def merge(a: Int, b: Int): Int = {
      val combined = a + b
      combined
    }
  }


  /**
   * Window function that wraps a pre-aggregated count into a [[MsgCount]].
   *
   * Type parameters of `WindowFunction`:
   *  - IN:  the input value, here the message count produced by the aggregator
   *  - OUT: the output value, here the [[MsgCount]] case class
   *  - KEY: the key type of the preceding keyBy, here the user name (`String`)
   *  - W:   the window type, here `TimeWindow`
   */
  class WindowMsg extends WindowFunction[Int, MsgCount, String, TimeWindow] {

    /**
     * Emits one [[MsgCount]] built from the key, the first element of `input`
     * and the end timestamp of `window`.
     */
    override def apply(key: String, window: TimeWindow, input: Iterable[Int], out: Collector[MsgCount]): Unit = {
      val speakCount = input.head
      val windowEnd = window.getEnd
      val record = MsgCount(key, speakCount, windowEnd)
      out.collect(record)
    }
  }

  /**
   * One chat (danmu) message as parsed from the input stream.
   *
   * @param uname     user name
   * @param uowner    fan badge
   * @param umsg      message text
   * @param timestamp message timestamp
   */
  case class Message(
    uname: String,
    uowner: String,
    umsg: String,
    timestamp: Long
  )

  /**
   * Number of messages one user sent within one window.
   *
   * @param uname     user name
   * @param ucount    number of messages
   * @param windowEnd end timestamp of the window
   */
  case class MsgCount(
    uname: String,
    ucount: Int,
    windowEnd: Long
  )


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138

运行结果:

ArrayBuffer(MsgCount(续2106856,15,1667817000000), MsgCount(改名字能中奖吗rua,11,1667817000000), MsgCount(我是究极大穷逼,8,1667817000000), MsgCount(ForeverYeon,7,1667817000000), MsgCount(猪脚炖花生米,7,1667817000000), MsgCount(醉卧丨美人兮,6,1667817000000), MsgCount(幽古刹千年钟痴人说梦,5,1667817000000), MsgCount(愿大家平安喜乐丶,5,1667817000000), MsgCount(浮生一丝暖暖Aria,5,1667817000000), MsgCount(羽川翼酱,4,1667817000000))
ArrayBuffer(MsgCount(分狂的石头,15,1667817600000), MsgCount(续2106856,15,1667817600000), MsgCount(幽古刹千年钟痴人说梦,13,1667817600000), MsgCount(改名字能中奖吗rua,12,1667817600000), MsgCount(愿大家平安喜乐丶,11,1667817600000), MsgCount(菩提呆,10,1667817600000), MsgCount(雪花凋零之时,10,1667817600000), MsgCount(我是究极大穷逼,9,1667817600000), MsgCount(教授与喷子齐飞,9,1667817600000), MsgCount(团团转圈,9,1667817600000))
ArrayBuffer(MsgCount(分狂的石头,25,1667818200000), MsgCount(幽古刹千年钟痴人说梦,18,1667818200000), MsgCount(我很怂但我不说,16,1667818200000), MsgCount(愿大家平安喜乐丶,15,1667818200000), MsgCount(续2106856,15,1667818200000), MsgCount(斋森美世,15,1667818200000), MsgCount(菩提呆,13,1667818200000), MsgCount(GoldIsSilence,13,1667818200000), MsgCount(z急了急了急了急了,13,1667818200000), MsgCount(改名字能中奖吗rua,12,1667818200000))
ArrayBuffer(MsgCount(分狂的石头,29,1667818800000), MsgCount(nokalo,22,1667818800000), MsgCount(法国奶爸,21,1667818800000), MsgCount(愿大家平安喜乐丶,19,1667818800000), MsgCount(幽古刹千年钟痴人说梦,18,1667818800000), MsgCount(会打篮球的酷酷的萌萌,18,1667818800000), MsgCount(GoldIsSilence,17,1667818800000), MsgCount(斋森美世,17,1667818800000), MsgCount(飘来荡去犀利强,16,1667818800000), MsgCount(疯青蛙,16,1667818800000))
ArrayBuffer(MsgCount(分狂的石头,35,1667819400000), MsgCount(nokalo,25,1667819400000), MsgCount(法国奶爸,23,1667819400000), MsgCount(愿大家平安喜乐丶,23,1667819400000), MsgCount(斋森美世,21,1667819400000), MsgCount(飘来荡去犀利强,20,1667819400000), MsgCount(丶残忍的慈悲丶,19,1667819400000), MsgCount(暴走的小苹果,19,1667819400000), MsgCount(GoldIsSilence,18,1667819400000), MsgCount(幽古刹千年钟痴人说梦,18,1667819400000))
ArrayBuffer(MsgCount(分狂的石头,45,1667820000000), MsgCount(斋森美世,33,1667820000000), MsgCount(法国奶爸,28,1667820000000), MsgCount(nokalo,28,1667820000000), MsgCount(永恒之光xx,26,1667820000000), MsgCount(愿大家平安喜乐丶,25,1667820000000), MsgCount(歌无峎Like,25,1667820000000), MsgCount(飘来荡去犀利强,24,1667820000000), MsgCount(疯青蛙,24,1667820000000), MsgCount(浮生一丝暖暖Aria,24,1667820000000))
ArrayBuffer(MsgCount(分狂的石头,49,1667820600000), MsgCount(永恒之光xx,35,1667820600000), MsgCount(斋森美世,35,1667820600000), MsgCount(飘来荡去犀利强,30,1667820600000), MsgCount(黄浦江底走一遭,30,1667820600000), MsgCount(法国奶爸,29,1667820600000), MsgCount(歌无峎Like,29,1667820600000), MsgCount(丶残忍的慈悲丶,26,1667820600000), MsgCount(CASIOfx991CN,26,1667820600000), MsgCount(nokalo,25,1667820600000))
ArrayBuffer(MsgCount(永恒之光xx,41,1667821200000), MsgCount(分狂的石头,38,1667821200000), MsgCount(斋森美世,36,1667821200000), MsgCount(飘来荡去犀利强,33,1667821200000), MsgCount(黄浦江底走一遭,30,1667821200000), MsgCount(法国奶爸,29,1667821200000), MsgCount(nokalo,29,1667821200000), MsgCount(CASIOfx991CN,29,1667821200000), MsgCount(008号,27,1667821200000), MsgCount(丶残忍的慈悲丶,26,1667821200000))
ArrayBuffer(MsgCount(永恒之光xx,46,1667821800000), MsgCount(分狂的石头,37,1667821800000), MsgCount(008号,36,1667821800000), MsgCount(毒蘑菇奇诺比奥,33,1667821800000), MsgCount(飘来荡去犀利强,29,1667821800000), MsgCount(黄浦江底走一遭,28,1667821800000), MsgCount(玖窝小神,28,1667821800000), MsgCount(斋森美世,27,1667821800000), MsgCount(小灬撸神,25,1667821800000), MsgCount(nokalo,24,1667821800000))
ArrayBuffer(MsgCount(永恒之光xx,48,1667822400000), MsgCount(008号,39,1667822400000), MsgCount(分狂的石头,36,1667822400000), MsgCount(毒蘑菇奇诺比奥,32,1667822400000), MsgCount(玖窝小神,29,1667822400000), MsgCount(westzizi,29,1667822400000), MsgCount(拾巟,27,1667822400000), MsgCount(ZEUSDJOE,26,1667822400000), MsgCount(飘来荡去犀利强,25,1667822400000), MsgCount(斋森美世,25,1667822400000))
ArrayBuffer(MsgCount(永恒之光xx,43,1667823000000), MsgCount(008号,37,1667823000000), MsgCount(分狂的石头,36,1667823000000), MsgCount(westzizi,33,1667823000000), MsgCount(陈彦川,31,1667823000000), MsgCount(看直播的剑鱼,25,1667823000000), MsgCount(拾巟,25,1667823000000), MsgCount(歌无峎Like,24,1667823000000), MsgCount(毒蘑菇奇诺比奥,24,1667823000000), MsgCount(浮生一丝暖暖Aria,24,1667823000000))
ArrayBuffer(MsgCount(永恒之光xx,39,1667823600000), MsgCount(陈彦川,34,1667823600000), MsgCount(westzizi,31,1667823600000), MsgCount(分狂的石头,30,1667823600000), MsgCount(008号,29,1667823600000), MsgCount(Oeng晴芯,27,1667823600000), MsgCount(看直播的剑鱼,25,1667823600000), MsgCount(林羽青,25,1667823600000), MsgCount(愿大家平安喜乐丶,24,1667823600000), MsgCount(沙漠中的胖头鱼,23,1667823600000))
ArrayBuffer(MsgCount(永恒之光xx,40,1667824200000), MsgCount(陈彦川,34,1667824200000), MsgCount(westzizi,32,1667824200000), MsgCount(分狂的石头,29,1667824200000), MsgCount(沙漠中的胖头鱼,27,1667824200000), MsgCount(Oeng晴芯,27,1667824200000), MsgCount(寅子蛋糕,27,1667824200000), MsgCount(林羽青,27,1667824200000), MsgCount(毒蘑菇奇诺比奥,26,1667824200000), MsgCount(佩达拉,26,1667824200000))
ArrayBuffer(MsgCount(TLoop,51,1667824800000), MsgCount(永恒之光xx,44,1667824800000), MsgCount(寅子蛋糕,38,1667824800000), MsgCount(偷心小贼魔理沙,37,1667824800000), MsgCount(浮生一丝暖暖Aria,36,1667824800000), MsgCount(陈彦川,34,1667824800000), MsgCount(CLian,31,1667824800000), MsgCount(zk1015,30,1667824800000), MsgCount(分狂的石头,28,1667824800000), MsgCount(晨霜攀黛瓦啊,28,1667824800000))
ArrayBuffer(MsgCount(TLoop,52,1667825400000), MsgCount(寅子蛋糕,46,1667825400000), MsgCount(永恒之光xx,43,1667825400000), MsgCount(偷心小贼魔理沙,42,1667825400000), MsgCount(浮生一丝暖暖Aria,42,1667825400000), MsgCount(CLian,40,1667825400000), MsgCount(陈彦川,34,1667825400000), MsgCount(我是究极大穷逼,33,1667825400000), MsgCount(zk1015,32,1667825400000), MsgCount(晨霜攀黛瓦啊,30,1667825400000))
ArrayBuffer(MsgCount(TLoop,63,1667826000000), MsgCount(偷心小贼魔理沙,45,1667826000000), MsgCount(CLian,45,1667826000000), MsgCount(寅子蛋糕,44,1667826000000), MsgCount(浮生一丝暖暖Aria,43,1667826000000), MsgCount(永恒之光xx,41,1667826000000), MsgCount(我是究极大穷逼,35,1667826000000), MsgCount(FangPige,32,1667826000000), MsgCount(zk1015,31,1667826000000), MsgCount(沙漠中的胖头鱼,28,1667826000000))
ArrayBuffer(MsgCount(TLoop,63,1667826600000), MsgCount(CLian,50,1667826600000), MsgCount(偷心小贼魔理沙,40,1667826600000), MsgCount(永恒之光xx,36,1667826600000), MsgCount(寅子蛋糕,36,1667826600000), MsgCount(FangPige,36,1667826600000), MsgCount(沙漠中的胖头鱼,34,1667826600000), MsgCount(浮生一丝暖暖Aria,32,1667826600000), MsgCount(zk1015,31,1667826600000), MsgCount(我是究极大穷逼,29,1667826600000))
ArrayBuffer(MsgCount(TLoop,63,1667827200000), MsgCount(CLian,53,1667827200000), MsgCount(FangPige,42,1667827200000), MsgCount(熊瞎子老岑,41,1667827200000), MsgCount(沙漠中的胖头鱼,39,1667827200000), MsgCount(偷心小贼魔理沙,37,1667827200000), MsgCount(永恒之光xx,32,1667827200000), MsgCount(寅子蛋糕,30,1667827200000), MsgCount(zk1015,26,1667827200000), MsgCount(sai1024,26,1667827200000))
ArrayBuffer(MsgCount(FangPige,58,1667827800000), MsgCount(TLoop,55,1667827800000), MsgCount(CLian,53,1667827800000), MsgCount(每次取名字都好纠结啊,37,1667827800000), MsgCount(熊瞎子老岑,37,1667827800000), MsgCount(沙漠中的胖头鱼,36,1667827800000), MsgCount(sai1024,34,1667827800000), MsgCount(偷心小贼魔理沙,31,1667827800000), MsgCount(永恒之光xx,25,1667827800000), MsgCount(黄浦江底走一遭,25,1667827800000))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号