赞
踩
在开发中遇到了一张表的数据因为只做了同步插入而没有做同步更新的操作,导致了百万数据不准确。面对大量数据需要更新,传统的循环逐条插入以及拼接1000条数据插入都比较耗时,网上有博主做出了相关测试。
根据以上场景,新建控制台程序。config添加数据库连接配置,sqlHelper连接更新数据源,sqlBulkCopyHelper连接更新目标库。
1,注意三点:第一点是:
String sqlstring = @""; 即 String sqlstring = @"SELECT [order_no] ,[qty] ,[total_amount] FROM [dbo].[tb_bulk]";,用到几个字段,查找几个字段,要与下面第二条配合
第二点: String crateTemplateSql = @"
[order_no] [varchar](32) NULL,
[qty] [int] NULL,
[total_amount] [decimal](18, 2) NULL,";
这个要与第一点匹配
第三点: String updateSql = @"Merge into tb_bulk AS T
Using #TmpTable AS S
ON (T.order_no = S.order_no )
WHEN MATCHED
THEN UPDATE SET T.[qty]=S.[qty],T.[total_amount]=S.[total_amount];DROP TABLE #TmpTable;";
要与第一点,第二点匹配,字段多少要匹配,字段类型要匹配,特别重要
创建sqlHelper类
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SqlBulkCopyHelper
{
public class sqlHelper
{
/// <summary>
/// 数据库操作帮助类
/// 此段基础代码为SQLServer数据库帮助类
/// 如需操作MySQL
/// 1.将代码中Sql改为MySql
/// 2.添加MySql.Data.dll引用(可通过官网或NuGet)
/// 3.using System.Data.SqlClient;替换为using MySql.Data.MySqlClient;
/// </summary>
/// <summary>
/// 数据库连接字符串
/// </summary>
private static string connectionStr =
System.Configuration.ConfigurationManager.ConnectionStrings["ConnectionName"].ConnectionString;
public sqlHelper() { }
public sqlHelper(string connectionStr)
{
sqlHelper.connectionStr = connectionStr;
}
/// <summary>
/// 得到连接对象
/// </summary>
/// <returns></returns>
public static SqlConnection GetConn()
{
SqlConnection sqlconn = null;
sqlconn = new SqlConnection(connectionStr);
return sqlconn;
}
/// <summary>
/// 查询操作
/// </summary>
/// <param name="sql"></param>
/// <returns></returns>
public static DataTable GetDataTable(string sql, params SqlParameter[] sp)
{
using (SqlConnection conn = GetConn())
{
conn.Open();
using (SqlDataAdapter sda = new SqlDataAdapter(sql, conn))
{
sda.SelectCommand.Parameters.AddRange(sp);
DataTable dt = new DataTable();
sda.Fill(dt);
return dt;
}
}
}
/// <summary>
/// 增删改操作
/// </summary>
/// <param name="sql">sql语句</param>
/// <returns>执行后的条数</returns>
public static int ExecuteNonQuery(string sql, params SqlParameter[] sp)
{
using (SqlConnection conn = GetConn())
{
conn.Open();
using (SqlCommand cmd = new SqlCommand(sql, conn))
{
cmd.Parameters.AddRange(sp);
int i = cmd.ExecuteNonQuery();
return i;
}
}
}
/// <summary>
/// 执行一条SQL语句,返回首行首列
/// </summary>
/// <param name="sql">sql语句</param>
/// <returns>首行首列</returns>
public static object ExecuteScalar(string sql, params SqlParameter[] sp)
{
using (SqlConnection conn = GetConn())
{
conn.Open();
using (SqlCommand cmd = new SqlCommand(sql, conn))
{
cmd.Parameters.AddRange(sp);
return cmd.ExecuteScalar();
}
}
}
}
}
创建sqlBulkCopyHelper
/// <summary>
/// SqlBulkCopy 批量更新数据
/// </summary>
/// <param name="dataTable">数据集</param>
/// <param name="crateTemplateSql">临时表创建字段</param>
/// <param name="updateSql">更新语句</param>
public static void BulkUpdateData(DataTable dataTable, string crateTemplateSql, string updateSql)
{
ConfigurationManager.OpenExeConfiguration(ConfigurationUserLevel.PerUserRoamingAndLocal);
using (var conn = new SqlConnection(ConfigurationManager.ConnectionStrings["DefaultConnection"].ConnectionString))
{
using (var command = new SqlCommand("", conn))
{
try
{
conn.Open();
//数据库并创建一个临时表来保存数据表的数据
command.CommandText = String.Format(" CREATE TABLE #TmpTable ({0})", crateTemplateSql);
command.ExecuteNonQuery();
//使用SqlBulkCopy 加载数据到临时表中
using (var bulkCopy = new SqlBulkCopy(conn))
{
foreach (DataColumn dcPrepped in dataTable.Columns)
{
bulkCopy.ColumnMappings.Add(dcPrepped.ColumnName, dcPrepped.ColumnName);
}
bulkCopy.BulkCopyTimeout = 660;
bulkCopy.DestinationTableName = "#TmpTable";
bulkCopy.WriteToServer(dataTable);
bulkCopy.Close();
}
// 执行Command命令 使用临时表的数据去更新目标表中的数据 然后删除临时表
command.CommandTimeout = 300;
command.CommandText = updateSql;
command.ExecuteNonQuery();
}
finally
{
conn.Close();
}
}
}
}
Program代码
/// <summary>
/// 更新表数据
/// </summary>
public static void update()
{
String sqlstring = @"";
System.Diagnostics.Stopwatch stopwatch = new Stopwatch();
stopwatch.Start(); // 开始监视代码运行时间
DataTable dt = sqlHelper.GetDataTable(sqlstring);
stopwatch.Stop(); // 停止监视
Console.WriteLine("执行查询sql用时:" + stopwatch.Elapsed.TotalSeconds + "秒,共查询到:" + dt.Rows.Count + "行");
String updateSql = @"Merge into Table AS T
Using #TmpTable AS S
ON (T.order_no = S.order_no and T.item_code = S.item_code )
WHEN MATCHED
THEN UPDATE SET T.[qty]=S.[qty],T.[total_amount]=S.[total_amount];";
String crateTemplateSql = @"
[order_no] [varchar](32) NULL,
[qty] [int] NULL,
[total_amount] [decimal](18, 2) NULL,
[item_code] [varchar](32) NULL,";
for (int i = 0; i < (dt.Rows.Count + 10000 - 1) / 10000; i++)
{
System.Diagnostics.Stopwatch stopwatch2 = new Stopwatch();
stopwatch2.Start();
sqlBulkCopyHelper.BulkUpdateData(dt.AsEnumerable().Skip(i * 10000).Take(10000).CopyToDataTable(), crateTemplateSql, updateSql);
stopwatch2.Stop();
Console.WriteLine("更新第" + (i + 1) + "次耗时:" + stopwatch2.Elapsed.TotalSeconds + "秒,剩余" + ((dt.Rows.Count + 10000 - 1) / 10000 - i - 1) + "次");
}
Console.ReadLine();
}
创建数据库表
CREATE TABLE [dbo].[tb_bulk](
[ID] [int] IDENTITY(1,1) NOT NULL,
[order_no] [varchar](50) NULL,
[qty] [int] NULL,
[total_amount] [decimal](18, 2) NULL,
[item_code] [varchar](50) NULL,
CONSTRAINT [PK_tb_bulk] PRIMARY KEY CLUSTERED
(
[ID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。