概述
- package com.bubble.cluster;
- import java.net.InetSocketAddress;
- import java.util.List;
- import java.util.Random;
- import java.util.concurrent.Executors;
- import org.I0Itec.zkclient.IZkChildListener;
- import org.I0Itec.zkclient.ZkClient;
- import org.jboss.netty.bootstrap.ClientBootstrap;
- import org.jboss.netty.channel.ChannelFactory;
- import org.jboss.netty.channel.ChannelPipeline;
- import org.jboss.netty.channel.ChannelPipelineFactory;
- import org.jboss.netty.channel.Channels;
- import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
- import org.jboss.netty.handler.codec.string.StringDecoder;
- import org.jboss.netty.handler.codec.string.StringEncoder;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-10
- */
- public class Client extends ClusterClient {
- private static String appServer;
- private static String zkServer = "127.0.0.1:2181";
- private static ClientBootstrap bootstrap;
- private static Client client;
- public static void main(String[] args) throws Exception {
- ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
- bootstrap = new ClientBootstrap(factory);
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() {
- ChannelPipeline pipeline = Channels.pipeline();
- pipeline.addLast("encode", new StringEncoder());
- pipeline.addLast("decode", new StringDecoder());
- pipeline.addLast("handler", new DemoHandler());
- return pipeline;
- }
- });
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
- client=new Client();
- ZkClient zkClient = new ZkClient(zkServer);
- client.connect(zkClient);
- client.failOver();
- }
- @Override
- public void connect(ZkClient zkClient) {
- while (true) {
- try {
- RoundRobinLoadBalance loadBlance = new RoundRobinLoadBalance();
- //loadBlance.SetClient("127.0.0.1:"+new Random().nextInt(1000));
- String server = loadBlance.select(zkServer);
- if (server != null) {
- String ip = server.split(":")[0];
- int port = Integer.parseInt(server.split(":")[1]);
- appServer = server;
- System.out.println(server);
- bootstrap.connect(new InetSocketAddress(ip, port));
- client.setZkClient(zkClient);
- client.join( "127.0.0.1:"+new Random().nextInt(5000));
- ZookeeperConnStatistic.incrementConn(zkServer, appServer);
- break;
- }
- Thread.sleep(1000);
- } catch (Exception e) {
- e.printStackTrace();
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- }
- connect(zkClient);
- }
- }
- }<pre name="code" class="html">package com.bubble.cluster;
- import java.util.List;
- import org.I0Itec.zkclient.IZkChildListener;
- import org.I0Itec.zkclient.ZkClient;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-11
- */
- public abstract class ClusterClient {
- public abstract void connect(ZkClient zkClient);
- public abstract String getAPPServer();
- public void setZkClient(ZkClient zkClient){
- this.zkClient=zkClient;
- }
- private ZkClient zkClient;
- public void failOver() {
- zkClient.subscribeChildChanges(Constant.root, new IZkChildListener() {
- @Override
- public void handleChildChange(String parentPath, List currentChilds) throws Exception {
- boolean has = false;
- for (int i = 0; i < currentChilds.size(); i++) {
- if (getAPPServer().equals(currentChilds.get(i))) {
- has = true;
- break;
- }
- }
- if (!has) {
- connect(zkClient);
- }
- }
- });
- }
- public void join(String client){
- if(!zkClient.exists(Constant.client)){
- zkClient.createPersistent(Constant.client);
- }
- if(!zkClient.exists(Constant.client+"/"+client)){
- zkClient.createEphemeral(Constant.client+"/"+client);
- }
- }
- public void leave(String client){
- if(zkClient.exists(Constant.client+"/"+client)){
- zkClient.delete(Constant.client+"/"+client);
- }
- zkClient.close();
- }
- }</pre><br>
- <pre name="code" class="html">package com.bubble.cluster;
- import java.io.UnsupportedEncodingException;
- import java.security.MessageDigest;
- import java.security.NoSuchAlgorithmException;
- import java.util.List;
- import org.I0Itec.zkclient.ZkClient;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-11
- */
- public class ConsistentHashLoadBalance implements LoadBlance {
- private String client;
- public void SetClient(String client){
- this.client=client;
- }
- @Override
- public String select(String zkServer) {
- ZkClient zkClient = new ZkClient(zkServer);
- List<String> serverList = zkClient.getChildren(Constant.root);
- ConsistentHashSelector selector=new ConsistentHashSelector(client,serverList);
- return selector.select();
- }
- private static final class ConsistentHashSelector {
- public ConsistentHashSelector(String client,List<String> appServer){
- this.client=client;
- this.appServer=appServer;
- }
- private String client;
- private List<String> appServer;
- public String select() {
- String key =client ;
- byte[] digest = md5(key);
- String server =appServer.get((int) hash(digest, 0));
- return server;
- }
- private long hash(byte[] digest, int number) {
- return (((long) (digest[3 + number * 4] & 0xFF) << 24)
- | ((long) (digest[2 + number * 4] & 0xFF) << 16)
- | ((long) (digest[1 + number * 4] & 0xFF) << 8)
- | (digest[0 + number * 4] & 0xFF))
- & 0xFFFFFFFFL;
- }
- private byte[] md5(String value) {
- MessageDigest md5;
- try {
- md5 = MessageDigest.getInstance("MD5");
- } catch (NoSuchAlgorithmException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- md5.reset();
- byte[] bytes = null;
- try {
- bytes = value.getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- md5.update(bytes);
- return md5.digest();
- }
- }
- }
- </pre><br>
- <pre name="code" class="html">package com.bubble.cluster;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-11
- */
- public class Constant {
- public static final String root="/cluster";
- public static final String round="/round";
- public static final String client="/client";
- public static final String route="/route";
- }
- </pre><br>
- <pre name="code" class="html">package com.bubble.cluster;
- import java.util.Date;
- import org.jboss.netty.channel.ChannelHandlerContext;
- import org.jboss.netty.channel.ChannelStateEvent;
- import org.jboss.netty.channel.MessageEvent;
- import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-10
- */
- public class DemoHandler extends SimpleChannelUpstreamHandler {
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- Thread.sleep(5000);
- System.out.println(e.getMessage());
- ctx.getChannel().write("bbb");
- }
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- e.getChannel().write("abcd");
- }
- }
- </pre><br>
- <pre name="code" class="html">package com.bubble.cluster;
- import java.util.List;
- import org.I0Itec.zkclient.ZkClient;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-11
- */
- public class LeastActiveLoadBalance implements LoadBlance {
- @Override
- public String select(String zkServer) {
- ZkClient zkClient = new ZkClient(zkServer);
- List<String> serverList = zkClient.getChildren(Constant.root);
- String tempServer = null;
- int tempConn = -1;
- for (int i = 0; i < serverList.size(); i++) {
- String server = serverList.get(i);
- if (zkClient.readData(Constant.root + "/" + server) != null) {
- int connNum = zkClient.readData(Constant.root + "/" + server);
- if (tempConn == -1) {
- tempServer = server;
- tempConn = connNum;
- }
- if (connNum < tempConn) {
- tempServer = server;
- tempConn = connNum;
- }
- }else{
- zkClient.close();
- return server;
- }
- }
- zkClient.close();
- if (tempServer != null && !tempServer.equals("")) {
- return tempServer;
- }
- return null;
- }
- }
- </pre><br>
- <pre name="code" class="html">package com.bubble.cluster;
- import java.util.List;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-11
- */
- public interface LoadBlance {
- String select(String zkServer);
- }
- </pre><br>
- <pre name="code" class="html">package com.bubble.cluster;
- import java.util.List;
- import java.util.Random;
- import org.I0Itec.zkclient.ZkClient;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-11
- */
- public class RandomLoadBalance implements LoadBlance {
- @Override
- public String select(String zkServer) {
- ZkClient zkClient = new ZkClient(zkServer);
- List<String> serverList = zkClient.getChildren(Constant.root);
- zkClient.close();
- Random r=new Random();
- if(serverList.size()>=1){
- String server=serverList.get(r.nextInt(serverList.size()));
- return server;
- }else{
- return null;
- }
- }
- }
- </pre><br>
- <pre name="code" class="html">package com.bubble.cluster;
- import java.util.List;
- import java.util.concurrent.atomic.AtomicInteger;
- import org.I0Itec.zkclient.ZkClient;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-11
- */
- public class RoundRobinLoadBalance implements LoadBlance {
- @Override
- public String select(String zkServer) {
- ZkClient zkClient = new ZkClient(zkServer);
- List<String> serverList = zkClient.getChildren(Constant.root);
- int round=0;
- if(!zkClient.exists(Constant.round)){
- zkClient.createPersistent(Constant.round);
- zkClient.writeData(Constant.round, 0);
- }else{
- round=(Integer)zkClient.readData(Constant.round);
- zkClient.writeData(Constant.round, ++round);
- }
- zkClient.close();
- if (serverList != null && serverList.size() > 0) {
- return serverList.get(round % serverList.size());
- } else {
- return null;
- }
- }
- }
- </pre><br>
- <pre name="code" class="html">package com.bubble.cluster;
- import java.net.InetSocketAddress;
- import java.util.concurrent.Executors;
- import org.I0Itec.zkclient.ZkClient;
- import org.apache.zookeeper.Watcher.Event.KeeperState;
- import org.jboss.netty.bootstrap.ServerBootstrap;
- import org.jboss.netty.channel.ChannelFactory;
- import org.jboss.netty.channel.ChannelPipeline;
- import org.jboss.netty.channel.ChannelPipelineFactory;
- import org.jboss.netty.channel.Channels;
- import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
- import org.jboss.netty.handler.codec.string.StringDecoder;
- import org.jboss.netty.handler.codec.string.StringEncoder;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-10
- */
- public class Server {
- /**
- * @param args
- */
- public static void main(String[] args) {
- ChannelFactory factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool());
- ServerBootstrap bootstrap = new ServerBootstrap (factory);
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() {
- ChannelPipeline pipeline = Channels.pipeline();
- pipeline.addLast("encode",new StringEncoder());
- pipeline.addLast("decode",new StringDecoder());
- pipeline.addLast("handler",new DemoHandler());
- return pipeline;
- }
- });
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.bind(new InetSocketAddress(8081));
- ClusterServer.join("127.0.0.1:8081", "127.0.0.1:2181");
- }
- }
- </pre><br>
- <pre name="code" class="html">package com.bubble.cluster;
- import java.util.List;
- import org.I0Itec.zkclient.ZkClient;
- /**
- * @author hxpwangyi@163.com
- * @date 2013-2-11
- */
- public class ZookeeperConnStatistic {
- public static void incrementConn(String zkServer,String appServer){
- ZkClient zkClient = new ZkClient(zkServer);
- List<String> serverList = zkClient.getChildren(Constant.root);
- for(int i=0;i<serverList.size();i++){
- String server=serverList.get(i);
- if(server.equals(appServer)){
- if(zkClient.readData(Constant.root+"/"+appServer)==null){
- zkClient.writeData(Constant.root+"/"+appServer, 1);
- }else{
- int conn=zkClient.readData(Constant.root+"/"+appServer);
- zkClient.writeData(Constant.root+"/"+appServer, ++conn);
- }
- break;
- }
- }
- zkClient.close();
- }
- public static int getNodeConn(String zkServer,String appServer){
- ZkClient zkClient = new ZkClient(zkServer);
- List<String> serverList = zkClient.getChildren(Constant.root);
- for(int i=0;i<serverList.size();i++){
- String server=serverList.get(i);
- if(server.equals(appServer)){
- int conn=zkClient.readData(Constant.root+"/"+appServer);
- zkClient.close();
- return conn;
- }
- }
- zkClient.close();
- return 0;
- }
- }
- </pre><br>
- @Overridepublic String getAPPServer() {return appServer;}}
- <pre></pre>
- <br>
最后
以上就是狂野冰淇淋为你收集整理的使用zookeeper实现集群和负载均衡的全部内容,希望文章能够帮你解决使用zookeeper实现集群和负载均衡所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复