通过WebSocket连接同步集合

亚历克斯

目前,我正在开发一个客户端-服务器系统,并且正在尝试获取一个集合以在WebSocket之间进行同步。一切都在C#+ .Net 4.5中,我想知道是否存在通过websocket同步数据的最佳实践。这是一种同步方式:

服务器:BindingCollection <MyClass> ----- Websocket ----->客户端:BindingCollection <MyClass>

该集合最多可以包含1000个对象,每个对象具有20个字段,因此每次发送全部批次似乎有点浪费。

乌菲

我将使用观察者模式,仅发送更改的对象进行同步。

因此,我终于花时间写了一个小例子。我正在使用一个内存通用仓库,该仓库在更改时调用事件。然后将更改发送给所有客户端,这样您就不必发送完整的列表/集合。

监控简单模型

using System;

namespace SynchronizingCollection.Common.Model
{
    public class MyModel
    {
        public Guid Id { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
    }
}

通用存储库

注意添加/更新/删除某些东西时调用的事件OnChange。该事件在XSockets长时间运行的控制器(单例)中被“订阅”。请参见“ RepoMonitor”类

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

namespace SynchronizingCollection.Server.Repository
{
    /// <summary>
    /// A static generic thread-safe repository for in-memory storage
    /// </summary>
    /// <typeparam name="TK">Key Type</typeparam>
    /// <typeparam name="T">Value Type</typeparam>
    public static class Repository<TK, T>
    {
        /// <summary>
        /// When something changes
        /// </summary>
        public static event EventHandler<OnChangedArgs<TK,T>> OnChange;

        private static ConcurrentDictionary<TK, T> Container { get; set; }

        static Repository()
        {
            Container = new ConcurrentDictionary<TK, T>();
        }

        /// <summary>
        /// Adds or updates the entity T with key TK
        /// </summary>
        /// <param name="key"></param>
        /// <param name="entity"></param>
        /// <returns></returns>
        public static T AddOrUpdate(TK key, T entity)
        {
            var obj = Container.AddOrUpdate(key, entity, (s, o) => entity);
            if(OnChange != null)
                OnChange.Invoke(null,new OnChangedArgs<TK, T>(){Key = key,Value = entity, Operation =  Operation.AddUpdate});
            return obj;
        }

        /// <summary>
        /// Removes the entity T with key TK
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static bool Remove(TK key)
        {
            T entity;
            var result = Container.TryRemove(key, out entity);
            if (result)
            {
                if (OnChange != null)
                    OnChange.Invoke(null, new OnChangedArgs<TK, T>() { Key = key, Value = entity, Operation = Operation.Remove});
            }
            return result;
        }

        /// <summary>
        /// Removes all entities matching the expression f
        /// </summary>
        /// <param name="f"></param>
        /// <returns></returns>
        public static int Remove(Func<T, bool> f)
        {
            return FindWithKeys(f).Count(o => Remove(o.Key));
        }        

        /// <summary>
        /// Find all entities T matching the expression f
        /// </summary>
        /// <param name="f"></param>
        /// <returns></returns>
        public static IEnumerable<T> Find(Func<T, bool> f)
        {
            return Container.Values.Where(f);
        }

        /// <summary>
        /// Find all entities T matching the expression f and returns a Dictionary TK,T
        /// </summary>
        /// <param name="f"></param>
        /// <returns></returns>
        public static IDictionary<TK, T> FindWithKeys(Func<T, bool> f)
        {
            var y = from x in Container
                    where f.Invoke(x.Value)
                    select x;
            return y.ToDictionary(x => x.Key, x => x.Value);
        }

        /// <summary>
        /// Returns all entities as a Dictionary TK,T
        /// </summary>
        /// <returns></returns>
        public static IDictionary<TK, T> GetAllWithKeys()
        {
            return Container;
        }

        /// <summary>
        /// Returns all entities T from the repository
        /// </summary>
        /// <returns></returns>
        public static IEnumerable<T> GetAll()
        {
            return Container.Values;
        }

        /// <summary>
        /// Get a single entity T with the key TK
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static T GetById(TK key)
        {
            return Container.ContainsKey(key) ? Container[key] : default(T);
        }

        /// <summary>
        /// Get a single entity T as a KeyValuePair TK,T with the key TK
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static KeyValuePair<TK, T> GetByIdWithKey(TK key)
        {
            return Container.ContainsKey(key) ? new KeyValuePair<TK, T>(key, Container[key]) : new KeyValuePair<TK, T>(key, default(T));
        }

        /// <summary>
        /// Checks if the repository has a key TK
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static bool ContainsKey(TK key)
        {
            return Container.ContainsKey(key);
        }
    }
}

事件参数和一个枚举,用于了解发生了什么变化

using System;

namespace SynchronizingCollection.Server.Repository
{
    /// <summary>
    /// To send changes in the repo
    /// </summary>
    /// <typeparam name="TK"></typeparam>
    /// <typeparam name="T"></typeparam>
    public class OnChangedArgs<TK,T> : EventArgs
    {
        public Operation Operation { get; set; }
        public TK Key { get; set; }
        public T Value { get; set; }
    }
}

namespace SynchronizingCollection.Server.Repository
{
    /// <summary>
    /// What kind of change was performed
    /// </summary>
    public enum Operation
    {
        AddUpdate,
        Remove
    }
}

将更改发送到客户端的控制器...

using System;
using SynchronizingCollection.Common.Model;
using SynchronizingCollection.Server.Repository;
using XSockets.Core.XSocket;
using XSockets.Core.XSocket.Helpers;
using XSockets.Plugin.Framework;
using XSockets.Plugin.Framework.Attributes;

namespace SynchronizingCollection.Server
{
    /// <summary>
    /// Long running controller that will send information to clients about the collection changes
    /// </summary>
    [XSocketMetadata(PluginRange = PluginRange.Internal, PluginAlias = "RepoMonitor")]
    public class RepositoryMonitor : XSocketController
    {
        public RepositoryMonitor()
        {
            Repository<Guid, MyModel>.OnChange += RepositoryOnChanged;
        }

        private void RepositoryOnChanged(object sender, OnChangedArgs<Guid, MyModel> e)
        {
            switch (e.Operation)
            {
                case Operation.Remove:
                    this.InvokeTo<Demo>(p => p.SendUpdates, e.Value,"removed");
                break;                    
                case Operation.AddUpdate:
                    this.InvokeTo<Demo>(p => p.SendUpdates, e.Value, "addorupdated");
                break;                    
            }
        }       
    }
}

客户端调用以添加/删除/更新集合的XSockets控制器。

using System;
using SynchronizingCollection.Common.Model;
using SynchronizingCollection.Server.Repository;
using XSockets.Core.XSocket;

namespace SynchronizingCollection.Server
{
    public class Demo : XSocketController
    {
        public bool SendUpdates { get; set; }

        public Demo()
        {
            //By default all clients get updates
            SendUpdates = true;
        }

        public void AddOrUpdateModel(MyModel model)
        {
            Repository<Guid, MyModel>.AddOrUpdate(model.Id, model);
        }

        public void RemoveModel(MyModel model)
        {
            Repository<Guid, MyModel>.Remove(model.Id);
        }        
    }
}

还有一个用C#编写的演示客户端,它添加和删除了10个不同的对象...但是使用JavaScript API也很容易。尤其是与用于在客户端上操纵集合的kickoutjs。

using System;
using System.Threading;
using SynchronizingCollection.Common.Model;
using XSockets.Client40;

namespace SynchronizingCollection.Client
{
    class Program
    {
        static void Main(string[] args)
        {
            var c = new XSocketClient("ws://127.0.0.1:4502","http://localhost","demo");

            c.Controller("demo").OnOpen += (sender, connectArgs) => Console.WriteLine("Demo OPEN");

            c.Controller("demo").On<MyModel>("addorupdated", model => Console.WriteLine("Updated " + model.Name));
            c.Controller("demo").On<MyModel>("removed", model => Console.WriteLine("Removed " + model.Name));

            c.Open();

            for (var i = 0; i < 10; i++)
            {
                var m = new MyModel() {Id = Guid.NewGuid(), Name = "Person Nr" + i, Age = i};
                c.Controller("demo").Invoke("AddOrUpdateModel", m);

                Thread.Sleep(2000);

                c.Controller("demo").Invoke("RemoveModel", m);
                Thread.Sleep(2000);
            }

            Console.ReadLine();
        }
    }
}

您可以从我的保管箱下载该项目:https ://www.dropbox.com/s/5ljbedovx6ufkww/SynchronizingCollection.zip ? dl =0

问候乌菲

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

通过WebSocket连接的Spring SseEmitter

来自分类Dev

通过WebSocket连接的Spring SseEmitter

来自分类Dev

通过查找邻域点来连接点的集合

来自分类Dev

Websocket通过Amazon ELB的连接错误

来自分类Dev

strophe无法通过websocket连接openfire

来自分类Dev

客户端 websocket 通过代理连接

来自分类Dev

WebSocket 通过 ws 未连接但 wss 是

来自分类Dev

从 Starlette 中的同步迭代器通过 websocket 发送数据

来自分类Dev

通过R连接时无法在mongo DB中看到集合

来自分类Dev

通过LAN IP地址连接WebSocket服务器

来自分类Dev

仅允许通过一个URL进行websocket连接

来自分类Dev

通过 Caddy 保护最重要的连接,但 websocket 不起作用

来自分类Dev

通过管道而不是 websocket 连接 Puppeteer 的优缺点是什么

来自分类Dev

通过Kubernetes的Azure Websocket连接,使用代码1006断开许多连接

来自分类Dev

Java中的同步嵌套集合

来自分类Dev

平滑滑块自适应高度取消通过asNavFor连接的滑块的同步

来自分类Dev

通过与Exchange服务器进行主动同步来连接Android

来自分类Dev

android同步连接服务

来自分类Dev

同步连接到mongodb

来自分类Dev

未捕获的SecurityError:无法构造“ WebSocket”:可能无法通过HTTPS加载的页面启动不安全的WebSocket连接

来自分类Dev

无法通过类变量连接到MongoDB数据库和集合

来自分类Dev

使用Mongo-Hadoop连接器通过Apache Spark更新MongoDb中的集合

来自分类Dev

无法通过类变量连接到MongoDB数据库和集合

来自分类Dev

通过FTP同步文件

来自分类Dev

通过reflex-dom中的websocket检测到关闭的服务器连接?

来自分类Dev

Python客户端与socket.io v1.0建立连接并通过websocket触发事件

来自分类Dev

Apollo-server-express内省功能已禁用,但仍然可以通过websocket连接

来自分类Dev

Safari 5.0通过Nginx https代理连接时的Websocket位置不匹配

来自分类Dev

如果websocket使用TCP连接,通过它们进行的通讯是否和TCP一样慢?

Related 相关文章

  1. 1

    通过WebSocket连接的Spring SseEmitter

  2. 2

    通过WebSocket连接的Spring SseEmitter

  3. 3

    通过查找邻域点来连接点的集合

  4. 4

    Websocket通过Amazon ELB的连接错误

  5. 5

    strophe无法通过websocket连接openfire

  6. 6

    客户端 websocket 通过代理连接

  7. 7

    WebSocket 通过 ws 未连接但 wss 是

  8. 8

    从 Starlette 中的同步迭代器通过 websocket 发送数据

  9. 9

    通过R连接时无法在mongo DB中看到集合

  10. 10

    通过LAN IP地址连接WebSocket服务器

  11. 11

    仅允许通过一个URL进行websocket连接

  12. 12

    通过 Caddy 保护最重要的连接,但 websocket 不起作用

  13. 13

    通过管道而不是 websocket 连接 Puppeteer 的优缺点是什么

  14. 14

    通过Kubernetes的Azure Websocket连接,使用代码1006断开许多连接

  15. 15

    Java中的同步嵌套集合

  16. 16

    平滑滑块自适应高度取消通过asNavFor连接的滑块的同步

  17. 17

    通过与Exchange服务器进行主动同步来连接Android

  18. 18

    android同步连接服务

  19. 19

    同步连接到mongodb

  20. 20

    未捕获的SecurityError:无法构造“ WebSocket”:可能无法通过HTTPS加载的页面启动不安全的WebSocket连接

  21. 21

    无法通过类变量连接到MongoDB数据库和集合

  22. 22

    使用Mongo-Hadoop连接器通过Apache Spark更新MongoDb中的集合

  23. 23

    无法通过类变量连接到MongoDB数据库和集合

  24. 24

    通过FTP同步文件

  25. 25

    通过reflex-dom中的websocket检测到关闭的服务器连接?

  26. 26

    Python客户端与socket.io v1.0建立连接并通过websocket触发事件

  27. 27

    Apollo-server-express内省功能已禁用,但仍然可以通过websocket连接

  28. 28

    Safari 5.0通过Nginx https代理连接时的Websocket位置不匹配

  29. 29

    如果websocket使用TCP连接,通过它们进行的通讯是否和TCP一样慢?

热门标签

归档