实例介绍
【实例简介】
1、本opc驱动采用订阅模式来读写OPC
2、通过订阅MQTT来接收外部命令以及将读到的数据写出
【核心代码】
using Newtonsoft.Json; using OPCAutomation; using System; using System.Collections.Generic; using System.Configuration; using System.Linq; using System.Text; using System.Threading.Tasks; using uPLibrary.Networking.M2Mqtt; using uPLibrary.Networking.M2Mqtt.Messages; namespace OpcDriverTest { class OpcService { MqttClient mqttClient; OPCServer opcServer; OPCGroups opcGroups; List<OPCItem> opcItemList; OPCGroup opcGroup; Dictionary<string,int> opcTagDictionary=new Dictionary<string, int>(); string[] opcTags; public OpcService() { startMqtt(); startOpc(); } private void startMqtt() { //获取MQTT配置 string url = ConfigurationSettings.AppSettings["mqtt.url"].ToString(); string clientId = ConfigurationSettings.AppSettings["mqtt.clientId"].ToString(); int port = Convert.ToInt32(ConfigurationSettings.AppSettings["mqtt.port"]); string receiveTopic = ConfigurationSettings.AppSettings["mqtt.topic.opc.receive"].ToString(); mqttClient = new MqttClient(url); mqttClient.MqttMsgPublishReceived = client_MqttMsgPublishReceived; //连接MQTT mqttClient.Connect(clientId); //订阅主题,用来接收上层发来的命令 mqttClient.Subscribe(new string[] { receiveTopic }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE }); } private void startOpc() { string opcServerName = ConfigurationSettings.AppSettings["opc.serverName"].ToString(); opcServer = new OPCServer(); connectOPC("", opcServerName); //配置组 opcGroups = opcServer.OPCGroups; opcGroups.DefaultGroupIsActive = true; opcGroups.DefaultGroupDeadband = 0; opcGroups.DefaultGroupUpdateRate = 200; //获取配置的标记 string tags = ConfigurationSettings.AppSettings["opc.tags"].ToString(); string group = ConfigurationSettings.AppSettings["opc.group".ToString()]; opcTags= tags.Split(','); //转成Dictionary,方便查询用 for (int i = 0; i < opcTags.Length; i ) { opcTagDictionary.Add(opcTags[i], i); } List<string> itemList = new List<string>(); addListenGroup(group, opcTags); } private bool connectOPC(string remoteServerName, string remoteServerIP) { try { opcServer.Connect(remoteServerIP, remoteServerName);//连接本地服务器:服务器名 主机名或IP if (opcServer.ServerState == (int)OPCServerState.OPCRunning) { Console.WriteLine("已连接到:{0}", opcServer.ServerName); } else { //这里你可以根据返回的状态来自定义显示信息,请查看自动化接口API文档 Console.WriteLine("状态:{0}", opcServer.ServerState.ToString()); } opcServer.ServerShutDown = ServerShutDown;//服务器断开事件 } catch (Exception err) { Console.WriteLine("连接远程服务器出现错误:{0}" err.Message); return false; } return true; } private void ServerShutDown(string Reason) { Console.WriteLine("OPC服务器已经先行断开!"); } //注册监听opc item public void addListenGroup(string group, string[] itemList) { opcGroup = opcGroups.Add(group); opcGroup.UpdateRate = 100; opcGroup.IsSubscribed = true; opcGroup.DataChange = new DIOPCGroupEvent_DataChangeEventHandler(OpcGroup_DataChange); opcGroup.AsyncWriteComplete = OpcGroup_AsyncWriteComplete; OPCItems opcItems = opcGroup.OPCItems; opcItemList = new List<OPCItem>(); for (int i = 1; i <= itemList.Length; i ) { string itemName = itemList[i - 1]; int clientHande = i; OPCItem item= opcItems.AddItem(itemName, clientHande); opcItemList.Add(item); } } //opc 数据变化事件 private void OpcGroup_DataChange(int TransactionID, int NumItems, ref Array ClientHandles, ref Array ItemValues, ref Array Qualities, ref Array TimeStamps) { Dictionary<string, object> param = new Dictionary<string, object>(); for (int i = 1; i <= NumItems; i ) { Console.WriteLine( "Tran:" TransactionID.ToString() " CH:" ClientHandles.GetValue(i).ToString() " value:" ItemValues.GetValue(i) " timestamps:" TimeStamps.GetValue(i).ToString()); int index = (int)ClientHandles.GetValue(i)-1; //得到数据的index,通过Index得知是哪个数据 param.Add(opcTags[index], ItemValues.GetValue(i)); //opcAsyncWrite(opcItemList[1], ItemValues.GetValue(i).ToString()); //异步写 //opcItemList[1].Write(ItemValues.GetValue(i).ToString()); //同步写 } //写入mqtt string publishTopic = ConfigurationSettings.AppSettings["mqtt.topic.opc.publish"].ToString(); string json = JsonConvert.SerializeObject(param); mqttClient.Publish(publishTopic, Encoding.UTF8.GetBytes(json)); } //单个写 private void opcAsyncWrite(OPCItem item,string value) { int[] temp = new int[2] { 0, item.ServerHandle }; Array serverHandles = (Array)temp; object[] valueTemp = new object[2] { "", value }; Array values = (Array)valueTemp; Array Errors; int cancelID; opcGroup.AsyncWrite(1, ref serverHandles, ref values, out Errors, 2009, out cancelID); } //单个写(无法批量写) private void opcAsyncWrite(Array serverHandles, Array values) { Array Errors; int cancelID; opcGroup.AsyncWrite(1, ref serverHandles, ref values, out Errors, 2009, out cancelID); } //opc异步写回调消息 private void OpcGroup_AsyncWriteComplete(int TransactionID, int NumItems, ref Array ClientHandles, ref Array Errors) { for (int i = 1; i <= NumItems; i ) { //lblState.Text = "Tran:" TransactionID.ToString() " CH:" ClientHandles.GetValue(i).ToString() " Error:" Errors.GetValue(i).ToString(); Console.WriteLine("Tran:" TransactionID.ToString() " CH:" ClientHandles.GetValue(i).ToString() " Error:" Errors.GetValue(i).ToString()); } } //收到MQTT消息 private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e) { try { string message= Encoding.UTF8.GetString(e.Message); Dictionary<string, object> param = JsonConvert.DeserializeObject<Dictionary<string, object>>(message); foreach (var item in param) { int[] sererHandles = new int[param.Count 1]; object[] values = new object[param.Count 1]; //获取opc item序号 try { int index = opcTagDictionary[item.Key]; //获取opc item sererHandles.SetValue(opcItemList[index].ServerHandle, 1);//opc是从1号开始 values.SetValue(item.Value, 1); opcAsyncWrite(sererHandles, values); } catch (Exception ex ) { Console.WriteLine("异常:未在点表中找到" item.Key ",已被忽略"); } } } catch (Exception e1) { Console.WriteLine(e1.Message); } } } }
小贴士
感谢您为本站写下的评论,您的评论对其它用户来说具有重要的参考价值,所以请认真填写。
- 类似“顶”、“沙发”之类没有营养的文字,对勤劳贡献的楼主来说是令人沮丧的反馈信息。
- 相信您也不想看到一排文字/表情墙,所以请不要反馈意义不大的重复字符,也请尽量不要纯表情的回复。
- 提问之前请再仔细看一遍楼主的说明,或许是您遗漏了。
- 请勿到处挖坑绊人、招贴广告。既占空间让人厌烦,又没人会搭理,于人于己都无利。
关于好例子网
本站旨在为广大IT学习爱好者提供一个非营利性互相学习交流分享平台。本站所有资源都可以被免费获取学习研究。本站资源来自网友分享,对搜索内容的合法性不具有预见性、识别性、控制性,仅供学习研究,请务必在下载后24小时内给予删除,不得用于其他任何用途,否则后果自负。基于互联网的特殊性,平台无法对用户传输的作品、信息、内容的权属或合法性、安全性、合规性、真实性、科学性、完整权、有效性等进行实质审查;无论平台是否已进行审查,用户均应自行承担因其传输的作品、信息、内容而可能或已经产生的侵权或权属纠纷等法律责任。本站所有资源不代表本站的观点或立场,基于网友分享,根据中国法律《信息网络传播权保护条例》第二十二与二十三条之规定,若资源存在侵权或相关问题请联系本站客服人员,点此联系我们。关于更多版权及免责申明参见 版权及免责申明
网友评论
我要评论