赞
踩
1、C# 对多种数据库的批量插入进行封装,方便调用。
MySql 社区驱动 MySqlConnector 提供的批量插入方式是 MySqlBulkCopy(注意不是 SqlServer 的 SqlBulkCopy),它基于 MySql 自身的本地文件上传机制(LOAD DATA LOCAL INFILE)进行批量插入,参数为一个 DataTable 对象,原生的批量插入代码如下,计时方式与 SqlServer 相同。使用前,MySql 的连接字符串里要添加 ";AllowLoadLocalInfile=true",即连接字符串的形式应该是 "Server=;Database=;User ID=;Password=;AllowLoadLocalInfile=true";同时还要在 MySql 数据库上执行 "set global local_infile=1" 以开启本地文件上传。
/// <summary>
/// Bulk-inserts <paramref name="list"/> into a MySQL table using <c>MySqlBulkCopy</c>.
/// </summary>
/// <typeparam name="T">Model type; its properties are turned into DataTable columns by <c>ToDataTable</c>.</typeparam>
/// <param name="list">Rows to insert. A <c>null</c> sequence yields <c>false</c>.</param>
/// <param name="tableName">
/// Destination table. When blank, the name reported by <c>DataCommonUtil.GetTableEntity&lt;T&gt;()</c> is used.
/// </param>
/// <returns>
/// <c>true</c> when the number of rows reported as inserted equals the number of rows sent;
/// <c>false</c> otherwise, or when any exception occurs after the connection object is created.
/// </returns>
private bool MySqlBulkInsert<T>(IEnumerable<T> list, string tableName = "") where T : class
{
    if (list == null)
    {
        return false;
    }

    var connectionString = MSEApplication.GetConfigValue("ConnectionStrings:BulkConn");

    // The using block closes and disposes the connection on every exit path,
    // so no explicit Close/Dispose calls are needed in catch/finally.
    using (var conn = new MySqlConnection(connectionString))
    {
        try
        {
            // An explicit table name takes precedence; the entity returned by
            // GetTableEntity is only read, never mutated (it may be shared — TODO confirm).
            var destination = string.IsNullOrWhiteSpace(tableName)
                ? DataCommonUtil.GetTableEntity<T>().TableName
                : tableName;

            // Enumerate the source exactly once; the row count is taken from the table afterwards.
            var dataTable = list.ToDataTable();

            var bulkCopy = new MySqlBulkCopy(conn, null);
            bulkCopy.DestinationTableName = destination;
            for (var ordinal = 0; ordinal < dataTable.Columns.Count; ordinal++)
            {
                // Map the source column at this ordinal to the destination column of the same name.
                bulkCopy.ColumnMappings.Add(
                    new MySqlBulkCopyColumnMapping(ordinal, dataTable.Columns[ordinal].ColumnName));
            }

            conn.Open();
            var result = bulkCopy.WriteToServer(dataTable);
            return result.RowsInserted == dataTable.Rows.Count;
        }
        catch (Exception ex)
        {
            // The bool contract is kept, but the failure reason is no longer lost.
            System.Diagnostics.Trace.TraceError("MySqlBulkInsert failed: " + ex);
            return false;
        }
    }
}
SqlServer官方提供的批量插入方式是SqlBulkCopy。
/// <summary>
/// Bulk-inserts <paramref name="list"/> into a SQL Server table using <c>SqlBulkCopy</c>
/// with <c>SqlBulkCopyOptions.KeepIdentity</c>.
/// </summary>
/// <typeparam name="T">Model type; its properties are turned into DataTable columns by <c>ToDataTable</c>.</typeparam>
/// <param name="list">Rows to insert. A <c>null</c> sequence yields <c>false</c>.</param>
/// <param name="tableName">
/// Destination table. When blank, the name reported by <c>DataCommonUtil.GetTableEntity&lt;T&gt;()</c> is used.
/// </param>
/// <returns><c>true</c> when the copy completes without an exception; otherwise <c>false</c>.</returns>
private bool SqlServerBulkInsert<T>(IEnumerable<T> list, string tableName = "") where T : class
{
    // Number of rows sent to the server per batch.
    const int batchSize = 20000;

    if (list == null)
    {
        return false;
    }

    var connectionString = MSEApplication.GetConfigValue("ConnectionStrings:BulkConn");

    // Both the connection and the bulk-copy object are IDisposable; the using
    // blocks release them on every exit path, so catch/finally need no cleanup.
    using (var conn = new SqlConnection(connectionString))
    {
        try
        {
            // An explicit table name takes precedence; the entity returned by
            // GetTableEntity is only read, never mutated (it may be shared — TODO confirm).
            var destination = string.IsNullOrWhiteSpace(tableName)
                ? DataCommonUtil.GetTableEntity<T>().TableName
                : tableName;

            var dataTable = list.ToDataTable();

            using (var bulkCopy = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepIdentity, null))
            {
                bulkCopy.DestinationTableName = destination;
                bulkCopy.BatchSize = batchSize;

                // Map by name so the column order of the model does not have to match the table.
                foreach (DataColumn column in dataTable.Columns)
                {
                    bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                }

                conn.Open();
                bulkCopy.WriteToServer(dataTable);
            }

            return true;
        }
        catch (Exception ex)
        {
            // The bool contract is kept, but the failure reason is no longer lost.
            System.Diagnostics.Trace.TraceError("SqlServerBulkInsert failed: " + ex);
            return false;
        }
    }
}
/// <summary>
/// Inserts <paramref name="list"/> into an Oracle table by running a parameterised INSERT
/// through an <c>OracleDataAdapter</c> in batches, inside a single transaction.
/// </summary>
/// <typeparam name="T">Model type; its properties are turned into DataTable columns by <c>ToDataTable</c>.</typeparam>
/// <param name="list">Rows to insert. A <c>null</c> sequence yields <c>false</c>.</param>
/// <param name="tableName">
/// Destination table. When blank, the name reported by <c>DataCommonUtil.GetTableEntity&lt;T&gt;()</c> is used.
/// The name is interpolated into the SQL text, so it must come from trusted code, not user input.
/// </param>
/// <returns>
/// <c>true</c> when all rows were written and the transaction committed; <c>false</c> when any
/// step fails (the transaction is rolled back if it had been started).
/// </returns>
private bool OracleBulkInsert<T>(IEnumerable<T> list, string tableName = "") where T : class
{
    // Number of rows the adapter groups into one round trip.
    const int batchSize = 20000;

    if (list == null)
    {
        return false;
    }

    var connectionString = MSEApplication.GetConfigValue("ConnectionStrings:BulkConn");

    // All three objects are disposed by the using blocks on every exit path.
    using (var conn = new OracleConnection(connectionString))
    using (var adapter = new OracleDataAdapter())
    using (var insertCommand = new OracleCommand())
    {
        try
        {
            // An explicit table name takes precedence; the entity returned by
            // GetTableEntity is only read, never mutated (it may be shared — TODO confirm).
            var destination = string.IsNullOrWhiteSpace(tableName)
                ? DataCommonUtil.GetTableEntity<T>().TableName
                : tableName;

            var dataTable = list.ToDataTable();
            var columnNames = dataTable.Columns.Cast<DataColumn>().Select(c => c.ColumnName).ToList();

            insertCommand.Connection = conn;
            insertCommand.CommandText =
                $"INSERT INTO {destination} ({string.Join(",", columnNames)})" +
                $" VALUES ({string.Join(",", columnNames.Select(name => ":" + name))})";

            // ADO.NET batching does not allow commands that map returned rows back
            // into the DataTable; an INSERT returns none, so switch that off.
            insertCommand.UpdatedRowSource = UpdateRowSource.None;

            foreach (DataColumn column in dataTable.Columns)
            {
                // DataColumn.MaxLength is -1 when unset; pass 0 instead of a negative size.
                // NOTE(review): assumes the provider infers the size from the value when it is 0 — confirm.
                var size = column.MaxLength > 0 ? column.MaxLength : 0;
                insertCommand.Parameters.Add(
                    $":{column.ColumnName}",
                    column.DataType.ConvertToOracleDbType(),
                    size,
                    column.ColumnName);
            }

            adapter.InsertCommand = insertCommand;
            adapter.UpdateBatchSize = batchSize;

            conn.Open();
            using (var transaction = conn.BeginTransaction())
            {
                insertCommand.Transaction = transaction;
                try
                {
                    adapter.Update(dataTable);
                    transaction.Commit();
                    return true;
                }
                catch
                {
                    // Undo partial work, then let the outer handler report the failure.
                    transaction.Rollback();
                    throw;
                }
            }
        }
        catch (Exception ex)
        {
            // The bool contract is kept, but the failure reason is no longer lost.
            System.Diagnostics.Trace.TraceError("OracleBulkInsert failed: " + ex);
            return false;
        }
    }
}
/// <summary>
/// Bulk-inserts <paramref name="list"/> into a PostgreSQL table using Npgsql's binary
/// <c>COPY ... FROM STDIN</c> import.
/// </summary>
/// <typeparam name="T">Model type; each readable public property is written as one column, named after the property.</typeparam>
/// <param name="list">Rows to insert. A <c>null</c> sequence yields <c>false</c>.</param>
/// <param name="tableName">
/// Destination table. When blank, the name reported by <c>DataCommonUtil.GetTableEntity&lt;T&gt;()</c> is used.
/// The name is interpolated into the COPY command, so it must come from trusted code, not user input.
/// </param>
/// <returns><c>true</c> when the import completes without an exception; otherwise <c>false</c>.</returns>
private bool PostgresqlBulkInsert<T>(IEnumerable<T> list, string tableName = "") where T : class
{
    if (list == null)
    {
        return false;
    }

    var connectionString = MSEApplication.GetConfigValue("ConnectionStrings:BulkConn");

    // The using block closes and disposes the connection on every exit path.
    using (var conn = new NpgsqlConnection(connectionString))
    {
        try
        {
            // An explicit table name takes precedence; the entity returned by
            // GetTableEntity is only read, never mutated (it may be shared — TODO confirm).
            var destination = string.IsNullOrWhiteSpace(tableName)
                ? DataCommonUtil.GetTableEntity<T>().TableName
                : tableName;

            // Write-only properties cannot be read, so they are left out of the column list.
            PropertyInfo[] properties = typeof(T).GetProperties().Where(p => p.CanRead).ToArray();
            string columnList = string.Join(", ", properties.Select(p => p.Name));

            // Bind the property getters once instead of resolving them for every row.
            Func<T, object>[] getters = properties.Select(p => (Func<T, object>)p.GetValue).ToArray();

            // Opened inside the try so a connection failure returns false like any other error.
            conn.Open();
            using (var writer = conn.BeginBinaryImport($"COPY {destination} ({columnList}) FROM STDIN BINARY"))
            {
                foreach (var item in list)
                {
                    writer.StartRow();
                    foreach (var getter in getters)
                    {
                        var value = getter(item);

                        // Test for null BEFORE any substitution: replacing null with
                        // DBNull.Value first would make this branch unreachable.
                        if (value == null)
                        {
                            writer.WriteNull();
                        }
                        else
                        {
                            writer.Write(value);
                        }
                    }
                }

                writer.Complete();
            }

            return true;
        }
        catch (Exception ex)
        {
            // The bool contract is kept, but the failure reason is no longer lost.
            System.Diagnostics.Trace.TraceError("PostgresqlBulkInsert failed: " + ex);
            return false;
        }
    }
}
/// <summary>
/// Reflection, expression-tree and <see cref="DataTable"/> helpers used by the bulk-insert routines.
/// </summary>
public static partial class BulkUtil
{
    /// <summary>
    /// Cache of compiled delegates, keyed by a string that identifies the operation,
    /// the type and the properties involved.
    /// </summary>
    public static ConcurrentDictionary<string, object> CacheDictionary = new ConcurrentDictionary<string, object>();

    /// <summary>
    /// Additional delegate cache. Not used by any member of this class.
    /// </summary>
    public static ConcurrentDictionary<string, Delegate> CacheDelegateDictionary = new ConcurrentDictionary<string, Delegate>();

    /// <summary>
    /// Determines whether the compile-time type <typeparamref name="T"/> is a <see cref="Nullable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type to inspect.</typeparam>
    /// <param name="o">Value used only to infer <typeparamref name="T"/>; it is not read.</param>
    /// <returns><c>true</c> when <typeparamref name="T"/> is a nullable value type.</returns>
    public static bool IsNullable<T>(this T o)
    {
        return Nullable.GetUnderlyingType(typeof(T)) != null;
    }

    /// <summary>
    /// Determines whether <paramref name="type"/> is a <see cref="Nullable{T}"/>.
    /// </summary>
    /// <param name="type">Type to inspect.</param>
    /// <returns><c>true</c> when <paramref name="type"/> is a nullable value type.</returns>
    public static bool IsNullable(this Type type)
    {
        return Nullable.GetUnderlyingType(type) != null;
    }

    /// <summary>
    /// Determines whether <paramref name="type"/> implements the non-generic <see cref="ICollection"/>.
    /// </summary>
    /// <param name="type">Type to inspect.</param>
    /// <returns><c>true</c> when <see cref="ICollection"/> is among the type's interfaces.</returns>
    public static bool IsCollection(this Type type)
    {
        return type.GetInterfaces().Contains(typeof(ICollection));
    }

    /// <summary>
    /// Determines whether <paramref name="type"/> implements the non-generic <see cref="IEnumerable"/>.
    /// </summary>
    /// <param name="type">Type to inspect.</param>
    /// <returns><c>true</c> when <see cref="IEnumerable"/> is among the type's interfaces.</returns>
    public static bool IsEnumerable(this Type type)
    {
        return type.GetInterfaces().Contains(typeof(IEnumerable));
    }

    /// <summary>
    /// Determines whether <paramref name="type"/> implements the non-generic <see cref="IQueryable"/>.
    /// </summary>
    /// <param name="type">Type to inspect.</param>
    /// <returns><c>true</c> when <see cref="IQueryable"/> is among the type's interfaces.</returns>
    public static bool IsQueryable(this Type type)
    {
        return type.GetInterfaces().Contains(typeof(IQueryable));
    }

    /// <summary>
    /// Determines whether <paramref name="type"/> is <see cref="string"/>.
    /// </summary>
    /// <param name="type">Type to inspect.</param>
    /// <returns><c>true</c> when <paramref name="type"/> is exactly <c>typeof(string)</c>.</returns>
    public static bool IsString(this Type type)
    {
        return type == typeof(string);
    }

    /// <summary>
    /// Determines whether <paramref name="type"/> looks awaitable: it has a <c>GetAwaiter</c> method
    /// whose return type implements an interface named <c>INotifyCompletion</c> and exposes
    /// an <c>IsCompleted</c> property and a <c>GetResult</c> method.
    /// </summary>
    /// <param name="type">Type to inspect.</param>
    /// <returns><c>true</c> when all of the checks above succeed.</returns>
    public static bool IsAsyncType(this Type type)
    {
        var getAwaiter = type.GetMethod("GetAwaiter");
        if (getAwaiter == null)
        {
            return false;
        }

        var awaiterType = getAwaiter.ReturnType;

        // .NET Core 1.1 and earlier have no GetInterface method, so GetInterfaces is used for compatibility.
        return awaiterType.GetInterfaces().Any(i => i.Name == "INotifyCompletion")
            && awaiterType.GetProperty("IsCompleted") != null
            && awaiterType.GetMethod("GetResult") != null;
    }

    /// <summary>
    /// Creates an instance of <paramref name="type"/>.
    /// </summary>
    /// <param name="type">Type to instantiate.</param>
    /// <param name="args">Constructor arguments; <c>null</c> or empty selects the parameterless constructor.</param>
    /// <returns>The new instance.</returns>
    public static object CreateInstance(this Type type, object[] args)
    {
        return args == null || args.Length == 0
            ? Activator.CreateInstance(type)
            : Activator.CreateInstance(type, args: args);
    }

    /// <summary>
    /// Unwraps a type: first the result type of an awaitable (its first generic argument),
    /// then the first generic argument of whatever remains if that is generic.
    /// </summary>
    /// <param name="type">Type to unwrap.</param>
    /// <returns>
    /// The unwrapped type, or <paramref name="type"/> itself when there is nothing to unwrap.
    /// An awaitable without generic arguments is returned as-is instead of throwing.
    /// </returns>
    public static Type GetUnderlyingType(this Type type)
    {
        var unwrapped = type;

        // A non-generic awaitable has no result type to extract.
        if (type.IsAsyncType() && type.GenericTypeArguments.Length > 0)
        {
            unwrapped = type.GenericTypeArguments[0];
        }

        return unwrapped.IsGenericType ? unwrapped.GetGenericArguments()[0] : unwrapped;
    }

    /// <summary>
    /// Resolves a <see cref="Type"/> by name and runs its static constructor.
    /// </summary>
    /// <param name="typeName">Name passed to <see cref="Type.GetType(string)"/>.</param>
    /// <returns>The resolved type, or <c>null</c> when it cannot be found.</returns>
    /// <exception cref="ArgumentException"><paramref name="typeName"/> is null, empty or whitespace.</exception>
    public static Type LoadTypeByName(string typeName)
    {
        if (typeName.IsNullOrWhiteSpace())
        {
            throw new ArgumentException("typeName must be not empty", nameof(typeName));
        }

        Type resolved = Type.GetType(typeName);
        if (resolved != null)
        {
            System.Runtime.CompilerServices.RuntimeHelpers.RunClassConstructor(resolved.TypeHandle);
        }

        return resolved;
    }

    /// <summary>
    /// Gets the default value of <paramref name="type"/>.
    /// </summary>
    /// <param name="type">Type to inspect.</param>
    /// <returns>A default-constructed instance for value types; <c>null</c> otherwise.</returns>
    public static object GetDefaultValue(this Type type)
    {
        return type.IsValueType ? Activator.CreateInstance(type) : null;
    }

    /// <summary>
    /// Determines whether <paramref name="type"/> implements the non-generic <see cref="IDictionary"/>.
    /// </summary>
    /// <param name="type">Type to inspect.</param>
    /// <returns><c>true</c> when <see cref="IDictionary"/> is among the type's interfaces.</returns>
    public static bool IsDictionary(this Type type)
    {
        return type.GetInterfaces().Contains(typeof(IDictionary));
    }

    /// <summary>
    /// Reads a property by name through a cached compiled expression and casts the result.
    /// </summary>
    /// <typeparam name="T">Static type of <paramref name="model"/>.</typeparam>
    /// <typeparam name="TResult">Type the property value is cast to.</typeparam>
    /// <param name="model">Object to read from.</param>
    /// <param name="propertyName">Name of a public property on the runtime type of <paramref name="model"/>.</param>
    /// <returns>The property value cast to <typeparamref name="TResult"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="model"/> is null, or the property does not exist.</exception>
    public static TResult GetPropertyValue<T, TResult>(this T model, string propertyName)
    {
        return (TResult)GetPropertyValue(model, propertyName);
    }

    /// <summary>
    /// Reads a property by name through a cached compiled expression.
    /// </summary>
    /// <typeparam name="T">Static type of <paramref name="model"/>.</typeparam>
    /// <param name="model">Object to read from.</param>
    /// <param name="propertyName">Name of a public property on the runtime type of <paramref name="model"/>.</param>
    /// <returns>The property value, boxed as <see cref="object"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="model"/> is null, or the property does not exist.</exception>
    public static object GetPropertyValue<T>(this T model, string propertyName)
    {
        if (model == null)
        {
            throw new ArgumentNullException(nameof(model));
        }

        var type = model.GetType();
        var property = type.GetProperty(propertyName);
        if (property == null)
        {
            // The exception type is kept for callers that catch it; the text now goes into
            // the message slot rather than being mistaken for a parameter name.
            throw new ArgumentNullException(nameof(propertyName), $"could not find property with name {propertyName}");
        }

        // The "." separator keeps (type "Foo", property "BarBaz") distinct from (type "FooBar", property "Baz").
        var key = "get:" + type.FullName + "." + property.Name;
        var getter = (Delegate)CacheDictionary.GetOrAdd(key, _ => BuildGetter(type, property));
        return getter.DynamicInvoke(model);
    }

    /// <summary>
    /// Writes a property by name through a cached compiled expression.
    /// </summary>
    /// <typeparam name="T">Static type of <paramref name="model"/>.</typeparam>
    /// <param name="model">Object to write to.</param>
    /// <param name="propertyName">Name of a public property on the runtime type of <paramref name="model"/>.</param>
    /// <param name="value">Value to assign; it is converted to the property type.</param>
    /// <exception cref="ArgumentNullException"><paramref name="model"/> is null, or the property does not exist.</exception>
    public static void SetPropertyValue<T>(this T model, string propertyName, object value)
    {
        if (model == null)
        {
            throw new ArgumentNullException(nameof(model));
        }

        var type = model.GetType();
        var property = type.GetProperty(propertyName);
        if (property == null)
        {
            throw new ArgumentNullException(nameof(propertyName), $"could not find property with name {propertyName}");
        }

        // Fetch-or-build, then invoke exactly once. The setter must not run a second
        // time after a cache hit, and the delegate must not be recompiled on every call.
        var key = "set:" + type.FullName + "." + property.Name;
        var setter = (Delegate)CacheDictionary.GetOrAdd(key, _ => BuildSetter(type, property));
        setter.DynamicInvoke(model, value);
    }

    /// <summary>
    /// Compiles <c>model =&gt; (object)model.Property</c> for the given type and property.
    /// </summary>
    private static Delegate BuildGetter(Type type, PropertyInfo property)
    {
        var modelParameter = Expression.Parameter(type, "model");
        var body = Expression.Convert(Expression.Property(modelParameter, property), typeof(object));
        return Expression.Lambda(body, modelParameter).Compile();
    }

    /// <summary>
    /// Compiles <c>(model, val) =&gt; model.set_Property((TProperty)val)</c> for the given type and property.
    /// </summary>
    private static Delegate BuildSetter(Type type, PropertyInfo property)
    {
        var modelParameter = Expression.Parameter(type, "model");
        var valueParameter = Expression.Parameter(typeof(object), "val");
        var typedValue = Expression.Convert(valueParameter, property.PropertyType);
        var call = Expression.Call(modelParameter, property.GetSetMethod(), typedValue);
        return Expression.Lambda(call, modelParameter, valueParameter).Compile();
    }

    /// <summary>
    /// Compiles a delegate that reads the given properties of an object into an <c>object[]</c>,
    /// in the order of <paramref name="propertyInfos"/>.
    /// </summary>
    /// <typeparam name="T">Type the delegate reads from.</typeparam>
    /// <param name="propertyInfos">Properties to read.</param>
    /// <returns>The compiled delegate.</returns>
    public static Func<T, object[]> BuildObjectGetValuesDelegate<T>(List<PropertyInfo> propertyInfos) where T : class
    {
        var modelParameter = Expression.Parameter(typeof(T), "model");
        var valueExpressions = propertyInfos.Select(it => BuildObjectGetValueExpression(modelParameter, it));
        var arrayExpression = Expression.NewArrayInit(typeof(object), valueExpressions);
        return Expression.Lambda<Func<T, object[]>>(arrayExpression, modelParameter).Compile();
    }

    /// <summary>
    /// Builds the expression <c>(object)model.Property</c>.
    /// </summary>
    /// <param name="modelExpression">Parameter expression representing the object.</param>
    /// <param name="propertyInfo">Property to read.</param>
    /// <returns>The property access converted to <see cref="object"/>.</returns>
    public static Expression BuildObjectGetValueExpression(ParameterExpression modelExpression, PropertyInfo propertyInfo)
    {
        return Expression.Convert(Expression.Property(modelExpression, propertyInfo), typeof(object));
    }

    /// <summary>
    /// Copies a sequence of objects into a new <see cref="DataTable"/> named "template",
    /// one column per property and one row per object.
    /// </summary>
    /// <typeparam name="T">Element type.</typeparam>
    /// <param name="source">Objects to copy; enumerated once.</param>
    /// <param name="propertyInfos">Properties to include; <c>null</c> or empty selects every readable public property of <typeparamref name="T"/>.</param>
    /// <param name="useColumnAttribute">When <c>true</c>, a <see cref="ColumnAttribute"/> name, if present, is used as the column name instead of the property name.</param>
    /// <returns>The populated table. Nullable property types become columns of their underlying type.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static DataTable ToDataTable<T>(this IEnumerable<T> source, List<PropertyInfo> propertyInfos = null, bool useColumnAttribute = false) where T : class
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (propertyInfos == null || propertyInfos.Count == 0)
        {
            propertyInfos = typeof(T).GetProperties().Where(it => it.CanRead).ToList();
        }

        var table = new DataTable("template");
        foreach (var propertyInfo in propertyInfos)
        {
            var columnName = useColumnAttribute
                ? (propertyInfo.GetCustomAttribute<ColumnAttribute>()?.Name ?? propertyInfo.Name)
                : propertyInfo.Name;
            table.Columns.Add(columnName, ChangeType(propertyInfo.PropertyType));
        }

        // The prefix and separator keep this key from colliding with the get:/set: entries
        // and with other type/property combinations that concatenate to the same text.
        var key = "values:" + typeof(T).FullName + "|" + propertyInfos.Select(it => it.Name).StringJoin();
        var getValues = (Func<T, object[]>)CacheDictionary.GetOrAdd(key, _ => BuildObjectGetValuesDelegate<T>(propertyInfos));

        foreach (var model in source)
        {
            table.Rows.Add(getValues(model));
        }

        return table;
    }

    /// <summary>
    /// Returns the underlying type of a <see cref="Nullable{T}"/>, or the type itself otherwise.
    /// </summary>
    private static Type ChangeType(Type type)
    {
        return Nullable.GetUnderlyingType(type) ?? type;
    }

    /// <summary>
    /// Compiles a delegate that calls <paramref name="constructorInfo"/>, taking one delegate
    /// parameter per constructor parameter.
    /// </summary>
    /// <param name="constructorInfo">Constructor to wrap.</param>
    /// <returns>The compiled, untyped delegate.</returns>
    public static Delegate BuildGenerateObjectDelegate(ConstructorInfo constructorInfo)
    {
        var parameterExpressions = constructorInfo.GetParameters()
            .Select(parameter => Expression.Parameter(parameter.ParameterType))
            .ToList();
        var newExpression = Expression.New(constructorInfo, parameterExpressions);
        return Expression.Lambda(newExpression, parameterExpressions).Compile();
    }

    /// <summary>
    /// Replaces every column of <paramref name="dt"/> whose (unwrapped) data type is
    /// <typeparamref name="OldType"/> with a column of <typeparamref name="NewType"/>,
    /// keeping the column's name and position and converting each value with <paramref name="replaceFunc"/>.
    /// </summary>
    /// <typeparam name="OldType">Data type of the columns to replace.</typeparam>
    /// <typeparam name="NewType">Data type of the replacement columns.</typeparam>
    /// <param name="dt">Table to modify in place.</param>
    /// <param name="replaceFunc">Converts one cell value. It is not called for <see cref="DBNull"/> cells, which stay <see cref="DBNull"/>.</param>
    public static void ReplaceDataTableColumnType<OldType, NewType>(DataTable dt, Func<OldType, NewType> replaceFunc)
    {
        // Snapshot the matching columns first: the column collection is modified below.
        var columnsToReplace = dt.Columns.Cast<DataColumn>()
            .Where(column => column.DataType.GetUnderlyingType() == typeof(OldType))
            .ToList();

        foreach (var oldColumn in columnsToReplace)
        {
            var originalName = oldColumn.ColumnName;
            var originalOrdinal = oldColumn.Ordinal;

            // Add the replacement under a temporary unique name (two columns cannot share a
            // name) and move it to the old column's position. The column type is NewType,
            // not a fixed byte[], so any conversion target works.
            var newColumn = dt.Columns.Add(Guid.NewGuid().ToString("N"), ChangeType(typeof(NewType)));
            newColumn.SetOrdinal(originalOrdinal);

            foreach (DataRow row in dt.Rows)
            {
                var current = row[oldColumn];
                object replacement = DBNull.Value;
                if (current != DBNull.Value)
                {
                    replacement = replaceFunc((OldType)current);
                }

                row[newColumn] = replacement ?? DBNull.Value;
            }

            dt.Columns.Remove(oldColumn);
            newColumn.ColumnName = originalName;
        }
    }

    /// <summary>
    /// Extension form of <see cref="string.IsNullOrWhiteSpace(string)"/>.
    /// </summary>
    /// <param name="str">String to test.</param>
    /// <returns><c>true</c> when <paramref name="str"/> is null, empty or whitespace only.</returns>
    public static bool IsNullOrWhiteSpace(this string str)
    {
        return string.IsNullOrWhiteSpace(str);
    }

    /// <summary>
    /// Joins a sequence of strings with a separator character.
    /// </summary>
    /// <param name="source">Strings to join.</param>
    /// <param name="separator">Separator placed between elements; a comma by default.</param>
    /// <returns>The joined string.</returns>
    public static string StringJoin(this IEnumerable<string> source, char separator = ',')
    {
        return string.Join(separator, source);
    }

    /// <summary>
    /// Maps a CLR type to the <see cref="OracleDbType"/> used for command parameters.
    /// </summary>
    /// <param name="type">CLR type; a <see cref="Nullable{T}"/> is mapped by its underlying type.</param>
    /// <returns>The matching <see cref="OracleDbType"/>; <c>Varchar2</c> when no specific mapping exists.</returns>
    public static OracleDbType ConvertToOracleDbType(this Type type)
    {
        // Unwrap Nullable<T> so that e.g. int? maps like int instead of falling through to Varchar2.
        var effectiveType = type == null ? null : (Nullable.GetUnderlyingType(type) ?? type);

        switch (Type.GetTypeCode(effectiveType))
        {
            case TypeCode.Boolean:
                return OracleDbType.Boolean;
            case TypeCode.Byte:
            case TypeCode.SByte:
                return OracleDbType.Byte;
            case TypeCode.Int16:
            case TypeCode.UInt16:
                return OracleDbType.Int16;
            case TypeCode.Int32:
            case TypeCode.UInt32:
                return OracleDbType.Int32;
            case TypeCode.Int64:
            case TypeCode.UInt64:
                return OracleDbType.Int64;
            case TypeCode.Single:
                return OracleDbType.Single;
            case TypeCode.Double:
                return OracleDbType.Double;
            case TypeCode.Decimal:
                return OracleDbType.Decimal;
            case TypeCode.String:
                return OracleDbType.NVarchar2;
            case TypeCode.Char:
                return OracleDbType.Char;
            case TypeCode.DateTime:
                return OracleDbType.Date;
            case TypeCode.Object:
                return ConvertObjectTypeToOracleDbType(effectiveType);
            default:
                return OracleDbType.Varchar2;
        }
    }

    /// <summary>
    /// Maps the types whose <see cref="TypeCode"/> is <see cref="TypeCode.Object"/>.
    /// </summary>
    private static OracleDbType ConvertObjectTypeToOracleDbType(Type type)
    {
        if (type == typeof(Guid))
        {
            return OracleDbType.Raw;
        }

        if (type == typeof(byte[]) || type == typeof(OracleBlob))
        {
            return OracleDbType.Blob;
        }

        if (type == typeof(TimeSpan) || type == typeof(OracleIntervalDS))
        {
            return OracleDbType.IntervalDS;
        }

        if (type == typeof(OracleClob))
        {
            return OracleDbType.Clob;
        }

        if (type == typeof(OracleXmlType))
        {
            return OracleDbType.XmlType;
        }

        if (type == typeof(OracleRefCursor))
        {
            return OracleDbType.RefCursor;
        }

        if (type == typeof(OracleTimeStamp))
        {
            return OracleDbType.TimeStamp;
        }

        if (type == typeof(OracleIntervalYM))
        {
            return OracleDbType.IntervalYM;
        }

        if (type == typeof(OracleCollectionType))
        {
            return OracleDbType.Object;
        }

        return OracleDbType.Varchar2;
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。