有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

如何在java中实现ketama算法?

我们正在使用nginx作为负载平衡器,并基于url的第一部分使用consistent hash来分配负载。这个很好用。对/abc/的请求总是发送到同一个节点

在我的节点上,我也想实现这个算法。(需要让一些作业在这些节点上运行,并使用一些预加载的缓存元素)

所以我需要nginx ketama算法的Java实现

我发现了一些:

这些算法都不能产生与nginx相同的结果。这可能是因为nginx文档中描述了一些ketama点,或者散列算法中存在一些差异

我在这里找到了nginx哈希算法: https://github.com/nginx/nginx/blob/53d655f89407af017cd193fd4d8d82118c9c2c80/src/stream/ngx_stream_upstream_hash_module.c#L279

当我比较两者时,我发现java使用的是md5,nginx使用的是crc32,还有很多其他的区别

java中有兼容的实现吗?或者是否有关于ketama算法的明确文件


共 (1) 个答案

  1. # 1 楼答案

    这个代码对我有用。它不是防弹的,因为它不考虑重量或其他因素,如删除节点。如果您喜欢使用它,代码是公共域

    package org.kicktipp.ketama;
    
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;
    import java.util.zip.CRC32;
    
    public class NginxKetamaConsistentHash
    {
        private final int                       KETAMA_POINTS   = 160;
        private final SortedMap<Long, String>   circle          = new TreeMap<>();
    
        public NginxKetamaConsistentHash ( List<String> nodes )
        {
            for (String node : nodes)
            {
                addNode(node);
            }
        }
    
        public String get ( String key )
        {
            if (circle.isEmpty())
            {
                return null;
            }
            long hash = getCrc32(key);
            if (!circle.containsKey(hash))
            {
                SortedMap<Long, String> tailMap = circle.tailMap(hash);
                hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
            }
            return circle.get(hash);
        }
    
        private void addNode ( String node )
        {
            String server = node;
            String port = "";
            if (node.contains(":"))
            {
                String[] split = node.split(":", 2);
                server = split[0];
                port = split[1];
            }
            byte[] prevHash = new byte[4];
            for (int i = 0; i < KETAMA_POINTS; i++)
            {
                CRC32 hash = getBaseHash(server, port);
                hash.update(prevHash);
                long value = hash.getValue();
                circle.put(value, node);
                prevHash = getPrevHash(value);
            }
        }
    
        private byte[] getPrevHash ( long value )
        {
            byte[] array = Arrays.copyOfRange(longToBytes(value), 4, 8);
            for (int i = 0; i < array.length / 2; i++)
            {
                byte temp = array[i];
                array[i] = array[array.length - i - 1];
                array[array.length - i - 1] = temp;
            }
            return array;
        }
    
        private CRC32 getBaseHash ( String server, String port )
        {
            CRC32 baseHash = new CRC32();
            baseHash.update(server.getBytes(StandardCharsets.US_ASCII));
            baseHash.update("\0".getBytes(StandardCharsets.US_ASCII));
            baseHash.update(port.getBytes(StandardCharsets.US_ASCII));
            return baseHash;
        }
    
        private byte[] longToBytes ( long x )
        {
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(x);
            return buffer.array();
        }
    
        private long getCrc32 ( final String k )
        {
            CRC32 hash = new CRC32();
            hash.update(k.getBytes());
            return hash.getValue();
        }
    }