当前位置:   article > 正文

手推FlinkML2.2(二)_flink ml

flink ml

FeatureHasher(特征哈希器)是一种用于处理高维分类特征(如文本或类别特征)的方法。它将特征转换为固定长度的数值向量,以便在机器学习模型中使用。特征哈希通过哈希函数将特征映射到较低维度的向量空间,可以有效地处理大规模和高维数据集。

特征哈希的主要优点是内存效率和计算速度。由于它不需要存储词汇表或其他映射信息,因此可以处理大规模数据集。同时,特征哈希在转换过程中具有较高的计算速度。

特征哈希的主要缺点是信息损失。由于哈希函数可能将不同的特征映射到相同的索引,因此会出现哈希冲突。这种冲突可能导致特征信息的损失,从而影响模型性能。通过增加哈希向量的维度,可以降低哈希冲突的概率,但这会增加存储和计算成本。

特征哈希广泛应用于文本分类、推荐系统和在线学习等领域。例如,在处理文本数据时,特征哈希可以将单词或短语转换为固定长度的向量,而无需构建词袋模型(Bag-of-Words model)或使用词嵌入(Word Embeddings)。尽管特征哈希在某些情况下可能不如其他方法准确,但其内存和计算效率使其在处理大规模数据集时非常有用。

特征哈希器 #

FeatureHasher 将一组分类或数字特征转换为指定维度的稀疏向量。哈希分类列和数字列的规则如下:

对于数值列,该特征在输出向量中的索引为列名的哈希值,其对应值与输入相同。

对于分类列,该特征在输出向量中的索引是字符串“column_name=value”的哈希值,对应的值为1.0。

如果将多个特征投影到同一列中,则累加输出值。有关散列技巧,请参阅 https://en.wikipedia.org/wiki/Feature_hashing 了解详细信息。

Input Columns #

编辑

添加图片注释,不超过 140 字(可选)

Output Columns #

编辑

添加图片注释,不超过 140 字(可选)

Parameters #

编辑

添加图片注释,不超过 140 字(可选)

添加图片注释,不超过 140 字(可选)

Java

import org.apache.flink.ml.feature.featurehasher.FeatureHasher;

import org.apache.flink.ml.linalg.Vector;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;

/** Simple program that creates a FeatureHasher instance and uses it for feature engineering. */

public class FeatureHasherExample {

public static void main(String[] args) {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Generates input data.

DataStream<Row> dataStream =

env.fromCollection(

Arrays.asList(Row.of(0, "a", 1.0, true), Row.of(1, "c", 1.0, false)));

Table inputDataTable = tEnv.fromDataStream(dataStream).as("id", "f0", "f1", "f2");

// Creates a FeatureHasher object and initializes its parameters.

FeatureHasher featureHash =

new FeatureHasher()

.setInputCols("f0", "f1", "f2")

.setCategoricalCols("f0", "f2")

.setOutputCol("vec")

.setNumFeatures(1000);

// Uses the FeatureHasher object for feature transformations.

Table outputTable = featureHash.transform(inputDataTable)[0];

// Extracts and displays the results.

for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {

Row row = it.next();

Object[] inputValues = new Object[featureHash.getInputCols().length];

for (int i = 0; i < inputValues.length; i++) {

inputValues[i] = row.getField(featureHash.getInputCols()[i]);

}

Vector outputValue = (Vector) row.getField(featureHash.getOutputCol());

System.out.printf(

"Input Values: %s \tOutput Value: %s\n",

Arrays.toString(inputValues), outputValue);

}

}

}

Python

# Simple program that creates a FeatureHasher instance and uses it for feature

# engineering.

from pyflink.common import Types

from pyflink.datastream import StreamExecutionEnvironment

from pyflink.ml.feature.featurehasher import FeatureHasher

from pyflink.table import StreamTableEnvironment

# create a new StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment

t_env = StreamTableEnvironment.create(env)

# generate input data

input_data_table = t_env.from_data_stream(

env.from_collection([

(0, 'a', 1.0, True),

(1, 'c', 1.0, False),

],

type_info=Types.ROW_NAMED(

['id', 'f0', 'f1', 'f2'],

[Types.INT(), Types.STRING(), Types.DOUBLE(), Types.BOOLEAN()])))

# create a feature hasher object and initialize its parameters

feature_hasher = FeatureHasher() \

.set_input_cols('f0', 'f1', 'f2') \

.set_categorical_cols('f0', 'f2') \

.set_output_col('vec') \

.set_num_features(1000)

# use the feature hasher for feature engineering

output = feature_hasher.transform(input_data_table)[0]

# extract and display the results

field_names = output.get_schema().get_field_names()

input_values = [None for _ in feature_hasher.get_input_cols()]

for result in t_env.to_data_stream(output).execute_and_collect():

for i in range(len(feature_hasher.get_input_cols())):

input_values[i] = result[field_names.index(feature_hasher.get_input_cols()[i])]

output_value = result[field_names.index(feature_hasher.get_output_col())]

print('Input Values: ' + str(input_values) + '\tOutput Value: ' + str(output_value))

HashingTF(哈希术语频率,Hashing Term Frequency)是一种处理文本数据的方法,将文本转换为固定长度的数值向量,以便在机器学习模型中使用。它利用哈希技巧(hashing trick)将文本中的单词映射到一个较低维度的向量空间,从而避免了维持一个完整词汇表的内存开销。HashingTF通常用于创建词袋模型(Bag-of-Words model)或TF-IDF表示,计算文本中各个单词的出现频率。

HashingTF的工作流程如下:

分词(Tokenization):将文本分解为单词(tokens)或其他基本文本单位。

哈希函数:对每个单词应用哈希函数,将其映射到一个固定范围内的整数。这个范围由用户定义的特征向量维度决定。

词频计数:计算每个哈希桶中单词的出现次数,将其作为特征向量的值。

HashingTF的主要优点是内存效率和计算速度。由于不需要存储词汇表,HashingTF可以有效地处理大规模文本数据集。此外,哈希函数的计算速度通常较快,使得HashingTF在处理实时或在线数据时具有优势。

然而,HashingTF的一个缺点是哈希冲突。由于哈希函数可能将不同的单词映射到相同的索引,因此会出现哈希冲突。这种冲突可能导致文本信息的损失,从而影响模型性能。通过增加特征向量的维度,可以降低哈希冲突的概率,但这会增加存储和计算成本。

尽管如此,在处理大规模数据集时,HashingTF仍然是一种有效且高效的文本表示方法。它广泛应用于文本分类、聚类、情感分析等自然语言处理任务中。

哈希TF #

HashingTF 使用散列技巧将一系列术语(字符串、数字、布尔值)映射到具有指定维度的稀疏向量。如果将多个特征投影到同一列中,默认情况下会累加输出值。

Input Columns #

编辑

添加图片注释,不超过 140 字(可选)

Output Columns #

编辑

添加图片注释,不超过 140 字(可选)

Parameters #

编辑切换为居中

添加图片注释,不超过 140 字(可选)

编辑

添加图片注释,不超过 140 字(可选)

Java

import org.apache.flink.ml.feature.hashingtf.HashingTF;

import org.apache.flink.ml.linalg.SparseVector;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;

import java.util.List;

/** Simple program that creates a HashingTF instance and uses it for feature engineering. */

public class HashingTFExample {

public static void main(String[] args) {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Generates input data.

DataStream<Row> inputStream =

env.fromElements(

Row.of(

Arrays.asList(

"HashingTFTest", "Hashing", "Term", "Frequency", "Test")),

Row.of(

Arrays.asList(

"HashingTFTest", "Hashing", "Hashing", "Test", "Test")));

Table inputTable = tEnv.fromDataStream(inputStream).as("input");

// Creates a HashingTF object and initializes its parameters.

HashingTF hashingTF =

new HashingTF().setInputCol("input").setOutputCol("output").setNumFeatures(128);

// Uses the HashingTF object for feature transformations.

Table outputTable = hashingTF.transform(inputTable)[0];

// Extracts and displays the results.

for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {

Row row = it.next();

List<Object> inputValue = (List<Object>) row.getField(hashingTF.getInputCol());

SparseVector outputValue = (SparseVector) row.getField(hashingTF.getOutputCol());

System.out.printf(

"Input Value: %s \tOutput Value: %s\n",

Arrays.toString(inputValue.stream().toArray()), outputValue);

}

}

}

Python

# Simple program that creates a HashingTF instance and uses it for feature

# engineering.

from pyflink.common import Types

from pyflink.datastream import StreamExecutionEnvironment

from pyflink.ml.feature.hashingtf import HashingTF

from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

t_env = StreamTableEnvironment.create(env)

# Generates input data.

input_data_table = t_env.from_data_stream(

env.from_collection([

(['HashingTFTest', 'Hashing', 'Term', 'Frequency', 'Test'],),

(['HashingTFTest', 'Hashing', 'Hashing', 'Test', 'Test'],),

],

type_info=Types.ROW_NAMED(

["input", ],

[Types.OBJECT_ARRAY(Types.STRING())])))

# Creates a HashingTF object and initializes its parameters.

hashing_tf = HashingTF() \

.set_input_col('input') \

.set_num_features(128) \

.set_output_col('output')

# Uses the HashingTF object for feature transformations.

output = hashing_tf.transform(input_data_table)[0]

# Extracts and displays the results.

field_names = output.get_schema().get_field_names()

for result in t_env.to_data_stream(output).execute_and_collect():

input_value = result[field_names.index(hashing_tf.get_input_col())]

output_value = result[field_names.index(hashing_tf.get_output_col())]

print('Input Value: ' + ' '.join(input_value) + '\tOutput Value: ' + str(output_value))

IDF(逆文档频率,Inverse Document Frequency)是一种在自然语言处理和信息检索领域中常用的统计度量,用于衡量一个词语在文档集合中的重要性。IDF与TF(术语频率,Term Frequency)共同构成了TF-IDF方法,是一种广泛应用于文本表示的加权技术。

IDF的基本思想是:如果一个词在许多文档中出现,那么它对于区分文档的能力较低。相反,如果一个词仅在少数文档中出现,那么它可能具有较高的区分能力。因此,IDF的目的是降低常见词的权重,提高稀有词的权重。

IDF的计算公式为:

IDF(t) = log(N / (1 + DF(t)))

其中:

t:一个词语(term)。

N:文档集合中文档的总数。

DF(t):包含词语t的文档数量。

log:自然对数。

在TF-IDF方法中,一个词语的权重由其在文档中的术语频率(TF)和在文档集合中的逆文档频率(IDF)相乘得到:

TF-IDF(t, d) = TF(t, d) * IDF(t)

这里,TF(t, d)表示词语t在文档d中的频率。通过计算每个词语的TF-IDF权重,可以将文档表示为一个向量,用于在机器学习模型中进行文本分类、聚类、相似性计算等任务。

需要注意的是,TF-IDF方法不能捕捉上下文信息和词语之间的关系,因此在处理复杂文本问题时可能不够准确。针对这类问题,可以使用词嵌入(Word Embeddings)等更先进的文本表示方法。

IDF 计算输入文档的逆文档频率 (IDF)。IDF 计算如下 idf = log((m + 1) / (d(t) + 1)),其中m是文档总数,d(t)是包含 的文档数t。

IDFModel 进一步使用计算出的逆文档频率来计算tf-idf。

Input Columns #

编辑

添加图片注释,不超过 140 字(可选)

Output Columns #

编辑

添加图片注释,不超过 140 字(可选)

Parameters #

编辑

添加图片注释,不超过 140 字(可选)

Java

import org.apache.flink.ml.feature.idf.IDF;

import org.apache.flink.ml.feature.idf.IDFModel;

import org.apache.flink.ml.linalg.DenseVector;

import org.apache.flink.ml.linalg.Vectors;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

import org.apache.flink.util.CloseableIterator;

/** Simple program that trains an IDF model and uses it for feature engineering. */

public class IDFExample {

public static void main(String[] args) {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Generates input data.

DataStream<Row> inputStream =

env.fromElements(

Row.of(Vectors.dense(0, 1, 0, 2)),

Row.of(Vectors.dense(0, 1, 2, 3)),

Row.of(Vectors.dense(0, 1, 0, 0)));

Table inputTable = tEnv.fromDataStream(inputStream).as("input");

// Creates an IDF object and initializes its parameters.

IDF idf = new IDF().setMinDocFreq(2);

// Trains the IDF Model.

IDFModel model = idf.fit(inputTable);

// Uses the IDF Model for predictions.

Table outputTable = model.transform(inputTable)[0];

// Extracts and displays the results.

for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {

Row row = it.next();

DenseVector inputValue = (DenseVector) row.getField(idf.getInputCol());

DenseVector outputValue = (DenseVector) row.getField(idf.getOutputCol());

System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);

}

}

}

Python

# Simple program that trains an IDF model and uses it for feature

# engineering.

from pyflink.common import Types

from pyflink.ml.linalg import Vectors, DenseVectorTypeInfo

from pyflink.datastream import StreamExecutionEnvironment

from pyflink.ml.feature.idf import IDF

from pyflink.table import StreamTableEnvironment

# Creates a new StreamExecutionEnvironment.

env = StreamExecutionEnvironment.get_execution_environment()

# Creates a StreamTableEnvironment.

t_env = StreamTableEnvironment.create(env)

# Generates input for training and prediction.

input_table = t_env.from_data_stream(

env.from_collection([

(Vectors.dense(0, 1, 0, 2),),

(Vectors.dense(0, 1, 2, 3),),

(Vectors.dense(0, 1, 0, 0),),

],

type_info=Types.ROW_NAMED(

['input', ],

[DenseVectorTypeInfo(), ])))

# Creates an IDF object and initializes its parameters.

idf = IDF().set_min_doc_freq(2)

# Trains the IDF Model.

model = idf.fit(input_table)

# Uses the IDF Model for predictions.

output = model.transform(input_table)[0]

# Extracts and displays the results.

field_names = output.get_schema().get_field_names()

for result in t_env.to_data_stream(output).execute_and_collect():

input_index = field_names.index(idf.get_input_col())

output_index = field_names.index(idf.get_output_col())

print('Input Value: ' + str(result[input_index]) +

'\tOutput Value: ' + str(result[output_index]))

Imputer(插值器)是一种用于处理缺失数据的方法,通过插值或估算的方式填补数据集中的缺失值。在数据预处理阶段,处理缺失值是一项重要任务,因为许多机器学习算法无法直接处理含有缺失值的数据。

Imputer根据不同的策略来填补缺失值,常见的策略包括:

均值插补(Mean imputation):使用特征列的均值填补缺失值。这种方法适用于连续数值特征,但可能会导致数据分布偏斜。

中位数插补(Median imputation):使用特征列的中位数填补缺失值。这种方法适用于连续数值特征,对异常值和偏斜分布的数据更具鲁棒性。

众数插补(Mode imputation):使用特征列的众数填补缺失值。这种方法适用于分类特征。

常数插补(Constant imputation):使用一个常数值填补缺失值。这种方法可以用于分类或数值特征,但可能会导致数据分布偏斜。

K近邻插补(K-Nearest Neighbors imputation):使用与具有缺失值的样本最近的K个样本的平均值或众数填补缺失值。这种方法可以用于分类或数值特征,但计算成本较高。

除了上述策略,还可以使用更复杂的方法处理缺失值,如使用回归模型预测缺失值、基于矩阵分解的插补方法等。在处理缺失值时,选择合适的插值策略取决于数据类型、数据分布以及实际问题的需求。

用于完成输入列缺失值的输入。

可以使用缺失值所在的每一列的统计数据(mean/median/most均值、中值或最频繁值)来估算缺失值。输入列应该是数字类型。

注意mean//值是在过滤掉缺失值和空值后计算的,空值始终被median视为most frequent缺失,因此也被估算。

注意该参数relativeError仅在策略为 时有效median。

Input Columns #

编辑

添加图片注释,不超过 140 字(可选)

Output Columns #

编辑

添加图片注释,不超过 140 字(可选)

Parameters #

编辑切换为居中

添加图片注释,不超过 140 字(可选)

缺失值的占位符。所有出现的缺失值都将被估算。

编辑切换为居中

添加图片注释,不超过 140 字(可选)

策略与相对误差

编辑

添加图片注释,不超过 140 字(可选)

Java

import org.apache.flink.ml.feature.imputer.Imputer;

import org.apache.flink.ml.feature.imputer.ImputerModel;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.Table;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import org.apache.flink.types.Row;

import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;

/** Simple program that trains a {@link Imputer} model and uses it for feature engineering. */

public class ImputerExample {

public static void main(String[] args) {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Generates input training and prediction data.

DataStream<Row> trainStream =

env.fromElements(

Row.of(Double.NaN, 9.0),

Row.of(1.0, 9.0),

Row.of(1.5, 9.0),

Row.of(2.5, Double.NaN),

Row.of(5.0, 5.0),

Row.of(5.0, 4.0));

Table trainTable = tEnv.fromDataStream(trainStream).as("input1", "input2");

// Creates an Imputer object and initialize its parameters

Imputer imputer =

new Imputer()

.setInputCols("input1", "input2")

.setOutputCols("output1", "output2")

.setStrategy("mean")

.setMissingValue(Double.NaN);

// Trains the Imputer model.

ImputerModel model = imputer.fit(trainTable);

// Uses the Imputer model for predictions.

Table outputTable = model.transform(trainTable)[0];

// Extracts and displays the results.

for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {

Row row = it.next();

double[] inputValues = new double[imputer.getInputCols().length];

double[] outputValues = new double[imputer.getInputCols().length];

for (int i = 0; i < inputValues.length; i++) {

inputValues[i] = (double) row.getField(imputer.getInputCols()[i]);

outputValues[i] = (double) row.getField(imputer.getOutputCols()[i]);

}

System.out.printf(

"Input Values: %s\tOutput Values: %s\n",

Arrays.toString(inputValues), Arrays.toString(outputValues));

}

}

}

Python

# Simple program that creates an Imputer instance and uses it for feature

# engineering.

from pyflink.common import Types

from pyflink.datastream import StreamExecutionEnvironment

from pyflink.ml.feature.imputer import Imputer

from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# create a StreamTableEnvironment

t_env = StreamTableEnvironment.create(env)

# generate input training and prediction data

train_data = t_env.from_data_stream(

env.from_collection([

(float('NaN'), 9.0,),

(1.0, 9.0,),

(1.5, 7.0,),

(1.5, float('NaN'),),

(4.0, 5.0,),

(None, 4.0,),

],

type_info=Types.ROW_NAMED(

['input1', 'input2'],

[Types.DOUBLE(), Types.DOUBLE()])

))

# Creates an Imputer object and initializes its parameters.

imputer = Imputer()\

.set_input_cols('input1', 'input2')\

.set_output_cols('output1', 'output2')\

.set_strategy('mean')\

.set_missing_value(float('NaN'))

# Trains the Imputer Model.

model = imputer.fit(train_data)

# Uses the Imputer Model for predictions.

output = model.transform(train_data)[0]

# Extracts and displays the results.

field_names = output.get_schema().get_field_names()

for result in t_env.to_data_stream(output).execute_and_collect():

input_values = []

output_values = []

for i in range(len(imputer.get_input_cols())):

input_values.append(result[field_names.index(imputer.get_input_cols()[i])])

output_values.append(result[field_names.index(imputer.get_output_cols()[i])])

print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))

IndexToString(索引到字符串)是一种在数据预处理和后处理过程中使用的转换方法,用于将数值索引映射回原始的类别标签(通常是字符串)。这种转换通常在编码和解码类别特征或机器学习模型的预测结果时使用。

在许多机器学习任务中,特征和标签需要以数值形式表示。对于类别数据(如文本标签),可以使用StringIndexer(字符串到索引)等编码方法将类别标签转换为整数索引。然后,可以将这些数值输入到机器学习模型中进行训练和预测。

在模型预测完成后,可能需要将预测结果(数值索引)转换回原始的类别标签。这时可以使用IndexToString方法进行转换。这种转换依赖于在编码过程中创建的映射关系(即字符串

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/988779
推荐阅读
相关标签
  

闽ICP备14008679号