赞
踩
在多线程中使用同一mysql连接,一边读取数据库,一般修改数据库(insert,update,delete) 可能出现程序脏读的问题。
线程一:
delete from tb where ...
insert into tb...
线程二:
select * from tb
我们可以在查询中增加锁机制【lock in share mode】
select * from tb lock in share mode
允许不同事务之前共享加锁读取,但不允许其它事务修改或者加入排他锁
如果有修改必须等待一个事务提交完成,才可以执行,容易出现死锁
当一个事物加入排他锁后,不允许其他事务加共享锁或者排它锁读取,更加不允许其他事务修改加锁的行。
源程序如下:
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
-
- namespace MultiThreadMySqlDemo
- {
- /// <summary>
- /// 数据库类型
- /// </summary>
- public enum DatabaseType
- {
- /// <summary>
- /// 微软SqlServer数据库
- /// </summary>
- SqlServer = 0,
- /// <summary>
- /// Oracle数据库
- /// </summary>
- Oracle = 1,
- /// <summary>
- /// MySql数据库
- /// </summary>
- MySql = 2,
- /// <summary>
- /// 轻量级SQLite数据库,多用于本地保存数据
- /// </summary>
- SQLite = 3,
- /// <summary>
- /// PostgreSql数据库
- /// </summary>
- PostgreSql = 4,
- /// <summary>
- /// IBM DB2数据
- /// </summary>
- DB2 = 5,
- /// <summary>
- /// IBM Informix数据库
- /// </summary>
- Informix = 6,
- /// <summary>
- /// 对象链接和嵌入数据库:如Access数据库
- /// </summary>
- OleDb = 7,
- /// <summary>
- /// 开放数据库互连:Open Database Connectivity
- /// </summary>
- ODBC = 8
- }
- }
- using System;
- using System.Collections.Generic;
- using System.Data.Common;
- using System.Linq;
- using System.Text;
-
- namespace MultiThreadMySqlDemo
- {
- /// <summary>
- /// 数据库工厂,为不同类型的数据源类进行实例化
- /// </summary>
- public class ProviderFactory
- {
- private static Dictionary<DatabaseType, string> providerInvariantNames = new Dictionary<DatabaseType, string>();
- private static Dictionary<DatabaseType, DbProviderFactory> providerFactory = new Dictionary<DatabaseType, DbProviderFactory>();
-
- static ProviderFactory()
- {
- //加载已知的数据库访问类的程序集
- providerInvariantNames.Add(DatabaseType.SqlServer, "System.Data.SqlClient");
- providerInvariantNames.Add(DatabaseType.Oracle, "Oracle.DataAccess.Client");
- providerInvariantNames.Add(DatabaseType.MySql, "MySql.Data.MySqlClient");
- providerInvariantNames.Add(DatabaseType.SQLite, "System.Data.SQLite");
- providerInvariantNames.Add(DatabaseType.PostgreSql, "Npgsql");
- providerInvariantNames.Add(DatabaseType.DB2, "IBM.Data.DB2.iSeries");
- providerInvariantNames.Add(DatabaseType.Informix, "IBM.Data.Informix");
- providerInvariantNames.Add(DatabaseType.OleDb, "System.Data.OleDb");
- providerInvariantNames.Add(DatabaseType.ODBC, "System.Data.ODBC");
- }
-
- /// <summary>
- /// 获取指定数据库类型对应的程序集名称
- /// </summary>
- /// <param name="providerType"></param>
- /// <returns></returns>
- public static string GetProviderInvariantName(DatabaseType providerType)
- {
- return providerInvariantNames[providerType];
- }
-
- /// <summary>
- /// 获取指定类型的数据库对应的DbProviderFactory
- /// </summary>
- /// <param name="providerType"></param>
- /// <returns></returns>
- public static DbProviderFactory GetDbProviderFactory(DatabaseType providerType)
- {
- //如果还没有加载,则加载该DbProviderFactory
- if (!providerFactory.ContainsKey(providerType))
- {
- providerFactory.Add(providerType, ImportDbProviderFactory(providerType));
- }
- return providerFactory[providerType];
- }
- /// <summary>
- /// 加载指定数据库类型的DbProviderFactory
- /// </summary>
- /// <param name="providerType">数据库类型枚举</param>
- /// <returns></returns>
- private static DbProviderFactory ImportDbProviderFactory(DatabaseType providerType)
- {
- string providerName = providerInvariantNames[providerType];
- DbProviderFactory factory = null;
- try
- {
- //invariant 不变的,固定的 variant 变化的,不同的,变量
- //从全局程序集中查找,如果使用的dll(如:System.Data.SQLite.dll,该类库需要放在bin\debug目录下) 是32位的,则编译程序也要以32位(x86)运行
- //如果没有在App.config(或web.config)中配置节点:
- //<system.data><DbProviderFactories><add name="SQLite Data Provider".. /></DbProviderFactories></system.data>..,则需要更改对应的machine.config
- //32位(x86)的 C:\Windows\Microsoft.NET\Framework\v4.0.30319\Config\machine.config
- //64位(x64)的 C:\Windows\Microsoft.NET\Framework64\v4.0.30319\Config\machine.config
- //<remove invariant = "System.Data.SQLite" /><add name = "SQLite Data Provider" invariant = "System.Data.SQLite" description = ".Net Framework Data Provider for SQLite" type = "System.Data.SQLite.SQLiteFactory, System.Data.SQLite" />
- factory = DbProviderFactories.GetFactory(providerName);
- }
- catch (Exception ex)
- {
- Console.WriteLine("加载程序集失败:" + providerName + "\n" + ex.Message, "指定的数据库对应的程序集无法加载");
- factory = null;
- }
- return factory;
- }
- }
- }
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Data;
- using System.Data.Common;
- using System.Reflection;
- using System.Text.RegularExpressions;
-
- namespace MultiThreadMySqlDemo
- {
- /// <summary>
- /// 通用数据库访问类,封装了对数据库的常见操作
- /// Author:斯内科
- /// </summary>
- public sealed class DbUtility
- {
- /// <summary>
- /// 定义一个静态变量来保存该类的实例
- /// </summary>
- private static DbUtility uniqueInstance;
- /// <summary>
- /// 锁对象
- /// </summary>
- private static object thisLock = new object();
- /// <summary>
- /// 数据库连接字符串
- /// </summary>
- public string ConnectionString { get; private set; }
- /// <summary>
- /// 数据库工厂,可以根据不同的数据库类型,实例化出对应的数据源类对象
- /// </summary>
- private DbProviderFactory providerFactory;
- /// <summary>
- /// 构造函数
- /// </summary>
- /// <param name="connectionString">数据库连接字符串</param>
- /// <param name="providerType">数据库类型枚举</param>
- public DbUtility(string connectionString, DatabaseType providerType)
- {
- ConnectionString = connectionString;
- providerFactory = ProviderFactory.GetDbProviderFactory(providerType);
- if (providerFactory == null)
- {
- throw new ArgumentException("Can't load DbProviderFactory for given value of providerType");
- }
- }
-
- /// <summary>
- /// 实例化该数据库访问类
- /// </summary>
- /// <param name="connectionString">数据库连接字符串</param>
- /// <param name="dbType">数据库类型字符串,如 SqlServer、Oracle</param>
- /// <returns></returns>
- public static DbUtility GetInstance(string connectionString, string dbType)
- {
- DatabaseType databaseType;
- bool parseResult = Enum.TryParse(dbType, true, out databaseType);
- if (!parseResult)
- {
- throw new Exception("不支持的数据库类型:" + dbType);
- }
- return GetInstance(connectionString, databaseType);
- }
-
- /// <summary>
- /// 实例化该数据库访问类
- /// </summary>
- /// <param name="connectionString">数据库连接字符串</param>
- /// <param name="databaseType">数据库类型枚举</param>
- /// <returns></returns>
- public static DbUtility GetInstance(string connectionString, DatabaseType databaseType)
- {
- // 如果类的实例不存在则创建,否则直接返回
- if (uniqueInstance == null)
- {
- lock (thisLock)
- {
- if (uniqueInstance == null)
- {
- uniqueInstance = new DbUtility(connectionString, databaseType);
- }
- }
- }
- return uniqueInstance;
- }
-
- /// <summary>
- /// 创建一个数据库命令对象
- /// </summary>
- /// <param name="sql">要执行的sql语句 或者 存储过程名</param>
- /// <param name="dict">sql(或存储过程)中的参数以及具体的值</param>
- /// <param name="commandType">命令字符串类型:Text为 SQL文本,StoredProcedure为 存储过程名</param>
- /// <returns></returns>
- public DbCommand CreateCommand(string sql, Dictionary<string, object> dict, CommandType commandType)
- {
- DbConnection connection = providerFactory.CreateConnection();
- connection.ConnectionString = ConnectionString;
- DbCommand command = CreateCommand(sql, dict, commandType, connection);
- return command;
- }
-
- /// <summary>
- /// 创建一个数据库命令对象
- /// </summary>
- /// <param name="sql">要执行的sql语句 或者 存储过程名</param>
- /// <param name="dict">sql(或存储过程)中的参数以及具体的值</param>
- /// <param name="commandType">命令字符串类型:Text为 SQL文本,StoredProcedure为 存储过程名</param>
- /// <param name="connection">数据库连接对象</param>
- /// <returns></returns>
- public DbCommand CreateCommand(string sql, Dictionary<string, object> dict, CommandType commandType, DbConnection connection)
- {
- DbCommand command = providerFactory.CreateCommand();
- command.Connection = connection;
- command.CommandType = commandType;
- command.CommandText = sql;
- if (dict != null)
- {
- foreach (string parameterName in dict.Keys)
- {
- DbParameter dbParameter = command.CreateParameter();
- dbParameter.ParameterName = parameterName;
- dbParameter.Value = dict[parameterName];
- command.Parameters.Add(dbParameter);
- }
- }
- return command;
- }
-
- /// <summary>
- /// 更新数据,执行增(insert into)、删(delete)、改(update)、创建表(create table)等操作
- /// </summary>
- /// <param name="sql">要执行的sql语句 或者 存储过程名</param>
- /// <param name="dict">sql(或存储过程)中的参数以及具体的值</param>
- /// <param name="commandType">命令字符串类型:Text(默认)为 SQL文本,StoredProcedure为 存储过程名</param>
- /// <returns></returns>
- public int UpdateData(string sql, Dictionary<string, object> dict, CommandType commandType = CommandType.Text)
- {
- int affectedRows = -1;
- using (DbCommand command = CreateCommand(sql, dict, commandType))
- {
- try
- {
- command.Connection.Open();
- affectedRows = command.ExecuteNonQuery();
- }
- catch (Exception ex)
- {
- Console.WriteLine(ex.Message, "更改数据时出现异常");
- }
- finally
- {
- command.Connection.Close();
- }
- }
- return affectedRows;
- }
-
- /// <summary>
- /// 事务处理
- /// </summary>
- /// <param name="sqlCollection">sql语句集合</param>
- /// <param name="dictCollection">每一条对应sql中的参数以及对应的值,没有参数时为null</param>
- /// <returns></returns>
- public bool ProcessTransaction(List<string> sqlCollection, List<Dictionary<string, object>> dictCollection)
- {
- bool processResult = false;
- if (sqlCollection == null || dictCollection == null || sqlCollection.Count != dictCollection.Count)
- {
- throw new Exception("没有sql指令 或者 参数个数不匹配");
- }
- DbConnection connection = providerFactory.CreateConnection();
- connection.ConnectionString = ConnectionString;
- DbTransaction transaction = null;
- try
- {
- if (connection.State != ConnectionState.Open)
- {
- connection.Open();
- }
- using (transaction = connection.BeginTransaction())
- {
- for (int i = 0; i < sqlCollection.Count; i++)
- {
- DbCommand command = CreateCommand(sqlCollection[i], dictCollection[i], CommandType.Text, connection);
- //增加事务与命令的绑定
- command.Transaction = transaction;
- command.ExecuteNonQuery();
- }
- //提交事务
- transaction.Commit();
- processResult = true;
- }
- }
- catch (Exception ex)
- {
- //回滚事务
- if (transaction != null)
- {
- transaction.Rollback();
- }
- Console.WriteLine(ex.Message, "事务处理时出现异常");
- processResult = false;
- }
- finally
- {
- connection.Close();
- }
- return processResult;
- }
-
- /// <summary>
- /// 执行查询,并返回查询所返回的结果集中第一行的第一列。所有其他的列和行将被忽略
- /// </summary>
- /// <param name="sql">要执行的sql语句 或者 存储过程名</param>
- /// <param name="dict">sql(或存储过程)中的参数以及具体的值</param>
- /// <param name="commandType">命令字符串类型:Text(默认)为 SQL文本,StoredProcedure为 存储过程名</param>
- /// <returns></returns>
- public object GetScalar(string sql, Dictionary<string, object> dict, CommandType commandType = CommandType.Text)
- {
- object result = null;
- using (DbCommand command = CreateCommand(sql, dict, commandType))
- {
- try
- {
- command.Connection.Open();
- result = command.ExecuteScalar();
- }
- catch (Exception ex)
- {
- Console.WriteLine(ex.Message, "获取第一行的第一列数据时出现异常");
- }
- finally
- {
- command.Connection.Close();
- }
- }
- return result;
- }
-
- /// <summary>
- /// 执行查询,并返回查询所返回的结果集中第一行的第一列。返回一个字符串
- /// </summary>
- /// <param name="sql">要执行的sql语句 或者 存储过程名</param>
- /// <param name="dict">sql(或存储过程)中的参数以及具体的值</param>
- /// <param name="commandType">命令字符串类型:Text(默认)为 SQL文本,StoredProcedure为 存储过程名</param>
- /// <returns></returns>
- public string GetScalarString(string sql, Dictionary<string, object> dict, CommandType commandType = CommandType.Text)
- {
- object result = GetScalar(sql, dict, commandType);
- if (result == null)
- {
- return string.Empty;
- }
- return result.ToString();
- }
-
- /// <summary>
- /// 执行查询,并返回查询所返回的结果集中第一行的第一列。返回一个整数
- /// </summary>
- /// <param name="sql">要执行的sql语句 或者 存储过程名</param>
- /// <param name="dict">sql(或存储过程)中的参数以及具体的值</param>
- /// <param name="commandType">命令字符串类型:Text(默认)为 SQL文本,StoredProcedure为 存储过程名</param>
- /// <returns></returns>
- public int GetScalarInt(string sql, Dictionary<string, object> dict, CommandType commandType = CommandType.Text)
- {
- object result = GetScalar(sql, dict, commandType);
- if (result == null)
- {
- return 0;
- }
- return Convert.ToInt32(result);
- }
-
- /// <summary>
- /// 读取数据,注意读取数据完成后,请调用dataReader.Close()进行手动关闭
- /// 代码示例:while (reader.Read()){string result=reader.GetString(0);} reader.Close();
- /// </summary>
- /// <param name="sql">要执行的sql语句 或者 存储过程名</param>
- /// <param name="dict">sql(或存储过程)中的参数以及具体的值</param>
- /// <param name="commandType">命令字符串类型:Text(默认)为 SQL文本,StoredProcedure为 存储过程名</param>
- /// <returns></returns>
- public DbDataReader GetDataReader(string sql, Dictionary<string, object> dict, CommandType commandType = CommandType.Text)
- {
- DbDataReader dataReader = null;
- DbCommand command = CreateCommand(sql, dict, commandType);
- try
- {
- command.Connection.Open();
- dataReader = command.ExecuteReader(CommandBehavior.CloseConnection);
- }
- catch (Exception ex)
- {
- if (dataReader != null)
- {
- dataReader.Close();
- }
- command.Connection.Close();
- Console.WriteLine(ex.Message, "获取只读数据流出现错误");
- }
- return dataReader;
- }
-
- /// <summary>
- /// 获取查询的结果,存入一个数据集中,该函数多用于返回多个数据表的查询
- /// </summary>
- /// <param name="sql">要执行的sql语句 或者 存储过程名</param>
- /// <param name="dict">sql(或存储过程)中的参数以及具体的值</param>
- /// <param name="commandType">命令字符串类型:Text(默认)为 SQL文本,StoredProcedure为 存储过程名</param>
- /// <returns></returns>
- public DataSet GetDataSet(string sql, Dictionary<string, object> dict, CommandType commandType = CommandType.Text)
- {
- DataSet ds = new DataSet("HansDataSet");
- using (DbCommand command = CreateCommand(sql, dict, commandType))
- {
- using (DbDataAdapter adapter = providerFactory.CreateDataAdapter())
- {
- adapter.SelectCommand = command;
- try
- {
- adapter.Fill(ds);
- }
- catch (Exception ex)
- {
- Console.WriteLine(ex.Message, "查询数据时出现异常");
- }
- }
- }
- return ds;
- }
-
- /// <summary>
- /// 获取查询的结果,存入一个数据表中
- /// </summary>
- /// <param name="sql">要执行的sql语句 或者 存储过程名</param>
- /// <param name="dict">sql(或存储过程)中的参数以及具体的值</param>
- /// <param name="commandType">命令字符串类型:Text(默认)为 SQL文本,StoredProcedure为 存储过程名</param>
- /// <returns></returns>
- public DataTable GetDataTable(string sql, Dictionary<string, object> dict, CommandType commandType = CommandType.Text)
- {
- DataTable dt = new DataTable("HansDataTable");
- using (DbCommand command = CreateCommand(sql, dict, commandType))
- {
- using (DbDataAdapter adapter = providerFactory.CreateDataAdapter())
- {
- adapter.SelectCommand = command;
- try
- {
- adapter.Fill(dt);
- }
- catch (Exception ex)
- {
- Console.WriteLine(ex.Message, "查询数据时出现异常");
- }
- }
- }
- return dt;
- }
-
- /// <summary>
- /// 获取数据表某一行数据,多用于获取数据库中某一个唯一的ID的数据行
- /// </summary>
- /// <param name="sql">要执行的sql语句 或者 存储过程名</param>
- /// <param name="dict">sql(或存储过程)中的参数以及具体的值</param>
- /// <param name="commandType">命令字符串类型:Text(默认)为 SQL文本,StoredProcedure为 存储过程名</param>
- /// <returns></returns>
- public DataRow GetDataRow(string sql, Dictionary<string, object> dict, CommandType commandType = CommandType.Text)
- {
- DataTable dt = GetDataTable(sql, dict, commandType);
- if (dt != null && dt.Rows.Count > 0)
- {
- return dt.Rows[0];
- }
- return null;
- }
-
- }
- }
- using System;
- using System.Collections.Generic;
- using System.Data;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
-
- namespace MultiThreadMySqlDemo
- {
- class Program
- {
- //多线程并发mysql读写
- const string connectionString = "server=127.0.0.1;Database=testx;Uid=root;Pwd=root";
- static DbUtility dbUtility = DbUtility.GetInstance(connectionString, DatabaseType.MySql);
- static void Main(string[] args)
- {
- try
- {
- InitCreateTable();
- }
- catch (Exception ex)
- {
- Console.WriteLine(ex.Message);
- }
-
- Task.Factory.StartNew(() =>
- {
- for (int i = 0; i < 10; i++)
- {
- PrintCurrentData(GetCurrentDataWithLock);
- PrintCurrentData(GetCurrentData);
- Thread.Sleep(500);
- }
- });
- Task.Run(() =>
- {
- for (int i = 0; i < 10; i++)
- {
- ModifyData();
- Thread.Sleep(500);
- }
- });
- Console.ReadLine();
- }
-
- /// <summary>
- /// 初始化创建数据库
- /// </summary>
- private static void InitCreateTable()
- {
- string sqlCreateTable = @"CREATE TABLE IF NOT EXISTS `busbar_measure_fai` (
- `CoreId` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键自增编号',
- `ModuleBarcode` varchar(100) NOT NULL DEFAULT '' COMMENT '模组条码',
- `EquipmentNo` varchar(50) NOT NULL DEFAULT '' COMMENT '设备编号',
- `ProcessEndTime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '处理结束时间',
- `ProcessResult` varchar(10) NOT NULL DEFAULT '' COMMENT '处理结果',
- `MeasureDistance1` float NOT NULL DEFAULT '0' COMMENT '首件测距1',
- `MeasureDistance2` float NOT NULL DEFAULT '0' COMMENT '首件测距2',
- `MeasureDistance3` float NOT NULL DEFAULT '0' COMMENT '首件测距3',
- PRIMARY KEY (`CoreId`),
- KEY `IDX_BSB_WELD_FAI_TIME` (`ProcessEndTime`) USING BTREE
- ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='首件测距表';";
- dbUtility.UpdateData(sqlCreateTable, null);
- }
-
- /// <summary>
- /// 获得当前表的全部数据
- /// </summary>
- /// <returns></returns>
- private static DataTable GetCurrentData()
- {
- return dbUtility.GetDataTable("select CoreId,ModuleBarcode,ProcessEndTime from busbar_measure_fai", null);
- }
-
- /// <summary>
- /// 获得当前表的全部数据【增加锁】
- /// from tb lock in share mode
- /// </summary>
- /// <returns></returns>
- private static DataTable GetCurrentDataWithLock()
- {
- return dbUtility.GetDataTable("select CoreId,ModuleBarcode,ProcessEndTime from busbar_measure_fai lock in share mode", null);
- }
-
- /// <summary>
- /// 打印当前数据
- /// </summary>
- /// <param name="funcGetData">一个返回当前数据表的方法</param>
- private static void PrintCurrentData(Func<DataTable> funcGetData)
- {
- DataTable dataTable = funcGetData();
- Console.WriteLine($"读取当前数据行数:【{dataTable.Rows.Count}】");
- for (int i = 0; i < dataTable.Rows.Count; i++)
- {
- Console.WriteLine($"----行【{i + 1}】:【{dataTable.Rows[i]["CoreId"]},{dataTable.Rows[i]["ModuleBarcode"]},{dataTable.Rows[i]["ProcessEndTime"]}】");
- }
- }
-
- /// <summary>
- /// 删除指定的数据后再添加
- /// </summary>
- /// <param name="moduleBarcode"></param>
- /// <returns></returns>
- private static int DeleteAndAddData(string moduleBarcode)
- {
- string sqlUpdate = @"delete from busbar_measure_fai where ModuleBarcode=?ModuleBarcode;
- insert into busbar_measure_fai (ModuleBarcode,EquipmentNo,ProcessEndTime,ProcessResult,MeasureDistance1,MeasureDistance2,MeasureDistance3)
- values (?ModuleBarcode,?EquipmentNo,?ProcessEndTime,?ProcessResult,?MeasureDistance1,?MeasureDistance2,?MeasureDistance3)";
- Dictionary<string, object> dict = new Dictionary<string, object>();
- dict.Add("ModuleBarcode", moduleBarcode);
- dict.Add("EquipmentNo", "XX焊接");
- dict.Add("ProcessEndTime", DateTime.Now);
- dict.Add("ProcessResult", "OK");
- dict.Add("MeasureDistance1", 1.1F);
- dict.Add("MeasureDistance2", 2.2F);
- dict.Add("MeasureDistance3", 3.3F);
- return dbUtility.UpdateData(sqlUpdate, dict);
- }
-
- /// <summary>
- /// 更新数据
- /// </summary>
- public static void ModifyData()
- {
- Random random = new Random(Guid.NewGuid().GetHashCode());
- int affectRow = DeleteAndAddData($"M{random.Next(1, 100).ToString("D2")}");
- Console.WriteLine($"更新当前数据,受影响的行数:【{affectRow}】");
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。